Description
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.DataTypes._ import org.apache.spark.sql.types.{StructField, StructType} val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate() val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}""")) sc.read .schema(StructType(Seq(StructField("a", IntegerType)))) .json(jsonRDD) .write .parquet("/tmp/test") sc.read .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true)))) .load("/tmp/test") .createOrReplaceTempView("table") sc.sql("select b from table where b is not null").show()
returns:
16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Column [b] was not found in schema! at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55) at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190) at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178) at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160) at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100) at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59) at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194) at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64) at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59) at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40) at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126) at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46) at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367) at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
expected result:
+---+ | b| +---+ +---+
It works fine in 2.0.0 and 1.6.2. However, if I only select the nonexisting column (without filter) it also works fine.
Query plan:
== Parsed Logical Plan == 'Project ['b] +- 'Filter isnotnull('b) +- 'UnresolvedRelation `table` == Analyzed Logical Plan == b: int Project [b#8] +- Filter isnotnull(b#8) +- SubqueryAlias table +- Relation[a#7,b#8] parquet == Optimized Logical Plan == Project [b#8] +- Filter isnotnull(b#8) +- Relation[a#7,b#8] parquet == Physical Plan == *Project [b#8] +- *Filter isnotnull(b#8) +- *BatchedScan parquet [b#8] Format: ParquetFormat, InputPaths: file:/tmp/test, PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<b:int>
Attachments
Issue Links
- relates to
-
PARQUET-389 Filter predicates should work with missing columns
- Resolved
-
SPARK-19409 Upgrade Parquet to 1.8.2
- Resolved
- links to