SPARK-6016

Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: SQL
    • Labels: None

    Description

      saveAsTable completes without error, and it appears the old data was deleted and the new data written successfully. However, when reading the newly created table, an error is thrown.

      Error in SQL statement: java.lang.RuntimeException: java.lang.RuntimeException: could not merge metadata: key org.apache.spark.sql.parquet.row.metadata has conflicting values: 
      at parquet.hadoop.api.InitContext.getMergedKeyValueMetaData(InitContext.java:67)
      	at parquet.hadoop.api.ReadSupport.init(ReadSupport.java:84)
      	at org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.getSplits(ParquetTableOperations.scala:469)
      	at parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:245)
      	at org.apache.spark.sql.parquet.ParquetRelation2$$anon$1.getPartitions(newParquet.scala:461)
      	...
      

      If spark.sql.parquet.cacheMetadata is set to false, querying the data works fine.
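
      As a temporary workaround sketch, the flag can be turned off on the SQLContext in the shell before querying (assuming the same sqlContext as in the repro below):

      ```scala
      // Disable the Parquet metadata cache for this SQLContext so
      // footers are re-read from disk after the table is overwritten.
      sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")
      ```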

      Note: the newly created table needs more than one file to trigger the bug (with a single file, no metadata merge is needed).

      To reproduce it, try...

      import org.apache.spark.sql.SaveMode
      import sqlContext._
      sql("drop table if exists test")
      
      val df1 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2)) // we will save to 2 parquet files.
      df1.saveAsTable("test", "parquet", SaveMode.Overwrite)
      sql("select * from test").collect.foreach(println) // Warm the FilteringParquetRowInputFormat.footerCache
      
      val df2 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4)) // we will save to 4 parquet files.
      df2.saveAsTable("test", "parquet", SaveMode.Overwrite)
      sql("select * from test").collect.foreach(println)
      

      For this example, footerCache still holds the two outdated footers for df1. Since the new test table has four parquet files, only two new footers are read for df2, while the two stale df1 footers (cached under the reused part-file paths) are served from the cache. Merging these conflicting schemas then hits the bug.
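
      The stale-cache interaction can be illustrated without Spark. The sketch below is plain Scala — FooterCacheSketch, readFooter, and the schema strings are hypothetical stand-ins, not Spark APIs — mimicking a footer cache keyed only by file path: after the overwrite, the two reused part-file names are served stale df1 metadata while the two new files yield df2 metadata, producing the conflicting mix.

      ```scala
      object FooterCacheSketch {
        // Hypothetical stand-in for FilteringParquetRowInputFormat.footerCache,
        // keyed only by file path.
        val footerCache = scala.collection.mutable.Map.empty[String, String]

        // Read a footer, consulting the cache first (as the cached-metadata path does).
        def readFooter(path: String, onDisk: Map[String, String]): String =
          footerCache.getOrElseUpdate(path, onDisk(path))

        def main(args: Array[String]): Unit = {
          // First save: df1 written as 2 part files with schema {"a": ...}.
          val v1 = Map("part-00000" -> "schema-a", "part-00001" -> "schema-a")
          v1.keys.foreach(p => readFooter(p, v1)) // warms the cache

          // Overwrite: df2 written as 4 part files with schema {"b": ...}.
          // The first two paths are reused, so their stale footers are served.
          val v2 = (0 to 3).map(i => f"part-$i%05d" -> "schema-b").toMap
          val footers = v2.keys.toSeq.sorted.map(p => readFooter(p, v2))

          // Mixed schemas -> the "conflicting values" merge failure.
          println(footers.distinct.sorted) // List(schema-a, schema-b)
        }
      }
      ```

      Clearing or invalidating the cache on overwrite (or keying it by file status rather than path alone) avoids serving the stale entries.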


          People

            Assignee: Yin Huai (yhuai)
            Reporter: Yin Huai (yhuai)
