Details

Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.5.0
Fix Version/s: None
Component/s: None
Description
When a streaming dropDuplicates that carries its own watermark is applied downstream of a self-referential stream-stream left join (whose source already has a watermark), the dropDuplicates drops all rows. If the extra watermark is removed from the dropDuplicates step, the query behaves as expected.
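Condensed, the difference between the working and failing pipelines looks like this (names match the full reproduction below; joined stands for the output of the watermarked self-join):

from pyspark.sql.functions import window

# Works: dedupe over a 30-minute window, no extra watermark
deduped = (
    joined.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)

# Fails: the additional watermark on the window column causes every row to be dropped
deduped = (
    joined.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)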
The full script below reproduces the error:
import datetime
import time

from pyspark.sql.functions import window, lit, col, current_timestamp

# Added on 4/30 in response to difficulty reproducing

#
# 1. Generate the test data
#
size = 1000
step_secs = 300
event_offset_secs = 240

# Add some rows that will not left-join
skips = set()
skips.add(3)
skips.add(18)
skips.add(800)

def lit_ts(secs):
    return datetime.datetime.fromtimestamp(secs)

data = []
base_time = time.time()
for x in range(size):
    ts = base_time + (step_secs * x)
    data.append({"event_id": f"one_{x}", "join_id": x, "ts": lit_ts(ts)})
    if x not in skips:
        data.append(
            {
                "event_id": f"two_{x}",
                "join_id": x,
                "ts": lit_ts(ts - event_offset_secs),
            }
        )

# Add duplicates to validate the dropDuplicates
for i in range(len(data)):
    data.append(data[i].copy())

#
# 2. Write the results so we can stream
#
path = "/tmp/bugtest"
df = spark.createDataFrame(data, schema="event_id string, join_id string, ts TIMESTAMP")
df.repartition(1).write.format("delta").mode("overwrite").save(path)
df = spark.read.format("delta").load(path)
df.createOrReplaceTempView("test_data")

#
# 3. Define the test query
#
sql = """
with also_test_data as (
    select * from test_data where event_id like 'two%'
)
select td.*
from test_data td
left join also_test_data on (
    td.join_id = also_test_data.join_id
    and also_test_data.ts >= td.ts - INTERVAL 15 MINUTES
    and also_test_data.ts <= td.ts
)
where td.event_id like 'one%'
-- rows where the left join does not match
and also_test_data.event_id is NULL
"""

#
# 4. Run it non-streaming w/non-deduplicated data to validate results
#
res = spark.sql(sql)
print("Static query")
res.show(truncate=False)

# Static query
# +--------+-------+-------------------------+
# |event_id|join_id|ts                       |
# +--------+-------+-------------------------+
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# |one_3   |3      |2024-04-27 17:36:54.97927|
# |one_18  |18     |2024-04-27 18:51:54.97927|
# |one_800 |800    |2024-04-30 12:01:54.97927|
# +--------+-------+-------------------------+

#
# 5. Run it as a stream with no dropDuplicates
#
def write_stream(res):
    (
        res.writeStream.outputMode("append")
        .trigger(availableNow=True)
        .format("console")
        .start()
        .awaitTermination()
    )

sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
write_stream(res)

# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# | one_800|    800|2024-04-30 12:01:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |   one_3|      3|2024-04-27 17:36:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |  one_18|     18|2024-04-27 18:51:...|
# +--------+-------+--------------------+

#
# 6. Run it as a stream with dropDuplicates, but no extra watermark
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    res.select(window("ts", "30 minutes"), "*")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)

# -------------------------------------------
# Batch: 1
# -------------------------------------------
# +--------+-------+--------------------+
# |event_id|join_id|                  ts|
# +--------+-------+--------------------+
# | one_800|    800|2024-04-30 12:01:...|
# |  one_18|     18|2024-04-27 18:51:...|
# |   one_3|      3|2024-04-27 17:36:...|
# +--------+-------+--------------------+

#
# 7. Run it as a stream with dropDuplicates, using a watermark
# THIS is where we see the error
#
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql)
res = (
    res.select(window("ts", "30 minutes"), "*")
    .withWatermark("window", "30 minutes")
    .dropDuplicates(["event_id"])
    .drop("window")
)
write_stream(res)
print("END")

# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+-------+---+
# |event_id|join_id| ts|
# +--------+-------+---+
# +--------+-------+---+
# END
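A possible workaround, offered only as an untested sketch: Spark 3.5 adds DataFrame.dropDuplicatesWithinWatermark, which deduplicates keys within the existing watermark delay and does not require the extra select(window(...)) / withWatermark step above. I have not verified whether it avoids this bug:

# Untested sketch: reuse the 15-minute source watermark for deduplication
# instead of adding a second watermark on the window column
sdf = spark.readStream.format("delta").load(path).withWatermark("ts", "15 minutes")
sdf.createOrReplaceTempView("test_data")
res = spark.sql(sql).dropDuplicatesWithinWatermark(["event_id"])
write_stream(res)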