Spark / SPARK-42168

CoGroup with window function returns incorrect result when partition keys differ in order



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.3, 3.1.3, 3.2.3
    • Fix Version/s: 3.2.4
    • Component/s: PySpark, SQL


      The following example returns an incorrect result:

      import pandas as pd
      from pyspark.sql import SparkSession, Window
      from pyspark.sql.functions import col, lit, sum

      spark = SparkSession \
          .builder \
          .getOrCreate()

      ids = 1000
      days = 1000
      parts = 10

      id_df = spark.range(ids)
      day_df = spark.range(days).withColumnRenamed("id", "day")
      # no join condition: every id is paired with every day (cross join)
      id_day_df = id_df.join(day_df)
      left_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("left").alias("side")).repartition(parts).cache()
      right_df = id_day_df.select(col("id").alias("id"), col("day").alias("day"), lit("right").alias("side")).repartition(parts).cache()

      # note the column order is different to the groupBy("id", "day") column order below
      window = Window.partitionBy("day", "id")

      left_grouped_df = left_df.groupBy("id", "day")
      right_grouped_df = right_df.withColumn("day_sum", sum(col("day")).over(window)).groupBy("id", "day")

      def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
          # one output row per (id, day) group, counting the rows each side contributed
          return pd.DataFrame([{
              "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
              "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
              "lefts": len(left.index),
              "rights": len(right.index)
          }])

      df = left_grouped_df.cogroup(right_grouped_df) \
               .applyInPandas(cogroup, schema="id long, day long, lefts integer, rights integer")

      df.explain()
      df.show(5)

      The output is:

      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=false
      +- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L), [id#64L, day#65L, lefts#66, rights#67]
         :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
         :     +- ...
         +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
            +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
               +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
                  +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                        +- ...
      +---+---+-----+------+
      | id|day|lefts|rights|
      +---+---+-----+------+
      |  0|  3|    0|     1|
      |  0|  4|    0|     1|
      |  0| 13|    1|     0|
      |  0| 27|    0|     1|
      |  0| 31|    0|     1|
      +---+---+-----+------+
      only showing top 5 rows
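Each output row comes from one call of the cogroup function. The sketch below calls the same function directly with pandas, outside Spark, on made-up single-row frames for the group (id=0, day=3). It is only an illustration: the frames are hand-built, not what Spark actually passes in. A correctly co-partitioned group yields lefts = 1 and rights = 1, while an empty left frame yields exactly the kind of row seen above (lefts = 0, rights = 1).

```python
import pandas as pd

def cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # same function as in the reproduction above
    return pd.DataFrame([{
        "id": left["id"][0] if not left.empty else (right["id"][0] if not right.empty else None),
        "day": left["day"][0] if not left.empty else (right["day"][0] if not right.empty else None),
        "lefts": len(left.index),
        "rights": len(right.index),
    }])

# hand-built frames for the group (id=0, day=3)
empty = pd.DataFrame(columns=["id", "day", "side"])
left = pd.DataFrame({"id": [0], "day": [3], "side": ["left"]})
right = pd.DataFrame({"id": [0], "day": [3], "side": ["right"]})

print(cogroup(left, right))   # lefts 1, rights 1: both sides met in the same call
print(cogroup(empty, right))  # lefts 0, rights 1: the left row was handled elsewhere
```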

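      The first child is hash-partitioned by (id, day), while the second child is hash-partitioned by (day, id), as required by the window function, and no further Exchange is planned above the Window. Hashing the same key values in a different column order generally yields a different partition id, so the left and right rows of the same (id, day) group end up in different partitions and are passed to cogroup separately. Since every (id, day) pair occurs exactly once on each side, every output row should show lefts = 1 and rights = 1.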
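A minimal sketch of why the key order matters, in plain Python. The partition_id function is a hypothetical stand-in and is NOT Spark's Murmur3 hash: it only shares the structure of folding each key into a running hash that starts from seed 42, so hashing (id, day) and (day, id) usually gives different partition ids. With 200 partitions, as in the plans above, this toy hash splits 9,500 of the 10,000 groups for ids and days 0 to 99; the exact fraction under Spark's real hash will differ. In Spark itself, comparing expr("pmod(hash(id, day), 200)") with expr("pmod(hash(day, id), 200)") on the same DataFrame should show the effect with the real hash function.

```python
NUM_PARTITIONS = 200  # matches hashpartitioning(..., 200) in the plans

def partition_id(keys, num_partitions=NUM_PARTITIONS):
    """Hypothetical order-sensitive hash partitioner (toy mixing, not Murmur3)."""
    h = 42  # seed value; Spark also seeds with 42, but mixes differently
    for k in keys:
        h = (h * 31 + k) & 0xFFFFFFFF  # each key is folded into the running hash
    return h % num_partitions

def misplaced_groups(ids, days):
    """Count (id, day) groups whose left rows and right rows land in different partitions."""
    return sum(
        1
        for i in range(ids)
        for d in range(days)
        # left side is partitioned by (id, day), right side by (day, id)
        if partition_id((i, d)) != partition_id((d, i))
    )

print(partition_id((0, 3)), partition_id((3, 0)))  # 165 55
print(misplaced_groups(100, 100), "of", 100 * 100, "groups are split")
```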

      This has been fixed in Spark 3.3 by #32875, where an additional Exchange hashpartitioning(id, day) is planned above the window side:

      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=false
      +- FlatMapCoGroupsInPandas [id#8L, day#9L], [id#29L, day#30L], cogroup(id#8L, day#9L, side#10, id#29L, day#30L, side#31, day_sum#54L)#63, [id#64L, day#65L, lefts#66, rights#67]
         :- Sort [id#8L ASC NULLS FIRST, day#9L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#8L, day#9L, 200), ENSURE_REQUIREMENTS, [plan_id=117]
         :     +- ...
         +- Sort [id#29L ASC NULLS FIRST, day#30L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#29L, day#30L, 200), ENSURE_REQUIREMENTS, [plan_id=118]
               +- Project [id#29L, day#30L, id#29L, day#30L, side#31, day_sum#54L]
                  +- Window [sum(day#30L) windowspecdefinition(day#30L, id#29L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS day_sum#54L], [day#30L, id#29L]
                     +- Sort [day#30L ASC NULLS FIRST, id#29L ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(day#30L, id#29L, 200), ENSURE_REQUIREMENTS, [plan_id=112]
                           +- ...
      +---+---+-----+------+
      | id|day|lefts|rights|
      +---+---+-----+------+
      |  0| 13|    1|     1|
      |  0| 63|    1|     1|
      |  0| 89|    1|     1|
      |  0| 95|    1|     1|
      |  0| 96|    1|     1|
      +---+---+-----+------+
      only showing top 5 rows

      Only PySpark is affected (FlatMapCoGroupsInPandas), as the Scala API uses CoGroup. FlatMapCoGroupsInPandas reports ClusteredDistribution as its required child distribution, while CoGroup reports HashClusteredDistribution. The EnsureRequirements rule correctly recognizes that HashClusteredDistribution(id, day) is not compatible with hashpartitioning(day, id), but it considers ClusteredDistribution(id, day) compatible with hashpartitioning(day, id). For a single child in isolation this holds, because all rows of one (id, day) group still share a partition; it breaks when two children must also be partitioned identically, which is what a cogroup needs.
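The compatibility rule above can be modeled in a few lines. This is a simplified, hypothetical model: the classes are plain Python stand-ins named after Spark's Scala classes, expressions are compared as column names rather than semantically, and partition counts are ignored. It shows why no Exchange is added on the window side when the requirement is ClusteredDistribution, while one is added for HashClusteredDistribution.

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass(frozen=True)
class HashPartitioning:
    """A child's output partitioning, e.g. hashpartitioning(day, id)."""
    expressions: Tuple[str, ...]

@dataclass(frozen=True)
class ClusteredDistribution:
    """Requirement reported by FlatMapCoGroupsInPandas."""
    clustering: Tuple[str, ...]

@dataclass(frozen=True)
class HashClusteredDistribution:
    """Requirement reported by CoGroup."""
    expressions: Tuple[str, ...]

def satisfies(partitioning: HashPartitioning, required) -> bool:
    """Simplified model of the per-child check described above."""
    if isinstance(required, HashClusteredDistribution):
        # exact match: same columns in the same order
        return partitioning.expressions == required.expressions
    if isinstance(required, ClusteredDistribution):
        # every partitioning column is a clustering column; order is ignored
        return all(e in required.clustering for e in partitioning.expressions)
    raise TypeError(f"unsupported distribution: {required!r}")

def needs_exchange(partitioning: HashPartitioning, required) -> bool:
    return not satisfies(partitioning, required)

window_side = HashPartitioning(("day", "id"))
print(needs_exchange(window_side, ClusteredDistribution(("id", "day"))))      # False: no re-shuffle
print(needs_exchange(window_side, HashClusteredDistribution(("id", "day"))))  # True: re-shuffle by (id, day)
```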




            Assignee: Enrico Minack (enricomi)
            Reporter: Enrico Minack (enricomi)

