Details
Type: Question
Status: Closed
Priority: Trivial
Resolution: Invalid
Affects Version/s: 2.4.3
Fix Version/s: None
Component/s: None
Environment:
Spark 2.4.3 (Scala 2.11.12)
Delta: 0.5.0
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
OS: Ubuntu 18.04 LTS
Description
Hi,
I have an application that performs arbitrary stateful processing in Structured Streaming and uses Delta merge to update a Delta table, and I ran into strange behaviour:
1. I've noticed that log statements inside my MapGroupsWithStateFunction/FlatMapGroupsWithStateFunction implementation are output twice.
2. While looking for the root cause, I also found that the number of state rows reported by Spark doubles.
I thought there might be a bug in my code, so I went back to JavaStructuredSessionization from the Apache Spark examples and changed it slightly. I still got the same result.
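For reference, this is roughly how the state function is wired up in my modified example (a minimal sketch: Event, SessionInfo and SessionUpdate are simplified stand-ins for the beans in the Spark example, and a SessionUpdate(String, int) constructor is assumed here for brevity). The println is the log line that appears twice per micro-batch:

MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate> stateUpdateFunc =
    (sessionId, eventsInGroup, state) -> {
        // This is the log line that shows up twice per micro-batch
        // when the merge runs without persist.
        System.out.println("Updating state for session: " + sessionId);
        SessionInfo info = state.exists() ? state.get() : new SessionInfo();
        while (eventsInGroup.hasNext()) {
            eventsInGroup.next();
            info.setNumEvents(info.getNumEvents() + 1);
        }
        state.update(info);
        return new SessionUpdate(sessionId, info.getNumEvents());
    };

Dataset<SessionUpdate> sessionUpdates = events
    .groupByKey((MapFunction<Event, String>) Event::getSessionId, Encoders.STRING())
    .mapGroupsWithState(
        stateUpdateFunc,
        Encoders.bean(SessionInfo.class),
        Encoders.bean(SessionUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());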
The problem only occurs if I do not call batchDf.persist() inside foreachBatch. The following query reproduces it:
StreamingQuery query = sessionUpdates
    .writeStream()
    .outputMode("update")
    .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
        // The following doubles the number of Spark state rows and causes
        // MapGroupsWithStateFunction to log twice without persisting.
        deltaTable.as("sessions")
            .merge(batchDf.toDF().as("updates"), mergeExpr)
            .whenNotMatched().insertAll()
            .whenMatched().updateAll()
            .execute();
    })
    .trigger(Trigger.ProcessingTime(10000))
    .queryName("ACME")
    .start();
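For comparison, here is the variant with persist that does not show the doubling in my tests (a minimal sketch: deltaTable and mergeExpr are the same as above, and the try/finally with unpersist is just how I release the cached batch):

StreamingQuery query = sessionUpdates
    .writeStream()
    .outputMode("update")
    .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
        // Materialize the micro-batch once so the merge does not recompute it.
        Dataset<Row> updates = batchDf.toDF().persist();
        try {
            deltaTable.as("sessions")
                .merge(updates.as("updates"), mergeExpr)
                .whenNotMatched().insertAll()
                .whenMatched().updateAll()
                .execute();
        } finally {
            // Release the cached batch after the merge completes.
            updates.unpersist();
        }
    })
    .trigger(Trigger.ProcessingTime(10000))
    .queryName("ACME")
    .start();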
According to https://docs.databricks.com/_static/notebooks/merge-in-streaming.html and the Apache Spark docs, there seems to be no need to persist the Dataset/DataFrame inside foreachBatch.
Sample code from the Apache Spark examples with Delta: JavaStructuredSessionization with Delta merge
I'd appreciate your clarification.