Spark > SPARK-37375 Umbrella: Storage Partitioned Join (SPJ) > SPARK-44641

SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 3.4.0, 3.4.1
    • Fix Version/s: 3.4.2, 3.5.0
    • Component/s: SQL

    Description

      Adding the following test case in KeyGroupedPartitionSuite demonstrates the problem.


      test("test join key is the second partition key and a transform") {
        val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
        createTable(items, items_schema, items_partitions)
        sql(s"INSERT INTO testcat.ns.$items VALUES " +
          s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
          s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
          s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
          s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
          s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
      
        val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
        createTable(purchases, purchases_schema, purchases_partitions)
        sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
          s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
          s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
          s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
          s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
          s"(3, 19.5, cast('2020-02-01' as timestamp))")
      
        withSQLConf(
          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
            "true") {
          val df = sql("SELECT id, name, i.price as purchase_price, " +
            "p.item_id, p.price as sale_price " +
            s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
            "ON i.arrive_time = p.time " +
            "ORDER BY id, purchase_price, p.item_id, sale_price")
      
          val shuffles = collectShuffles(df.queryExecution.executedPlan)
          assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys are partition keys")
          checkAnswer(df,
            Seq(
              Row(1, "aa", 40.0, 1, 42.0),
              Row(1, "aa", 40.0, 2, 11.0),
              Row(1, "aa", 41.0, 1, 44.0),
              Row(1, "aa", 41.0, 1, 45.0),
              Row(2, "bb", 10.0, 1, 42.0),
              Row(2, "bb", 10.0, 2, 11.0),
              Row(2, "bb", 10.5, 1, 42.0),
              Row(2, "bb", 10.5, 2, 11.0),
              Row(3, "cc", 15.5, 3, 19.5)
            )
          )
        }
      }


      Note: this test sets up the DataSource V2 source to return multiple input splits for the same partition value.

      In this case, SPJ is not triggered (because the join key does not match the partition keys), but the following code in the DSv2 scan:

      https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194

      which is intended to fill in empty partitions for pushed-down partition values, still iterates through the non-grouped partitions and looks each one up in the grouped partitions to fill the map, resulting in duplicate input data being fed into the join.
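      The duplication mechanism can be illustrated with a minimal, self-contained sketch. This is not Spark's actual API: `Split`, `group`, `buggyFill`, and `correctFill` are hypothetical names that model the relevant loop in BatchScanExec under the assumption that the source reports several splits per partition value.

      ```scala
      // Hypothetical, simplified model of the duplication (not Spark's real API).
      object SpjDuplicationSketch {
        // (partitionValue, splitId): a source may report several splits per value.
        type Split = (Int, Int)

        // Grouping applied when partially-clustered distribution is enabled:
        // all splits sharing a partition value are kept together.
        def group(splits: Seq[Split]): Map[Int, Seq[Split]] =
          splits.groupBy(_._1)

        // Buggy fill step, mirroring the loop in BatchScanExec: iterate the
        // *ungrouped* splits and look each one up in the *grouped* map, so a
        // partition value with N splits is emitted N times.
        def buggyFill(splits: Seq[Split]): Seq[Split] =
          splits.flatMap(s => group(splits)(s._1))

        // Correct fill step: iterate each distinct partition value once.
        def correctFill(splits: Seq[Split]): Seq[Split] =
          splits.map(_._1).distinct.flatMap(group(splits))

        def main(args: Array[String]): Unit = {
          // Partition value 1 has two splits, value 2 has one.
          val splits = Seq((1, 0), (1, 1), (2, 0))
          println(buggyFill(splits).size)   // prints 5: value-1 splits fed in twice
          println(correctFill(splits).size) // prints 3: each split appears once
        }
      }
      ```

      When SPJ is actually triggered, the grouped partitions replace the ungrouped ones and no double-counting occurs; the bug is that the fill step runs even when SPJ is skipped.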


          People

            csun Chao Sun
            szehon Szehon Ho
