SPARK-44378

Jobs that have a join and .rdd calls get executed 2x when AQE is enabled.


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.1.2
    • Fix Version/s: None
    • Component/s: Spark Submit

    Description

      We have a few Spark Scala jobs running in production. Most of them use Datasets and DataFrames. A small piece of our custom library code makes .rdd calls, for example to check whether a DataFrame is empty: df.rdd.getNumPartitions == 0

      When I enable AQE for these jobs, the .rdd call is turned into a separate job of its own and the entire DAG is executed twice, so the job takes twice as long. This does not happen when AQE is disabled. Why does this happen, and what is the best way to fix it?

       

      Sample code to reproduce the issue:

       

       

      import org.apache.spark.sql._

      // Assumes `spark` is an active SparkSession (e.g. in spark-shell)
      // and `log` is the job's logger.
      case class Record(
        id: Int,
        name: String
      )

      val partCount = 4
      val input1 = (0 until 100).map(part => Record(part, "a"))
      val input2 = (100 until 110).map(part => Record(part, "c"))

      implicit val enc: Encoder[Record] = Encoders.product[Record]

      val ds1 = spark.createDataset(
        spark.sparkContext.parallelize(input1, partCount)
      )
      val ds2 = spark.createDataset(
        spark.sparkContext.parallelize(input2, partCount)
      )

      // The join followed by count() runs the DAG once.
      val ds3 = ds1.join(ds2, Seq("id"))
      val l = ds3.count()

      // With AQE enabled, this .rdd call shows up as a separate job
      // that executes the DAG a second time.
      val incomingPartitions = ds3.rdd.getNumPartitions
      log.info(s"Num partitions ${incomingPartitions}")
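
      To compare the two runs, the sample can be executed once with AQE on and once with it off. A minimal sketch, assuming AQE is toggled through the standard spark.sql.adaptive.enabled setting and that spark is the active SparkSession (the same setting can also be passed to spark-submit via --conf):

      ```scala
      // Run 1: AQE enabled. Execute the sample above, then check the jobs in the Spark UI.
      spark.conf.set("spark.sql.adaptive.enabled", "true")

      // Run 2: AQE disabled. Execute the sample again and compare the job list.
      spark.conf.set("spark.sql.adaptive.enabled", "false")
      ```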
        

       

      Spark UI for the same job with AQE:

       

      Spark UI for the same job without AQE:

       

       

      This causes an unexpected performance regression when we try to enable AQE for our production jobs. We use Spark 3.1 in production, but I see the same behavior in Spark 3.2 from the console as well.

       


          People

            Assignee: Unassigned
            Reporter: Priyanka Raju (priyankar)
