  1. Kafka
  2. KAFKA-4845

KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 0.10.1.0, 0.10.1.1, 0.10.2.0
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      When integrating with Spark Streaming, the Kafka consumer cannot get the latest offsets for any partition except one. The code snippet is as follows:

      protected def latestOffsets(): Map[TopicPartition, Long] = {
        val c = consumer
        c.poll(0)
        val parts = c.assignment().asScala
        val newPartitions = parts.diff(currentOffsets.keySet)
        currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
        c.pause(newPartitions.asJava)
        c.seekToEnd(currentOffsets.keySet.asJava)
        parts.map(tp => tp -> c.position(tp)).toMap
      }
      

      Calling consumer.position(topicPartition) invokes updateFetchPositions(Collections.singleton(partition)), so positions are updated one partition at a time.

      The bug lies in updateFetchPositions(Set<TopicPartition> partitions):

              fetcher.resetOffsetsIfNeeded(partitions);    // resets the current partition to its latest offset
              if (!subscriptions.hasAllFetchPositions()) {  // true: seekToEnd was called for all partitions, so the others still have no position
                  coordinator.refreshCommittedOffsetsIfNeeded();
                  fetcher.updateFetchPositions(partitions);  // overwrites the current partition's position with its committed offset
              }
      

      As a result, only one partition (the last one in the assignment) gets the latest offset, while all the others get the committed offset.
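
      To make the sequence easier to follow, here is a minimal sketch in plain Java that models the behavior described above. It is not the Kafka client code: the class SeekToEndModel, its methods, the partition names, and the offsets are all hypothetical, and it assumes (as this report describes) that the committed-offset branch always overwrites the position that was just reset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the sequence described in this report; NOT the Kafka client code.
class SeekToEndModel {
    // A null position stands for "awaiting reset", as after seekToEnd.
    private final Map<String, Long> position = new LinkedHashMap<>();
    private final Map<String, Long> committed = new LinkedHashMap<>();
    private final Map<String, Long> logEnd = new LinkedHashMap<>();

    // Register a partition with a made-up committed offset and log end offset.
    void assign(String tp, long committedOffset, long endOffset) {
        position.put(tp, committedOffset);
        committed.put(tp, committedOffset);
        logEnd.put(tp, endOffset);
    }

    // Models seekToEnd on every partition: all positions become unknown.
    void seekToEnd() {
        for (String tp : logEnd.keySet()) {
            position.put(tp, null);
        }
    }

    // True only when no partition is still awaiting a reset.
    boolean hasAllFetchPositions() {
        return !position.containsValue(null);
    }

    // Models position(tp) -> updateFetchPositions(singleton(tp)).
    long positionOf(String tp) {
        if (position.get(tp) == null) {
            // Step 1: reset this one partition to its latest offset.
            position.put(tp, logEnd.get(tp));
            // Step 2: the check covers ALL partitions, not just this one.
            if (!hasAllFetchPositions()) {
                // Assumed behavior: fall back to the committed offset.
                position.put(tp, committed.get(tp));
            }
        }
        return position.get(tp);
    }

    // Runs the scenario: seekToEnd on all partitions, then position() on each.
    static Map<String, Long> simulate() {
        SeekToEndModel c = new SeekToEndModel();
        c.assign("topic-0", 10L, 100L);
        c.assign("topic-1", 20L, 200L);
        c.assign("topic-2", 30L, 300L);
        c.seekToEnd();
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : c.logEnd.keySet()) {
            result.put(tp, c.positionOf(tp));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {topic-0=10, topic-1=20, topic-2=300}
        System.out.println(simulate());
    }
}
```

      In this model, topic-0 and topic-1 end up at their committed offsets because other partitions were still awaiting a reset when they were queried. Only topic-2, queried last, keeps its latest offset.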


            People

              Assignee: Vahid Hashemian (vahid)
              Reporter: Dan (DanC)
