SPARK-35511

Spark computes all rows during count() on a parquet file


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      We expect Spark to use the Parquet metadata to fetch the row count of a Parquet file. But when we execute the following code:

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
      
      object Test extends App {
        val sparkConf = new SparkConf()
          .setAppName("test-app")
          .setMaster("local[1]")
      
        val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
        import sparkSession.implicits._
      
        // Write 1000 rows split across 10 parquet files.
        val filePath = "./tempFile.parquet"
        (1 to 1000).toDF("c1")
          .repartition(10)
          .write
          .mode("overwrite")
          .parquet(filePath)
      
        val df = sparkSession.read.parquet(filePath)
      
        var rowsInHeavyComputation = 0
        def heavyComputation(row: Row): Row = {
          rowsInHeavyComputation += 1
          println(s"rowsInHeavyComputation = $rowsInHeavyComputation")
          Thread.sleep(50)
          row
        }
      
        // An explicit encoder is required for Dataset[Row].map.
        implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
        val cnt = df
          .map(row => heavyComputation(row)) // a map cannot change the number of rows
          .count()
        println(s"counting done, cnt=$cnt")
      }
      

      we see the following output:

      rowsInHeavyComputation = 1
      rowsInHeavyComputation = 2
      ...
      rowsInHeavyComputation = 999
      rowsInHeavyComputation = 1000
      counting done, cnt=1000
      

      Expected result: Spark does not perform heavyComputation at all, since the row count can be taken from the Parquet metadata.
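
      For comparison, here is a sketch (not part of the original report) of the behaviour without the map: counting the DataFrame straight after reading it should not touch any rows, since Spark prunes the scan to zero columns and the Parquet reader can answer from the row counts stored in the file footers.

      // Hypothetical comparison, reusing filePath from the reproduction above.
      val fastCnt = sparkSession.read.parquet(filePath).count() // heavyComputation never runs
      println(s"fast count = $fastCnt")                         // prints: fast count = 1000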

       

      P.S. In our real application we:

      • transform data from parquet files
      • return some examples (50 rows, and Spark runs heavyComputation for only those 50 rows)
      • return the row count of the whole DataFrame, and here Spark for some reason computes the whole DataFrame, even though there are only map operations and the initial row count could be taken from the Parquet metadata (see the sketch after this list)
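
      A possible workaround for such a pipeline, as a sketch (assuming the implicit encoder and heavyComputation from the reproduction above are in scope): take the total count from the untransformed DataFrame, where the metadata shortcut applies, and run the expensive map only on the sampled rows.

      val source = sparkSession.read.parquet(filePath)
      val totalRows = source.count()         // metadata-only count, no heavyComputation
      val examples = source
        .limit(50)
        .map(row => heavyComputation(row))   // invoked for the 50 sampled rows only
        .collect()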

       

       


            People

              Assignee: Unassigned
              Reporter: Ivan Tsukanov (itsukanov)
              Votes: 0
              Watchers: 1
