Description
When computing Parquet splits, reading Parquet metadata on the executor side is more memory efficient, so Spark SQL sets parquet.task.side.metadata to true by default. However, this setting also disables filter pushdown.
To work around this issue and enable Parquet filter pushdown, users can set spark.sql.parquet.filterPushdown to true and parquet.task.side.metadata to false. However, for large Parquet tables with many part-files and/or columns, reading all the metadata on the driver side consumes a lot of memory.
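As a sketch, the two settings above can be applied in a Spark shell session as follows (assuming the shell's implicit sc and a SQLContext named sqlContext; note that, as mentioned below, the Parquet property must go into the Hadoop configuration rather than spark-defaults.conf):

```scala
// Enable Parquet filter pushdown on the Spark SQL side
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

// Disable task-side metadata reading in the Hadoop configuration,
// so that filters can be pushed down when computing splits
sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")
```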
The following Spark shell snippet can be useful to reproduce this issue:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext._

case class KeyValue(key: Int, value: String)

// Write a Parquet file with 1024 * 1024 rows
sc.parallelize(1 to 1024).
  flatMap(i => Seq.fill(1024)(KeyValue(i, i.toString))).
  saveAsParquetFile("large.parquet")

parquetFile("large.parquet").registerTempTable("large")

sql("SET spark.sql.parquet.filterPushdown=true")

sql("SELECT * FROM large").collect()
sql("SELECT * FROM large WHERE key < 200").collect()
Users can verify this issue by checking the input size metrics in the web UI: when filter pushdown is enabled, the second query reads less input data than the first.
Notice that parquet.task.side.metadata must be set in the Hadoop configuration (either via core-site.xml or SparkContext.hadoopConfiguration.set()); setting it in spark-defaults.conf or via SparkConf does NOT work.
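For the core-site.xml route, the entry would look like the following (a standard Hadoop property element; the file must be on the classpath of both the driver and the executors):

```xml
<property>
  <name>parquet.task.side.metadata</name>
  <value>false</value>
</property>
```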