SPARK-27213

Unexpected results when filter is used after distinct


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.2, 2.4.0
    • Fix Version/s: None
    • Component/s: PySpark

    Description

      The following code gives unexpected output because the filter is not pushed down by the Catalyst optimizer.

      df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
      df.show(5)
      df.filter("y_n='y'").select('x','y','z').distinct().show()
      df.select('x','y','z').distinct().filter("y_n='y'").show()
      
      Output of df.show(5):

      x    y    z     y_n
      a    123  12.3  n
      a    123  12.3  y
      a    123  12.4  y

      Output of the first statement (filter before distinct):

      x    y    z
      a    123  12.3
      a    123  12.4

      Output of the second statement (distinct before filter):

      x    y    z
      a    123  12.4

      Ideally, the second statement should result in an error, since the column used in the filter is not present in the preceding select. Instead, the Catalyst optimizer takes first() of column y_n within each group of distinct (x, y, z) values and then applies the filter.

      Even if the filter had simply been pushed down below the distinct instead, the result would at least have been accurate.
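
      The effect can be reproduced without distinct() at all. The sketch below is only illustrative (it assumes the same df as above, and the variable name equivalent is made up): it mirrors what the optimized plan does, grouping on the projected columns, keeping an arbitrary y_n per group via first(), and filtering afterwards, which is why only one row survives.

      from pyspark.sql import functions as F

      # Roughly what the optimized plan computes for the second statement:
      # deduplicate on (x, y, z), keep an arbitrary y_n per group via first(),
      # then filter on that value and drop it again.
      equivalent = (df.groupBy('x', 'y', 'z')
                      .agg(F.first('y_n').alias('y_n'))
                      .filter("y_n = 'y'")
                      .select('x', 'y', 'z'))
      equivalent.show()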

      df = spark.createDataFrame([['a','123','12.3','n'],['a','123','12.3','y'],['a','123','12.4','y']],['x','y','z','y_n'])
      df.filter("y_n='y'").select('x','y','z').distinct().explain(True)
      df.select('x','y','z').distinct().filter("y_n='y'").explain(True) 
      
      Output (explain plans for the two statements):

       
      == Parsed Logical Plan ==
      Deduplicate [x#74, y#75, z#76]
      +- AnalysisBarrier
            +- Project [x#74, y#75, z#76]
               +- Filter (y_n#77 = y)
                  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Analyzed Logical Plan ==
      x: string, y: string, z: string
      Deduplicate [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76]
         +- Filter (y_n#77 = y)
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Optimized Logical Plan ==
      Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76]
      +- Project [x#74, y#75, z#76]
         +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Physical Plan ==
      *(2) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
      +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
         +- *(1) HashAggregate(keys=[x#74, y#75, z#76], functions=[], output=[x#74, y#75, z#76])
            +- *(1) Project [x#74, y#75, z#76]
               +- *(1) Filter (isnotnull(y_n#77) && (y_n#77 = y))
                  +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]

      -------------------------------------------------------------------------------------------------------------------

      == Parsed Logical Plan ==
      'Filter ('y_n = y)
      +- AnalysisBarrier
            +- Deduplicate [x#74, y#75, z#76]
               +- Project [x#74, y#75, z#76]
                  +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Analyzed Logical Plan ==
      x: string, y: string, z: string
      Project [x#74, y#75, z#76]
      +- Filter (y_n#77 = y)
         +- Deduplicate [x#74, y#75, z#76]
            +- Project [x#74, y#75, z#76, y_n#77]
               +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Optimized Logical Plan ==
      Project [x#74, y#75, z#76]
      +- Filter (isnotnull(y_n#77) && (y_n#77 = y))
         +- Aggregate [x#74, y#75, z#76], [x#74, y#75, z#76, first(y_n#77, false) AS y_n#77]
            +- LogicalRDD [x#74, y#75, z#76, y_n#77], false

      == Physical Plan ==
      *(3) Project [x#74, y#75, z#76]
      +- *(3) Filter (isnotnull(y_n#77) && (y_n#77 = y))
         +- SortAggregate(key=[x#74, y#75, z#76], functions=[first(y_n#77, false)], output=[x#74, y#75, z#76, y_n#77])
            +- *(2) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(x#74, y#75, z#76, 10)
                  +- SortAggregate(key=[x#74, y#75, z#76], functions=[partial_first(y_n#77, false)], output=[x#74, y#75, z#76, first#95, valueSet#96])
                     +- *(1) Sort [x#74 ASC NULLS FIRST, y#75 ASC NULLS FIRST, z#76 ASC NULLS FIRST], false, 0
                        +- Scan ExistingRDD[x#74,y#75,z#76,y_n#77]
        

      The second query, i.e. df.select('x','y','z').distinct().filter("y_n='y'"), should result in an error rather than giving wrong output.
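
      As a defensive measure (a sketch only, with made-up variable names), the dropped column can be checked for explicitly before filtering, so the mistake fails fast instead of silently returning wrong rows:

      deduped = df.select('x', 'y', 'z').distinct()

      # y_n was projected away, so refuse to filter on it rather than letting
      # the analyzer resolve it through the distinct.
      if 'y_n' not in deduped.columns:
          raise ValueError("column 'y_n' is not present after select/distinct")

      deduped.filter("y_n='y'").show()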

      People

        Assignee: Unassigned
        Reporter: Rinaz Belhaj
        Holden Karau