Details

- Type: Improvement
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version: 2.3.1
- Fix Version: None

Environment:
- HDP 3.0.0
- Spark 2.3.1
- Kafka 2.1.1
Description
I use Structured Streaming to consume Kafka data, with the default trigger type and checkpointing enabled. Looking at the logs, I found that Structured Streaming went back to offsets it had already processed. The application log is as follows:
19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed. Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".
I see that when the latest offsets are fetched, they are compared with the committed offsets to decide whether they constitute new data:
private def dataAvailable: Boolean = {
  availableOffsets.exists { case (source, available) =>
    committedOffsets
      .get(source)
      .map(committed => committed != available)
      .getOrElse(true)
  }
}
I think something went wrong on the Kafka side that caused the fetchLatestOffsets method to return the earliest offsets. However, that data had already been successfully processed and committed. Perhaps this could be handled in the dataAvailable method: if availableOffsets has already been committed, the batch would no longer be marked as new data.
I am not sure whether my reasoning is correct; if the query keeps reprocessing from the earliest offsets, Structured Streaming cannot respond in a timely manner. I would be glad to receive any suggestions!
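The suggested guard could be sketched roughly as follows. This is a minimal illustration, not Spark's actual internals: `OffsetCheck`, `hasNewData`, and the `Map`-based offset model are all hypothetical stand-ins for `dataAvailable`, `availableOffsets`, and `committedOffsets`. The idea is to treat a batch as new data only when at least one partition's available offset is strictly ahead of the committed offset, so offsets that have slipped backwards (as in the log above) are not reprocessed.

```scala
// Hypothetical sketch of the proposed dataAvailable check.
// Offsets per source are modeled as Map[partition -> offset].
object OffsetCheck {
  type Offsets = Map[String, Long]

  def hasNewData(committed: Option[Offsets], available: Offsets): Boolean =
    committed match {
      // Nothing committed yet: the first batch is always new data.
      case None => true
      // Otherwise, require some partition to be strictly ahead of what
      // was committed; equal or earlier offsets are not new data.
      case Some(c) =>
        available.exists { case (partition, offset) =>
          offset > c.getOrElse(partition, -1L)
        }
    }
}
```

With this check, an available offset equal to or behind the committed one (e.g. 9053058 after 13978260 was committed) would not trigger a new batch, whereas any strictly larger offset still would.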
Issue Links

- duplicates: SPARK-26267 "Kafka source may reprocess data" (Resolved)