SPARK-32044

[SS] 2.4 Kafka continuous processing prints misleading initial offsets log


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: 2.4.6
    • Fix Version/s: 2.4.7
    • Component/s: Structured Streaming
    • Labels: None

    Description

      When using Structured Streaming in continuous processing mode, the Spark job correctly picks up the offsets of the last epoch from the checkpoint location after a restart. However, it always prints the following log:

      20/06/12 00:58:09 INFO [stream execution thread for [id = 34e5b909-f9fe-422a-89c0-081251a68693, runId = 0246e19d-aaa1-4a5c-9091-bab1a0578a0a]] kafka010.KafkaContinuousReader: Initial offsets: {"kafka_topic":{"8":51618236,"11":51610655,"2":51622889,"5":51637171,"14":51637346,"13":51627784,"4":51606960,"7":51632475,"1":51636129,"10":51632212,"9":51634107,"3":51611013,"12":51626567,"15":51640774,"6":51637823,"0":51629106}}

      This log is misleading because Spark does not use these offsets as the initial offsets. It also triggers an unnecessary Kafka offset fetch. The cause is the following code in KafkaContinuousReader:

      offset = start.orElse {
        val offsets = initialOffsets match {
          case EarliestOffsetRangeLimit =>
            KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
          case LatestOffsetRangeLimit =>
            KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
          case SpecificOffsetRangeLimit(p) =>
            offsetReader.fetchSpecificOffsets(p, reportDataLoss)
        }
        logInfo(s"Initial offsets: $offsets")
        offsets
      }
      

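       The code inside the orElse block is always executed, even when start has a value. In the Spark 2.4 DataSourceV2 API, start is a java.util.Optional, and Optional.orElse takes an ordinary, eagerly evaluated argument (unlike the by-name argument of Scala's Option.getOrElse). The offsets are therefore fetched and logged before orElse returns the checkpointed offset.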
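       The eager evaluation can be reproduced without Spark. The following is a minimal Scala sketch, assuming (as in the Spark 2.4 ContinuousReader.setStartOffset API) that start is a java.util.Optional. OrElsePitfall and fetchInitialOffsets are hypothetical stand-ins for the reader and the offsetReader.fetch*Offsets calls. The sketch contrasts Optional.orElse, whose argument is evaluated before the call, with Optional.orElseGet, which only invokes its Supplier when the Optional is empty.

```scala
import java.util.Optional
import java.util.function.Supplier

// Hypothetical stand-in for KafkaContinuousReader.setStartOffset.
// fetchCount records how often the fallback "fetch" actually runs.
object OrElsePitfall {
  var fetchCount = 0

  // Hypothetical stand-in for offsetReader.fetch*Offsets plus the
  // "Initial offsets" log line; the printed JSON is made up.
  def fetchInitialOffsets(): String = {
    fetchCount += 1
    println("Initial offsets: {\"kafka_topic\":{\"0\":0}}")
    "initial"
  }

  // Buggy pattern: Optional.orElse(T) takes a plain value, so this block
  // is evaluated (fetch + log) before orElse is called, even if start is present.
  def eager(start: Optional[String]): String =
    start.orElse {
      fetchInitialOffsets()
    }

  // Lazy pattern: Optional.orElseGet takes a Supplier and only calls it
  // when start is empty. An explicit anonymous class keeps this valid on
  // Scala 2.11 as well as 2.12+.
  def lazyFallback(start: Optional[String]): String =
    start.orElseGet(new Supplier[String] {
      override def get(): String = fetchInitialOffsets()
    })
}
```

       With orElseGet (or an explicit isPresent check), the fallback fetch and its log line only happen on a fresh start with no checkpointed offset. This only illustrates one way to avoid the eager evaluation; it is not necessarily how the fix for this issue was implemented.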

       


          People

            Assignee: Zhongwei Zhu (warrenzhu25)
            Reporter: Zhongwei Zhu (warrenzhu25)
            Votes: 0
            Watchers: 4


              Time Tracking

                Original Estimate: 24h
                Remaining Estimate: 24h
                Time Spent: Not Specified