SPARK-48023: Stream-stream join with windowed+watermarked dropDuplicates suppresses all rows


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      When applying a streaming dropDuplicates-with-watermark to a self-referential stream-stream left join with a watermark, the dropDuplicates drops all rows. If the watermark is removed from the dropDuplicates, the query behaves as expected.
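      The difference reduces to a single withWatermark call in the deduplication stage. A minimal sketch of the two variants (the names deduped_ok/deduped_empty are illustrative; res is the joined streaming DataFrame built in step 5 of the full reproduction below, and window comes from pyspark.sql.functions):

      # Step 6 of the repro (works): windowed dropDuplicates, no extra watermark
      deduped_ok = (
          res.select(window("ts", "30 minutes"), "*")
          .dropDuplicates(["event_id"])
          .drop("window")
      )
      # Step 7 of the repro (broken): the same dedup plus a watermark on the
      # window column emits zero rows
      deduped_empty = (
          res.select(window("ts", "30 minutes"), "*")
          .withWatermark("window", "30 minutes")
          .dropDuplicates(["event_id"])
          .drop("window")
      )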

       

      The full reproduction below demonstrates the error:

       

      import datetime
      import time

      from pyspark.sql.functions import window, lit, col, current_timestamp  # Added on 4/30 in response to difficulty reproducing
      # 
      # 1. Generate the test data
      #
      size = 1000
      step_secs = 300
      event_offset_secs = 240
      # join_ids whose "two_" counterpart is omitted, so the left join finds no match
      skips = {3, 18, 800}
      
      def lit_ts(secs):
          # Convert epoch seconds to a naive datetime for the TIMESTAMP column
          return datetime.datetime.fromtimestamp(secs)
      
      data = []
      base_time = time.time()
      for x in range(size):
          ts = base_time + (step_secs * x)
          data.append({"event_id": f"one_{x}", "join_id": str(x), "ts": lit_ts(ts)})
          if x not in skips:
              data.append(
                  {
                      "event_id": f"two_{x}",
                      "join_id": x,
                      "ts": lit_ts(ts - event_offset_secs),
                  }
              )
      
      # Add duplicates to validate the dropDuplicates
      for i in range(len(data)):
         data.append(data[i].copy()) 
      
      #
      # 2. Write the results so we can stream
      #
      path = "/tmp/bugtest"
      df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
      df.repartition(1).write.format("delta").mode("overwrite").save(path)
      
      
      df = spark.read.format("delta").load(path)
      df.createOrReplaceTempView("test_data")
      #
      # 3. Define the test query
      #
      sql = """
      with also_test_data as (
         select * from test_data
         where event_id like 'two%'
      )
      select td.* from test_data td
      left join also_test_data on (
         td.join_id = also_test_data.join_id
         and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
         and also_test_data.ts <= td.ts
      )
      where td.event_id like 'one%'
      -- rows where left-join does not match
      and also_test_data.event_id is NULL 
      """
      #
      # 4. Run it non-streaming, without deduplication, to validate results
      #
      res = spark.sql(sql)
      print("Static query")
      res.show(truncate=False)
      # Static query
      # +--------+-------+-------------------------+
      # |event_id|join_id|ts                       |
      # +--------+-------+-------------------------+
      # |one_3   |3      |2024-04-27 17:36:54.97927|
      # |one_18  |18     |2024-04-27 18:51:54.97927|
      # |one_800 |800    |2024-04-30 12:01:54.97927|
      # |one_3   |3      |2024-04-27 17:36:54.97927|
      # |one_18  |18     |2024-04-27 18:51:54.97927|
      # |one_800 |800    |2024-04-30 12:01:54.97927|
      # +--------+-------+-------------------------+
      
      
      #
      # 5. Run it as a stream with no dropDuplicates
      #
      def write_stream(res):
          # availableNow=True drains everything currently in the source, then stops
          (
              res.writeStream.outputMode("append")
              .trigger(availableNow=True)
              .format("console")
              .start()
              .awaitTermination()
          )
      sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
      sdf.createOrReplaceTempView("test_data")
      res = spark.sql(sql)
      write_stream(res)
      # Batch: 1
      # -------------------------------------------
      # +--------+-------+--------------------+
      # |event_id|join_id|                  ts|
      # +--------+-------+--------------------+
      # | one_800|    800|2024-04-30 12:01:...|
      # | one_800|    800|2024-04-30 12:01:...|
      # |   one_3|      3|2024-04-27 17:36:...|
      # |   one_3|      3|2024-04-27 17:36:...|
      # |  one_18|     18|2024-04-27 18:51:...|
      # |  one_18|     18|2024-04-27 18:51:...|
      # +--------+-------+--------------------+
      
      #
      # 6. Run it as a stream with dropDuplicates, but no extra watermark
      #
      sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
      sdf.createOrReplaceTempView("test_data")
      res = spark.sql(sql)
      res = (
          res.select(window("ts", "30 minutes"), "*")
          .dropDuplicates(["event_id"])
          .drop("window")
      )
      write_stream(res)
      # -------------------------------------------                                 
      # Batch: 1
      # -------------------------------------------
      # +--------+-------+--------------------+
      # |event_id|join_id|                  ts|
      # +--------+-------+--------------------+
      # | one_800|    800|2024-04-30 12:01:...|
      # |  one_18|     18|2024-04-27 18:51:...|
      # |   one_3|      3|2024-04-27 17:36:...|
      # +--------+-------+--------------------+
      
      #
      # 7. Run it as a stream with dropDuplicates, using a watermark
      #     THIS is where we see the error
      #
      sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
      sdf.createOrReplaceTempView("test_data")
      res = spark.sql(sql)
      res = (
          res.select(window("ts", "30 minutes"), "*")
          .withWatermark("window", "30 minutes")
          .dropDuplicates(["event_id"])
          .drop("window")
      )
      write_stream(res) 
      print("END")
      # -------------------------------------------                                  
      # Batch: 0
      # -------------------------------------------
      # +--------+-------+---+
      # |event_id|join_id| ts|
      # +--------+-------+---+
      # +--------+-------+---+
      # END
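      As a point of comparison, Spark 3.5 also provides DataFrame.dropDuplicatesWithinWatermark, which deduplicates within the source's existing event-time watermark instead of layering a second watermark on a window column. A sketch of that variant (not verified against this reproduction, offered only as something to try):

      # Possible workaround sketch (untested against this repro):
      # dropDuplicatesWithinWatermark, added in Spark 3.5, dedups within the
      # existing event-time watermark and avoids the second watermark on the
      # window column entirely.
      sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
      sdf.createOrReplaceTempView("test_data")
      res = spark.sql(sql).dropDuplicatesWithinWatermark(["event_id"])
      write_stream(res)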



          People

            Assignee: Unassigned
            Reporter: Scott Schenkein (schenksj)
