Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-18539

Cannot filter by a nonexistent column in a Parquet file

    XMLWordPrintableJSON

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.1, 2.0.2
    • Fix Version/s: 2.2.0
    • None
    • None

    Description

        // Minimal reproduction: write a Parquet file that only contains column `a`,
        // read it back with a user-supplied schema that also declares a nullable
        // column `b`, then filter on `b`.
        import org.apache.spark.SparkConf
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.types.DataTypes._
        import org.apache.spark.sql.types.{StructField, StructType}
      
        // NOTE(review): despite the name, `sc` holds a SparkSession, not a SparkContext;
        // the SparkContext is reached through `sc.sparkContext` below.
        val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
      
        // A single JSON record containing only field `a`.
        val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
      
        // Write the record to Parquet using a schema with only column `a`,
        // so the file on disk has no column `b`.
        // NOTE(review): no save mode is set, so re-running may fail if /tmp/test
        // already exists — confirm, or delete the path between runs.
        sc.read
          .schema(StructType(Seq(StructField("a", IntegerType))))
          .json(jsonRDD)
          .write
          .parquet("/tmp/test")
      
        // Read the file back with a wider schema: `b` is declared (nullable) but is
        // absent from the Parquet file; the reporter expects it to read as null.
        // `load` is called without `.format(...)`; the resulting plan shows the
        // relation resolved as parquet.
        sc.read
          .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
          .load("/tmp/test")
          .createOrReplaceTempView("table")
      
        // The `b is not null` predicate is pushed down to the Parquet reader as
        // IsNotNull(b). On the affected versions that pushdown throws
        // "Column [b] was not found in schema!" instead of returning no rows.
        sc.sql("select b from table where b is not null").show()
      

      Actual result:

      16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
      java.lang.IllegalArgumentException: Column [b] was not found in schema!
      	at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
      	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
      	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
      	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
      	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100)
      	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
      	at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
      	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
      	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
      	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
      	at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
      	at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
      	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
      	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
      	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
      	at org.apache.spark.scheduler.Task.run(Task.scala:86)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      Expected result (an empty result set, since every value of b is null):

      +---+
      |  b|
      +---+
      +---+
      

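      The same code works fine in 2.0.0 and 1.6.2. In the affected versions, selecting the nonexistent column without a filter also works; only the filtered query fails. The failure occurs because the IsNotNull(b) predicate is pushed down to the Parquet reader (see PushedFilters in the physical plan below). That reader rejects a filter on a column the file does not contain.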

      Query plan:

      == Parsed Logical Plan ==
      'Project ['b]
      +- 'Filter isnotnull('b)
         +- 'UnresolvedRelation `table`
      
      == Analyzed Logical Plan ==
      b: int
      Project [b#8]
      +- Filter isnotnull(b#8)
         +- SubqueryAlias table
            +- Relation[a#7,b#8] parquet
      
      == Optimized Logical Plan ==
      Project [b#8]
      +- Filter isnotnull(b#8)
         +- Relation[a#7,b#8] parquet
      
      == Physical Plan ==
      *Project [b#8]
      +- *Filter isnotnull(b#8)
         +- *BatchedScan parquet [b#8] Format: ParquetFormat, InputPaths: file:/tmp/test, PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<b:int>
      

      Attachments

        Issue Links

          Activity

            People

              dongjoon Dongjoon Hyun
              v-gerasimov Vitaly Gerasimov
              Votes:
              2 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: