SPARK-44897: Local Property Propagation to SubqueryBroadcastExec


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.5.0
    • Component/s: SQL
    • Labels: None

Description

https://issues.apache.org/jira/browse/SPARK-32748 originally fixed this issue, but I believe it was then mistakenly reverted. The justification for the revert was that propagating local properties to the dynamic pruning thread in SubqueryBroadcastExec is unnecessary because the broadcast threads propagate them anyway. However, when the dynamic pruning thread is the first to initialize the broadcast relation future, the local properties are not propagated correctly: the values the pruning thread hands off to the broadcast threads are already stale, so the broadcast threads capture the wrong ones. The capture-and-restore pattern that is needed is sketched below.
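The missing handoff is the usual capture-and-restore pattern: snapshot the local properties on the thread that schedules the work and re-set them on the pool thread before the body runs, which is what Spark's internal SQLExecution.withThreadLocalCaptured helper does. A minimal sketch of that pattern, using a hypothetical helper name and a single property key:

    import java.util.concurrent.Executors

    import scala.concurrent.{ExecutionContext, Future}

    import org.apache.spark.SparkContext

    object CaptureSketch {
      private val pool = ExecutionContext.fromExecutorService(
        Executors.newFixedThreadPool(1))

      // Hypothetical helper: snapshot `key` on the submitting thread and restore
      // it on the pool thread before running `body`. Without the snapshot, the
      // pool thread sees whatever the property happens to be when the future
      // finally executes.
      def withCapturedProperty[T](sc: SparkContext, key: String)(body: => T): Future[T] = {
        val snapshot = sc.getLocalProperty(key) // read on the submitting thread
        Future {
          sc.setLocalProperty(key, snapshot) // restore on the pool thread
          body
        }(pool)
      }
    }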
I do not have a good way of reproducing this consistently, because SubqueryBroadcastExec is generally not the first to initialize the broadcast relation future. After adding a Thread.sleep(1) to the doPrepare method of SubqueryBroadcastExec, however, the following test always fails.

    import java.util.UUID

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.internal.StaticSQLConf

    // Runs inside a Spark SQL test suite with `spark` and `spark.implicits._` in scope.
    withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
      withTable("a", "b") {
        val confKey = "spark.sql.y"
        val confValue1 = UUID.randomUUID().toString()
        val confValue2 = UUID.randomUUID().toString()

        // Build a partitioned table so the join below triggers dynamic partition pruning.
        Seq((confValue1, "1")).toDF("key", "value")
          .write
          .format("parquet")
          .partitionBy("key")
          .mode("overwrite")
          .saveAsTable("a")
        val df1 = spark.table("a")

        // The broadcast side reads the local property on the executor, so its row
        // survives the filter only if the property was propagated correctly.
        def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
          val df = spark.range(1).mapPartitions { _ =>
            Iterator(TaskContext.get.getLocalProperty(confKey))
          }.filter($"value".contains(confValue)).as("c")
          df.hint("broadcast")
        }

        // Set the local property and assert it is visible through the pruning subquery.
        val df2 = generateBroadcastDataFrame(confKey, confValue1)
        spark.sparkContext.setLocalProperty(confKey, confValue1)
        val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
        val checks = checkDF.collect()
        assert(checks.nonEmpty)
        assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

        // Change the local property and re-assert against a fresh table and broadcast side.
        Seq((confValue2, "1")).toDF("key", "value")
          .write
          .format("parquet")
          .partitionBy("key")
          .mode("overwrite")
          .saveAsTable("b")
        val df3 = spark.table("b")
        val df4 = generateBroadcastDataFrame(confKey, confValue2)
        spark.sparkContext.setLocalProperty(confKey, confValue2)
        val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
        val checks2 = checks2DF.collect()
        assert(checks2.nonEmpty)
        assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
      }
    }
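The staleness itself can be demonstrated without Spark. Local properties are backed by an InheritableThreadLocal, and a pool thread created before a property is set never observes the later value unless someone explicitly captures and restores it. A standalone demo of that pitfall (illustrative names only):

    import java.util.concurrent.Executors

    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    object StaleLocalPropertyDemo extends App {
      // Stand-in for SparkContext's local properties, which are also kept in an
      // InheritableThreadLocal.
      val prop = new InheritableThreadLocal[String] {
        override def initialValue: String = "unset"
      }

      val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
      // Warm the pool so its worker thread exists *before* the property is set.
      Await.ready(Future(())(pool), 5.seconds)

      prop.set("confValue2") // analogous to spark.sparkContext.setLocalProperty(...)

      // The worker thread keeps the value in effect when it was created, so
      // without an explicit capture-and-restore it still reads "unset".
      val seen = Await.result(Future(prop.get)(pool), 5.seconds)
      println(s"worker thread saw: $seen") // prints "unset", not "confValue2"
      pool.shutdown()
    }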

People

    • Assignee: Michael Chen (mikechen)
    • Reporter: Michael Chen (mikechen)