Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28641

MicroBatchExecution committed offsets greater than available offsets

    XMLWordPrintableJSON

Details

    Description

      I use structure-streaming to consume Kafka data, Trigger Type is default and checkpoint is enabled, but looking at the log, I find the structure-streaming data before processing, the application log is as follows:
       

      19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some(\{"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = \{"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
      19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
      19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed.^
      Some data may have been lost because they are not available in Kafka any more; either the
       data was aged out by Kafka or the topic may have been deleted before all the data in the
       topic was processed. If you want your streaming query to fail on such cases, set the source
       option "failOnDataLoss" to "true".
      

       

      I see that when you get the latestOffsets they are compared with the committedOffsets to see if they are newData.

       

      private def dataAvailable: Boolean = {
        availableOffsets.exists {
          case (source, available) =>
            committedOffsets.get(source).map(committed => committed != available).getOrElse(true)
         }
       }
      

       

      I think it is Kafka appeared what problem, cause the fetchLatestOffsets methods returned earliestOffsets. However, the data was successfully processed and committed. Whether or not it can be determined in the dataAvailable method, if availableOffsets has been committed, the batch will no longer be marked as newData.

      I don't know what I think is correct, if continue processing earliestOffsets, then the structured-streaming can't timely corresponding, I'm glad to receive any suggestion!

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              MariaCarrie MariaCarrie
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - 48h
                  48h
                  Remaining:
                  Remaining Estimate - 48h
                  48h
                  Logged:
                  Time Spent - Not Specified
                  Not Specified