Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-47918

BroadcastExchange does not support the execute() code path

    XMLWordPrintableJSON

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.1
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None
    • Environment: MacOS, Linux.

    Description

      Hello, hope you are well. We have flaky tests that fail randomly with the error in the title. Please see the stack trace and the reproducible example below. The execution plans look the same in three cases: a run where the action threw the exception, a run where the same action did not throw, and a run with AQE disabled. I was told on GitHub that SPARK-33823 was fixed in 3.2.0, but SPARK-39551, which was fixed in 3.2.2, looks relevant as well. We mostly use AWS EMR and Databricks, and we have a dependency that is built for 3.2.1.

      java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
      at org.apache.spark.sql.errors.QueryExecutionErrors$.executeCodePathUnsupportedError(QueryExecutionErrors.scala:1655)
      at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:203)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
      at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
      at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:119)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
      
      // Minimal reproduction of the intermittent failure described above.
      // NOTE(review): assumes a SparkSession with its implicits imported (needed for
      // `toDF()` and `$"..."`); that setup is not shown in this snippet.
      // Three small local DataFrames; tuple fields become columns `_1`, `_2`, ...
      val df1 = Seq(
        (1, 100, 1, 1.0, 1.0, "00000000-0000-0000-0000-000000000001", "a", "x", 0),
        (2, 100, 1, 1.0, 1.0, "00000000-0000-0000-0000-000000000002", "b", "x", 0)
      ).toDF()
      val df2 = Seq(
        (1, 3, 3, 1, 1, 1, 0, 1, "2024-01-01", 1.0, 1.0),
        (2, 4, 3, 1, 1, 1, 0, 1, "2024-01-01", 1.0, 1.0)
      ).toDF()
      val df3 = Seq(
        (3, 100, Array[Byte](0, 0, 0, 0), "foo0", "", "foobarfooba", "Foobarfoob", "FO", "foobafooba"),
        (4, 100, Array[Byte](0, 0, 0, 1), "foo0", "", "foobarfooba", "Foobarfoob", "FO", "foobafooba")
      ).toDF()
      
      
      // df4: drops the row whose `_7` is "a", leaving only the row with `_1` = 2; cached.
      val df4 = df1.filter($"_7" =!= "a").cache()
      // df5: keeps the rows of df2 whose `_1` appears in df4 (only `_1` = 2, whose `_2` is 4); cached.
      // Per the optimized plan below, the cached plan itself contains a BroadcastHashJoin
      // fed by a BroadcastExchange.
      val df5 = df2.join(df4.select("_1"), Seq("_1"), "left_semi").cache()
      // df6: keeps the rows of df3 whose `_1` matches df5's `_2` (expected: only `_1` = 4).
      // Per the physical plan below, this broadcasts a scan of the cached df5, whose own
      // cached plan contains a second BroadcastExchange.
      val df6 = df3.join(df5.select("_2"), df3("_1") === df5("_2"), "left_semi")
      
      
      // Expected results; actions on df4 and df5 run before df6 is evaluated.
      assert(df4.count() == 1)
      assert(df5.count() == 1)
      assert(df5.first().getAs[Int]("_1") == 2)
      assert(df6.count() == 1) // This line intermittently throws the UnsupportedOperationException shown above.
      assert(df6.first().getAs[Int]("_1") == 4)
      == Parsed Logical Plan ==                                                                                                                                                                  
      Join LeftSemi, (_1#23495 = _2#16732)                                                                                                                                                           
      :- LocalRelation [_1#23495, _2#23496, _3#23497, _4#23498, _5#23499, _6#23500, _7#23501, _8#23502, _9#23503]                                                                                    
      +- Project [_2#16732]                                                                                                                                                                          
         +- Project [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741]                                                                 
            +- Join LeftSemi, (_1#16731 = _1#48)                                                                                                                                                     
               :- LocalRelation [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741]                                                     
               +- Project [_1#48]
                  +- Filter NOT (_7#80 = a)
                     +- LocalRelation [_1#48, _2#73, _3#74, _4#75, _5#77, _6#79, _7#80, _8#81, _9#82]
      
      == Analyzed Logical Plan ==
      _1: int, _2: int, _3: binary, _4: string, _5: string, _6: string, _7: string, _8: string, _9: string
      Join LeftSemi, (_1#23495 = _2#16732)
      :- LocalRelation [_1#23495, _2#23496, _3#23497, _4#23498, _5#23499, _6#23500, _7#23501, _8#23502, _9#23503]
      +- Project [_2#16732]
         +- Project [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741]
            +- Join LeftSemi, (_1#16731 = _1#48)
               :- LocalRelation [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741]
               +- Project [_1#48]
                  +- Filter NOT (_7#80 = a)
                     +- LocalRelation [_1#48, _2#73, _3#74, _4#75, _5#77, _6#79, _7#80, _8#81, _9#82]
      
      == Optimized Logical Plan ==
      Join LeftSemi, (_1#23495 = _2#16732)
      :- LocalRelation [_1#23495, _2#23496, _3#23497, _4#23498, _5#23499, _6#23500, _7#23501, _8#23502, _9#23503]
      +- Project [_2#16732]
         +- InMemoryRelation [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741], StorageLevel(disk, memory, deserialized, 1 replicas)  
               +- *(1) BroadcastHashJoin [_1#16731], [_1#48], LeftSemi, BuildRight, false
                  :- *(1) LocalTableScan [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741]
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#463]
                     +- InMemoryTableScan [_1#48]
                           +- InMemoryRelation [_1#48, _2#73, _3#74, _4#75, _5#77, _6#79, _7#80, _8#81, _9#82], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- LocalTableScan [_1#48, _2#73, _3#74, _4#75, _5#77, _6#79, _7#80, _8#81, _9#82]
      
      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=false
      +- BroadcastHashJoin [_1#23495], [_2#16732], LeftSemi, BuildRight, false
         :- LocalTableScan [_1#23495, _2#23496, _3#23497, _4#23498, _5#23499, _6#23500, _7#23501, _8#23502, _9#23503]
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#3111]
            +- InMemoryTableScan [_2#16732]
                  +- InMemoryRelation [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) BroadcastHashJoin [_1#16731], [_1#48], LeftSemi, BuildRight, false 
                           :- *(1) LocalTableScan [_1#16731, _2#16732, _3#16733, _4#16734, _5#16735, _6#16736, _7#16737, _8#16738, _9#16739, _10#16740, _11#16741]
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#463]
                              +- InMemoryTableScan [_1#48]
                                    +- InMemoryRelation [_1#48, _2#73, _3#74, _4#75, _5#77, _6#79, _7#80, _8#81, _9#82], StorageLevel(disk, memory, deserialized, 1 replicas)
                                          +- LocalTableScan [_1#48, _2#73, _3#74, _4#75, _5#77, _6#79, _7#80, _8#81, _9#82]
      

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: Lionia Vasilev (leonidvasilev)
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: