Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-26352

Join reordering should not change the order of output attributes

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.2.0, 2.3.0, 2.4.0
    • 2.3.3, 2.4.1, 3.0.0
    • SQL

    Description

      The optimizer rule org.apache.spark.sql.catalyst.optimizer.ReorderJoin performs join reordering on inner joins. This was introduced from SPARK-12032 in 2015-12.

      After it had reordered the joins, though, it didn't check whether or not the column order (in terms of the output attribute list) is still the same as before. Thus, it's possible to have a mismatch between the reordered column order vs the schema that a DataFrame thinks it has.

      This can be demonstrated with the example:

      spark.sql("create table table_a (x int, y int) using parquet")
      spark.sql("create table table_b (i int, j int) using parquet")
      spark.sql("create table table_c (a int, b int) using parquet")
      val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
      

      here's what the DataFrame thinks:

      scala> df.printSchema
      root
       |-- x: integer (nullable = true)
       |-- y: integer (nullable = true)
       |-- i: integer (nullable = true)
       |-- j: integer (nullable = true)
       |-- a: integer (nullable = true)
       |-- b: integer (nullable = true)
      

      here's what the optimized plan thinks, after join reordering:

      scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
      |-- x: integer
      |-- y: integer
      |-- a: integer
      |-- b: integer
      |-- i: integer
      |-- j: integer
      

      If we exclude the ReorderJoin rule (using Spark 2.4's optimizer rule exclusion feature), it's back to normal:

      scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
      
      scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
      df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
      
      scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
      |-- x: integer
      |-- y: integer
      |-- i: integer
      |-- j: integer
      |-- a: integer
      |-- b: integer
      

      Note that this column ordering problem leads to data corruption, and can manifest itself in various symptoms:

      • Silently corrupting data, if the reordered columns happen to either have matching types or have sufficiently-compatible types (e.g. all fixed length primitive types are considered as "sufficiently compatible" in an UnsafeRow), then only the resulting data is going to be wrong but it might not trigger any alarms immediately. Or
      • Weird Java-level exceptions like java.lang.NegativeArraySizeException, or even SIGSEGVs.

      Attachments

        Issue Links

          Activity

            People

              rednaxelafx Kris Mok
              rednaxelafx Kris Mok
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: