Spark / SPARK-29549

Union of DataSourceV2 datasources leads to duplicated results

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      Hello!

      I've discovered that when two DataSourceV2 data frames of the exact same shape are unioned in a query that also contains an aggregation, only the results of the first data frame are used. The exchanges of the remaining data frames are replaced by the ReuseExchange rule with the exchange of the first one, so the output contains N copies of the first data frame's results.

       

      I've put together a repository with an example project where this can be reproduced: https://github.com/erizocosmico/spark-union-issue

       

      Basically, doing this:

       

      val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
      val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")
      
      products.union(users)
        .select("name")
        .show(truncate = false, numRows = 50)

       

       

      Where products is:

      +---------+---+
      |name     |id |
      +---------+---+
      |candy    |1  |
      |chocolate|2  |
      |milk     |3  |
      |cinnamon |4  |
      |pizza    |5  |
      |pineapple|6  |
      +---------+---+

      And users is:

      +-------+---+
      |name   |id |
      +-------+---+
      |andy   |1  |
      |alice  |2  |
      |mike   |3  |
      |mariah |4  |
      |eleanor|5  |
      |matthew|6  |
      +-------+---+

       

      Results are incorrect:

      +---------+
      |name     |
      +---------+
      |candy    |
      |pizza    |
      |chocolate|
      |cinnamon |
      |pineapple|
      |milk     |
      |candy    |
      |pizza    |
      |chocolate|
      |cinnamon |
      |pineapple|
      |milk     |
      +---------+
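
To make the discrepancy concrete, the following plain-Scala sketch (no Spark required) compares the names a correct union should return with the names printed above. The object and value names are hypothetical; the data is copied from the tables in this report.

```scala
// Illustration only: names are hypothetical, data is taken from the tables above.
object ExpectedVsObserved {
  val productNames = Seq("candy", "chocolate", "milk", "cinnamon", "pizza", "pineapple")
  val userNames = Seq("andy", "alice", "mike", "mariah", "eleanor", "matthew")

  // Every name is unique within its table, so GROUP BY name yields one row per name
  // and a correct union should contain all twelve names.
  val expected: Set[String] = (productNames ++ userNames).toSet

  // The rows reported in this issue, in the order they were printed.
  val observed = Seq(
    "candy", "pizza", "chocolate", "cinnamon", "pineapple", "milk",
    "candy", "pizza", "chocolate", "cinnamon", "pineapple", "milk")

  // Names that should appear but do not.
  val missing: Set[String] = expected -- observed

  // Names that appear more than once in the observed output.
  val duplicated: Set[String] =
    observed.groupBy(identity).collect { case (name, rows) if rows.size > 1 => name }.toSet
}
```

Here `missing` contains exactly the six user names and `duplicated` contains exactly the six product names, which is the symptom described in this issue.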

       

      This is the plan explained:

       

      == Parsed Logical Plan ==
      'Project [unresolvedalias('name, None)]
      +- AnalysisBarrier
            +- Union
               :- Aggregate [name#0], [name#0, count(1) AS count#8L]
               :  +- SubqueryAlias products
               :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
               +- Aggregate [name#4], [name#4, count(1) AS count#12L]
                  +- SubqueryAlias users
                     +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

      == Analyzed Logical Plan ==
      name: string
      Project [name#0]
      +- Union
         :- Aggregate [name#0], [name#0, count(1) AS count#8L]
         :  +- SubqueryAlias products
         :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
         +- Aggregate [name#4], [name#4, count(1) AS count#12L]
            +- SubqueryAlias users
               +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

      == Optimized Logical Plan ==
      Union
      :- Aggregate [name#0], [name#0]
      :  +- Project [name#0]
      :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
      +- Aggregate [name#4], [name#4]
         +- Project [name#4]
            +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

      == Physical Plan ==
      Union
      :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
      :  +- Exchange hashpartitioning(name#0, 200)
      :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
      :        +- *(1) Project [name#0]
      :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
      +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
         +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
      

       

       

      In the physical plan, the first exchange is reused for the second branch (see the ReusedExchange node), but it should not be, because the two sources are different.

       

      == Physical Plan ==
      Union
      :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
      :  +- Exchange hashpartitioning(name#0, 200)
      :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
      :        +- *(1) Project [name#0]
      :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
      +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
         +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
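
The reuse visible in the physical plan can be illustrated with a small toy model. This is a sketch, not Spark code: the `Scan` class, its `canonical` method, and `rowsAfterReuse` are hypothetical names. It assumes that two scans are treated as equal whenever their column names (with expression IDs stripped) and their reader class match, regardless of the data behind the reader. Whether that is the actual cause in 2.3.x is not confirmed by this report.

```scala
import scala.collection.mutable

// Toy model only; none of these names are Spark APIs.
object ReuseModel {
  // A scan with attributes such as "name#0", a reader class name, and the rows it reads.
  final case class Scan(output: Seq[String], readerClass: String, rows: Seq[String]) {
    // Assumed canonical form: expression IDs are stripped and the rows are ignored.
    def canonical: (Seq[String], String) =
      (output.map(_.takeWhile(_ != '#')), readerClass)
  }

  // For each scan, return the rows actually read once "equal" scans are reused.
  // The first scan seen for a canonical form stands in for all later ones.
  def rowsAfterReuse(scans: Seq[Scan]): Seq[Seq[String]] = {
    val seen = mutable.Map.empty[(Seq[String], String), Scan]
    scans.map(s => seen.getOrElseUpdate(s.canonical, s).rows)
  }

  val products = Scan(
    Seq("name#0", "id#1"), "DefaultReader",
    Seq("candy", "chocolate", "milk", "cinnamon", "pizza", "pineapple"))

  val users = Scan(
    Seq("name#4", "id#5"), "DefaultReader",
    Seq("andy", "alice", "mike", "mariah", "eleanor", "matthew"))
}
```

Under that assumption, `ReuseModel.rowsAfterReuse(Seq(ReuseModel.products, ReuseModel.users))` returns the product rows twice, because both scans share the same canonical form. Including the reader's data (or identity) in the comparison would keep the two scans distinct.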

       

      This seems to be fixed in 2.4.x, but it affects the 2.3.x versions.
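
For anyone who has to stay on an affected 2.3.x version, one thing that might be worth trying is disabling exchange reuse through the internal `spark.sql.exchange.reuse` setting. This is an untested suggestion rather than something verified in this report. It assumes an active `SparkSession` named `spark`, as in the snippet above, and it gives up the performance benefit of exchange reuse for the whole session.

```scala
// Untested workaround sketch: disable the ReuseExchange rule for this session.
// Assumes `spark` is the active SparkSession.
spark.conf.set("spark.sql.exchange.reuse", "false")
```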

          People

            Assignee: Unassigned
            Reporter: Miguel Molina (erizocosmico)