Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-16556

SubscriptionState should not prematurely reset 'pending' partitions

    XMLWordPrintableJSON

Details

    Description

      There appears to be a race condition between invoking the ConsumerRebalanceListener callbacks on reconciliation and initWithCommittedOffsetsIfNeeded in the consumer.
       
      The membership manager adds the newly assigned partitions to the SubscriptionState, but marks them as pendingOnAssignedCallback. Then, after the ConsumerRebalanceListener.onPartitionsAssigned() completes, the membership manager will invoke enablePartitionsAwaitingCallback to set all of those partitions' 'pending' flag to false.
       
      During the main Consumer.poll() loop, AsyncKafkaConsumer may need to call initWithCommittedOffsetsIfNeeded() if the positions aren't already cached. Inside initWithCommittedOffsetsIfNeeded, the consumer calls the subscription's initializingPartitions method to get a set of the partitions for which to fetch their committed offsets. However, SubscriptionState.initializingPartitions() only returns partitions that have the pendingOnAssignedCallback flag set to to false.
       
      The result is:

      • If the MembershipManagerImpl.assignPartitions() future  is completed on the background thread first, the 'pending' flag is set to false. On the application thread, when SubscriptionState.initializingPartitions() is called, it returns the partition, and we fetch its committed offsets
      • If instead the application thread calls SubscriptionState.initializingPartitions() first, the partitions's 'pending' flag is still set to false, and so the partition is omitted from the returned set. The updateFetchPositions() method then continues on and re-initializes the partition's fetch offset to 0.

      Attachments

        Issue Links

          Activity

            People

              kirktrue Kirk True
              kirktrue Kirk True
              Lucas Brutschy Lucas Brutschy
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: