Spark / SPARK-33828 SQL Adaptive Query Execution QA / SPARK-33933

Broadcast timeout happened unexpectedly in AQE


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.0.1
    • Fix Version/s: 3.2.0
    • Component/s: SQL
    • Labels: None

    Description

      In Spark 3.0, when AQE is enabled, broadcast timeouts often occur in normal queries, as shown below.


      Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
      


      This usually happens when a broadcast join (with or without a hint) follows a long-running shuffle (more than 5 minutes). Disabling AQE makes the issue disappear.

      The workaround is to increase spark.sql.broadcastTimeout, and it works, but since the data to broadcast is very small, that does not make sense.
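
      For reference, the mitigations mentioned in this report map to session configuration settings. The following is a minimal sketch; the timeout value is an arbitrary example, not a recommendation:

      // Workarounds discussed above (pick one; values are examples only):

      // 1. Raise the broadcast timeout so the broadcast job can wait out the map job.
      spark.conf.set("spark.sql.broadcastTimeout", "1200")

      // 2. Disable broadcast joins entirely (the join falls back to a shuffle join).
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

      // 3. Disable AQE, which avoids the stage scheduling described below.
      spark.conf.set("spark.sql.adaptive.enabled", "false")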

      After investigation, the root cause appears to be this: when AQE is enabled, in getFinalPhysicalPlan Spark traverses the physical plan bottom-up, creates query stages for the materializable parts via createQueryStages, and materializes those newly created query stages by submitting map stages or broadcasts. When a ShuffleQueryStage is materialized before a BroadcastQueryStage, the map job and the broadcast job are submitted at almost the same time, but the map job holds all the computing resources. If the map job runs slowly (lots of data to process with limited resources), the broadcast job cannot start (let alone finish) before spark.sql.broadcastTimeout expires, so the whole query fails (introduced in SPARK-31475).
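
      To make the timing problem concrete, here is a small sketch that is deliberately not Spark code: a two-slot thread pool stands in for the two cores of local[2], a pair of long-running tasks stands in for the shuffle map job, and a queued task awaited with a short timeout stands in for the broadcast job. All names and durations are invented for illustration:

      import java.util.concurrent.{Executors, TimeoutException}
      import scala.concurrent.{Await, ExecutionContext, Future}
      import scala.concurrent.duration._

      object BroadcastStarvationSketch extends App {
        // Two worker threads play the role of the two cores in local[2].
        val pool = Executors.newFixedThreadPool(2)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

        // "Map job": two long tasks that occupy both slots right away.
        val mapJob = Future.sequence(Seq.fill(2)(Future { Thread.sleep(10000); "map done" }))

        // "Broadcast job": tiny, but queued behind the map tasks.
        val broadcastJob = Future { "broadcast done" }

        try {
          // Analogue of spark.sql.broadcastTimeout, shrunk to 5 seconds.
          println(Await.result(broadcastJob, 5.seconds))
        } catch {
          case _: TimeoutException =>
            println("broadcast timed out because the map job held all the resources")
        }

        Await.ready(mapJob, 1.minute)
        pool.shutdown()
      }

      With only two slots, the queued task cannot start until the sleeps finish, so the await times out; this mirrors how the map job starves the broadcast job of executors until spark.sql.broadcastTimeout expires.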

      Code to reproduce:


      import java.util.UUID
      import scala.util.Random
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.SparkSession
      
      val spark = SparkSession.builder()
        .master("local[2]")
        .appName("Test Broadcast").getOrCreate()
      import spark.implicits._
      
      spark.conf.set("spark.sql.adaptive.enabled", "true")
      
      val sc = spark.sparkContext
      sc.setLogLevel("INFO")
      val uuid = UUID.randomUUID
      
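      // Large fact table: 10,000 partitions with roughly 10,000-20,000 rows each;
      // the groupBy/countDistinct aggregation below drives the long-running shuffle.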
      val df = sc.parallelize(Range(0, 10000), 10000).flatMap(x => {
        for (i <- Range(0, 10000 + Random.nextInt(10000)))
          yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString)
      }).toDF("index", "part", "pv", "uuid")
        .withColumn("md5", md5($"uuid"))
      
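      // Small dimension table (26 rows): the join on "index" below becomes a broadcast join.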
      val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x))
      val dim = dim_data.toDF("name", "index")
      
      val result = df.groupBy("index")
        .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv"))
        .join(dim, Seq("index"))
        .collect()


People

    • Assignee: Yu Zhong (zhongyu09)
    • Reporter: Yu Zhong (zhongyu09)