Details
- Bug
- Status: Resolved
- Major
- Resolution: Incomplete
- 2.4.0
- None
Description
Spark Structured Streaming with Kafka: ListOffsetRequest sent with earliest (-2) for an existing partition:
The issue is that Spark has been sending a ListOffsetRequest for an existing partition with "earliest" (-2) instead of "latest" (-1). This started happening suddenly, a couple of hours after the application was started.
As per the documentation, Spark should request earliest only for new partitions, but here a request with earliest is being sent for one particular existing partition. As a result, offsets are being reset to the LSO (stable offset) and duplicate data is being consumed.
Below are the consumer logs in DEBUG mode. The duplicates came from partition 20, and at that point the 2 log entries below differ slightly from the normal logs. There is also no error message in the logs. This behaviour looks peculiar.
29/03/20 18:27:53 DEBUG internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-bca38026-e65e-4e24-8d86-aaa-driver-0] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={Hid1328.gnpp-raw-20=-2}, isolationLevel=READ_UNCOMMITTED) to broker famescpolyfi5.teliacompany.net:9095 (id: 5 rack: null)
29/03/20 18:27:53 DEBUG internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-bca38026-e65e-4e24-8d86-9daf0d34b406--1757490130-driver-0] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={Hid1328.gnpp-raw-5=-1, Hid1328.gnpp-raw-20=-1, Hid1328.gnpp-raw-25=-1, Hid1328.gnpp-raw-10=-1, Hid1328.gnpp-raw-15=-1, Hid1328.gnpp-raw-0=-1}, isolationLevel=READ_UNCOMMITTED) to broker famescpolyfi5.teliacompany.net:9095 (id: 5 rack: null)
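To spot this pattern in a larger DEBUG log, a small script along the following lines may help. It is only a sketch: it assumes the `partitionTimestamps={...}` layout shown in the log entries above, and the function names `parse_partition_timestamps` and `earliest_requests` are made up for illustration, not part of Spark or Kafka.

```python
import re

# Sentinel timestamps as described in this report: -1 is latest, -2 is earliest.
SENTINELS = {-1: "latest", -2: "earliest"}

# Assumption: the DEBUG line contains "partitionTimestamps={name=ts, name=ts, ...}".
_PARTITIONS_RE = re.compile(r"partitionTimestamps=\{([^}]*)\}")


def parse_partition_timestamps(log_line):
    """Return {topic-partition: timestamp} parsed from one ListOffsetRequest line."""
    match = _PARTITIONS_RE.search(log_line)
    if match is None:
        return {}
    result = {}
    for entry in match.group(1).split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last "=" so the partition name stays intact.
        name, _, value = entry.rpartition("=")
        result[name] = int(value)
    return result


def earliest_requests(log_lines):
    """Yield every topic-partition that was requested with the earliest (-2) sentinel."""
    for line in log_lines:
        for name, timestamp in parse_partition_timestamps(line).items():
            if timestamp == -2:
                yield name
```

Running `earliest_requests` over the two log entries above would flag only `Hid1328.gnpp-raw-20`, the partition from which the duplicates came.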
Issue Links
- is duplicated by
- SPARK-29709 structured streaming The offset in the checkpoint is suddenly reset to the earliest
- Resolved