Spark / SPARK-27784

Alias ID reuse can break correctness when substituting foldable expressions

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.1.1, 2.3.2
    • Fix Version/s: None
    • Component/s: SQL

    Description

      This is a correctness bug that occurs when the same set of projection expressions is reused across DataFrames in the DataFrame API.

      Use case: a user was migrating a table to a new version with an additional column ("data" in the repro case). To migrate, the user unions the old table ("t2") with the new table ("t1") and applies a common set of projections to both, so that the union does not hit the column-ordering issue described in SPARK-22335. In some cases, this produces an incorrect query plan:

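      // Run in spark-shell. t1 is the new table (has "data"); t2 is the old table (no "data").
      Seq((4, "a"), (5, "b"), (6, "c")).toDF("id", "data").write.saveAsTable("t1")
      Seq(1, 2, 3).toDF("id").write.saveAsTable("t2")
      
      val dim = Seq(2, 3, 4).toDF("id")
      // The alias for "data" is created once, here, and shared by both selects below.
      val outputCols = Seq($"id", coalesce($"data", lit("_")).as("data"))
      
      val t1 = spark.table("t1").select(outputCols: _*)
      val t2 = spark.table("t2").withColumn("data", lit(null)).select(outputCols: _*)
      
      // The join forces a separate projection above it on the t1 side.
      t1.join(dim, t1("id") === dim("id")).select(t1("id"), t1("data")).union(t2).explain(true)
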
      == Physical Plan ==
      Union
      :- *Project [id#330, _ AS data#237] <------------------------ THE CONSTANT IS FROM THE OTHER SIDE OF THE UNION
      : +- *BroadcastHashJoin [id#330], [id#234], Inner, BuildRight
      : :- *Project [id#330]
      : :  +- *Filter isnotnull(id#330)
      : :     +- *FileScan parquet t1[id#330] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://.../t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>
      : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      :    +- LocalTableScan [id#234]
      +- *Project [id#340, _ AS data#237]
         +- *FileScan parquet t2[id#340] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://.../t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>

      The problem happens because "outputCols" contains an alias. The expression ID for that alias (data#237 in the plan above) is assigned when the projection Seq is created, so the same ID is reused on both sides of the union.
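
      One way to observe the shared ID is to print the expression IDs of each DataFrame's analyzed output. The following is a minimal sketch for spark-shell, assuming the t1 and t2 tables from the repro exist; outputIds, sharedCols and freshCols are names introduced here for illustration. It also shows a possible way to avoid sharing, building the columns with a def so that each call creates a new alias. That this avoids the incorrect plan is an assumption based on the explanation above, not something confirmed in this issue.

```scala
// Sketch for spark-shell; assumes the t1 and t2 tables from the repro exist.
// outputIds, sharedCols and freshCols are illustrative names, not Spark APIs.
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._

// Render each output attribute of the analyzed plan as name#exprId.
def outputIds(df: DataFrame): Seq[String] =
  df.queryExecution.analyzed.output.map(a => s"${a.name}#${a.exprId.id}")

// Shared val: the alias is built once, then reused by both selects.
val sharedCols: Seq[Column] = Seq($"id", coalesce($"data", lit("_")).as("data"))
val s1 = spark.table("t1").select(sharedCols: _*)
val s2 = spark.table("t2").withColumn("data", lit(null)).select(sharedCols: _*)
println(outputIds(s1))
println(outputIds(s2)) // on affected versions, "data" is expected to show the same ID as in s1

// def instead of val: the Seq, and therefore the alias, is rebuilt on every call.
def freshCols: Seq[Column] = Seq($"id", coalesce($"data", lit("_")).as("data"))
val f1 = spark.table("t1").select(freshCols: _*)
val f2 = spark.table("t2").withColumn("data", lit(null)).select(freshCols: _*)
println(outputIds(f1))
println(outputIds(f2))
```

      Comparing the two pairs of printed lines shows whether the "data" attribute carries one ID or two.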

      When FoldablePropagation runs, it identifies that "data" on the t2 side of the union is a foldable expression and replaces all references to it. Because the t1 side refers to "data" through the same ID, its references are replaced as well.
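
      The effect of replacing references purely by ID can be sketched with a toy model. This is not Spark's Catalyst code: every type and function below (Expr, Lit, Ref, Named, Branch, propagate) is hypothetical, the ID 331 is made up, and the model only mimics the behaviour described above, where a constant found under an ID is substituted wherever that ID is referenced.

```scala
// Toy model only; these types are hypothetical and are not Spark's Catalyst classes.
object FoldableIdSketch {
  sealed trait Expr
  final case class Lit(value: String) extends Expr                          // a constant
  final case class Ref(name: String, id: Long) extends Expr                 // reference to an ID
  final case class Named(child: Expr, name: String, id: Long) extends Expr  // alias with an ID

  // One side of the union: a lower projection feeding an upper one.
  final case class Branch(lower: Seq[Expr], upper: Seq[Expr])

  // Rewrite upper references using constants found anywhere in the plan,
  // matching on ID alone and ignoring which branch the constant came from.
  def propagate(branches: Seq[Branch]): Seq[Branch] = {
    val known: Map[Long, Lit] =
      branches.flatMap(b => b.lower ++ b.upper).collect {
        case Named(l: Lit, _, id) => id -> l
      }.toMap
    def rewrite(e: Expr): Expr = e match {
      case Ref(name, id) if known.contains(id) => Named(known(id), name, id)
      case other => other
    }
    branches.map(b => b.copy(upper = b.upper.map(rewrite)))
  }

  // t2 side: "data" has folded to the constant "_" under ID 237.
  val t2 = Branch(lower = Seq(Named(Lit("_"), "data", 237L)), upper = Seq(Ref("data", 237L)))
  // t1 side sharing ID 237: "data" is a real column, referenced by ID above the join.
  val t1 = Branch(lower = Seq(Named(Ref("data", 331L), "data", 237L)), upper = Seq(Ref("data", 237L)))
  // t1 side with its own ID 238, for comparison.
  val t1Fresh = Branch(lower = Seq(Named(Ref("data", 331L), "data", 238L)), upper = Seq(Ref("data", 238L)))

  def main(args: Array[String]): Unit = {
    println(propagate(Seq(t1, t2)).head.upper)      // t1's reference is replaced by t2's constant
    println(propagate(Seq(t1Fresh, t2)).head.upper) // t1's reference is left alone
  }
}
```

      In the model, the shared ID is the only thing connecting the two branches, which is why giving each side its own alias ID removes the cross-branch substitution.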

      The join to a dimension table is necessary to reproduce the problem because it forces a Project on top of the join that refers to data#237 through an AttributeReference. Without the join, the projections are collapsed and the resulting projection contains an Alias, which FoldablePropagation does not rewrite.

          People

            Assignee: Unassigned
            Reporter: Ryan Blue (rdblue)