[SPARK-7965] Wrong answers for queries with multiple window specs in the same expression


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.4.0
    • Component/s: SQL

    Description

      I think that Spark SQL may be returning incorrect answers for queries that use multiple window specifications within the same expression. Here's an example that illustrates the problem.

      Say that I have a table with a single numeric column and that I want to compute a cumulative distribution function over this column. Let's call this table nums:

      import sqlContext.implicits._  // needed for toDF on an RDD

      val nums = sc.parallelize(1 to 10).toDF("x")
      nums.registerTempTable("nums")
      

      It's easy to compute a running sum over this column:

      sqlContext.sql("""
          select sum(x) over (rows between unbounded preceding and current row) from nums
      """).collect()
      
      res29: Array[org.apache.spark.sql.Row] = Array([1], [3], [6], [10], [15], [21], [28], [36], [45], [55])
      

      It's also easy to compute a total sum over all rows:

      sqlContext.sql("""
          select sum(x) over (rows between unbounded preceding and unbounded following) from nums
      """).collect()
      
      res34: Array[org.apache.spark.sql.Row] = Array([55], [55], [55], [55], [55], [55], [55], [55], [55], [55])
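
      As a cross-check, a fully unbounded frame should agree with the plain (non-windowed) aggregate:

      // Sanity check: sum over an unbounded frame equals the plain aggregate.
      sqlContext.sql("select sum(x) from nums").collect()
      // expected: Array([55])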
      

      Let's say that I combine these expressions to compute a CDF:

      sqlContext.sql("""
          select (sum(x) over (rows between unbounded preceding and current row))
          /
          (sum(x) over (rows between unbounded preceding and unbounded following)) from nums
      """).collect()
      
      res31: Array[org.apache.spark.sql.Row] = Array([1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0], [1.0])
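
      For comparison, here is a quick plain-Scala sketch (outside Spark) of what the combined expression should produce:

      // Expected: each running sum divided by the grand total (55),
      // rising from 1/55 up to 1.0, not 1.0 on every row.
      val xs = (1 to 10).map(_.toDouble)
      val expected = xs.scanLeft(0.0)(_ + _).tail.map(_ / xs.sum)
      // expected: Vector(0.01818..., 0.05454..., 0.10909..., ..., 1.0)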
      

      This seems wrong. Note that if we combine the running total, the global total, and the combined expression in the same query, we can see that the first two columns are computed correctly but the combined expression is not:

      sqlContext.sql("""
          select
          sum(x) over (rows between unbounded preceding and current row) as running_sum,
          (sum(x) over (rows between unbounded preceding and unbounded following)) as total_sum,
          ((sum(x) over (rows between unbounded preceding and current row))
          /
          (sum(x) over (rows between unbounded preceding and unbounded following))) as combined
          from nums 
      """).collect()
      
      res40: Array[org.apache.spark.sql.Row] = Array([1,55,1.0], [3,55,1.0], [6,55,1.0], [10,55,1.0], [15,55,1.0], [21,55,1.0], [28,55,1.0], [36,55,1.0], [45,55,1.0], [55,55,1.0])
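
      For what it's worth, the same query can presumably be expressed through the DataFrame window API that is new in 1.4 (a sketch, untested here; the frame bounds use Long.MinValue / Long.MaxValue because 1.4 has no named constants for unbounded frames, and an explicit orderBy("x") is added since the WindowSpec builder has to start from partitionBy or orderBy). If the bug is in how multiple window specs within one expression get consolidated, this form may reproduce it as well:

      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.sum

      // Frame 1: unbounded preceding to current row (running sum).
      val running = Window.orderBy("x").rowsBetween(Long.MinValue, 0)
      // Frame 2: unbounded preceding to unbounded following (grand total).
      val total = Window.orderBy("x").rowsBetween(Long.MinValue, Long.MaxValue)

      nums.select(
        sum(nums("x")).over(running).as("running_sum"),
        sum(nums("x")).over(total).as("total_sum"),
        (sum(nums("x")).over(running) / sum(nums("x")).over(total)).as("combined")
      ).collect()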
      

      /cc Yin Huai


          People

            Assignee: Yin Huai (yhuai)
            Reporter: Josh Rosen (joshrosen)
            Votes: 0
            Watchers: 5
