SPARK-17673: Reused Exchange Aggregations Produce Incorrect Results


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 2.0.1
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: SQL

    Description

      https://datastax-oss.atlassian.net/browse/SPARKC-429

      This was brought to my attention because the following code produces incorrect results:

      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.min
      import com.datastax.spark.connector._   // Spark Cassandra Connector implicits (assumed import for createCassandraTable)

      // Assumed definition of the test case class: id = "A", col1 = 1, col2 = 7
      case class TestData(id: String, col1: Int, col2: Int)

      // keySpaceName and table are assumed to be defined elsewhere in the test
      val data = List(TestData("A", 1, 7))
      val frame = sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data))

      frame.createCassandraTable(
        keySpaceName,
        table,
        partitionKeyColumns = Some(Seq("id")))

      frame
        .write
        .format("org.apache.spark.sql.cassandra")
        .mode(SaveMode.Append)
        .options(Map("table" -> table, "keyspace" -> keySpaceName))
        .save()

      val loaded = sparkSession
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> table, "keyspace" -> keySpaceName))
        .load()
        .select("id", "col1", "col2")

      val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
      val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
      min1.union(min2).show()
      /* prints:
        +---+---+
        | id|min|
        +---+---+
        |  A|  1|
        |  A|  1|
        +---+---+
       should be:
        +---+---+
        | id|min|
        +---+---+
        |  A|  1|
        |  A|  7|
        +---+---+
      */
      

      I looked at the explain plan and saw:

      Union
      :- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
      :  +- Exchange hashpartitioning(id#93, 200)
      :     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
      :        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
      +- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
         +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
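
      For what it's worth, the reuse can also be confirmed programmatically by walking the physical plan. This is only a sketch assuming Spark 2.0.x internals (ReusedExchangeExec lives in org.apache.spark.sql.execution.exchange there) and the min1/min2 DataFrames defined above:

          import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

          // Collect any ReusedExchange nodes from the executed physical plan of the union
          val executed = min1.union(min2).queryExecution.executedPlan
          val reusedNodes = executed.collect { case r: ReusedExchangeExec => r }
          println(s"ReusedExchange nodes found: ${reusedNodes.size}")
          // Prints 1 for the plan above, even though the two aggregates read different columns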
      

      This plan is different from the one produced when a parallelized collection backs the DataFrame. So I tested the same code with a Parquet-backed DataFrame and saw the same incorrect results.

          frame.write.parquet("garbagetest")
          val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", "col2")
          // Aggregations built the same way as for the Cassandra-backed DataFrame
          val parquetmin1 = parquet.groupBy("id").agg(min("col1").as("min"))
          val parquetmin2 = parquet.groupBy("id").agg(min("col2").as("min"))
          println("PDF")
          parquetmin1.union(parquetmin2).explain()
          parquetmin1.union(parquetmin2).show()
          /* prints:
            +---+---+
            | id|min|
            +---+---+
            |  A|  1|
            |  A|  1|
            +---+---+
          */
      

      This leads me to believe there is something wrong with the reused exchange: the second aggregation's exchange is replaced by the one built over partial_min(col1), so min(col2) is effectively computed from col1's values, which is why both rows show 1.
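
      As a side note, a possible workaround on the affected versions might be to compute both minimums in a single aggregation over one scan, so only one exchange is planned and there is nothing to reuse. This is just a sketch against the loaded DataFrame above, not verified against the connector:

          import org.apache.spark.sql.functions.min

          // One aggregate over a single scan: only one exchange, so no ReusedExchange is possible
          loaded.groupBy("id")
            .agg(min("col1").as("min_col1"), min("col2").as("min_col2"))
            .show()
          // expected: id = A, min_col1 = 1, min_col2 = 7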


          People

            Assignee: ekhliang (Eric Liang)
            Reporter: rspitzer (Russell Spitzer)
