Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-16312

ConsumerRebalanceListener.onPartitionsAssigned() should be called after joining, even if empty

    XMLWordPrintableJSON

Details

    Description

      There is a difference between the LegacyKafkaConsumer and AsyncKafkaConsumer respecting when the ConsumerRebalanceListener.onPartitionsAssigned() method is invoked.

      For example, with onPartitionsAssigned():

      • LegacyKafkaConsumer: the listener method is invoked when the consumer joins the group, even if that consumer was not assigned any partitions. In this case it's passed an empty list.
      • AsyncKafkaConsumer: the listener method is only invoked after the consumer joins the group iff it has assigned partitions

      This difference is affecting the system tests. The system tests use a Java class named VerifiableConsumer which uses a ConsumerRebalanceListener that logs when the callbacks are invoked. The system tests then read from that log to determine when the callbacks are invoked. This coordination is used by the system tests to determine the lifecycle and status of the consumers.

      The system tests rely heavily on the listener behavior of the LegacyKafkaConsumer. It invokes the onPartitionsAssigned() method when the consumer joins the group, and the system tests use that to determine when the consumer is actively a member of the group. This validation of membership is used as an assertion throughout the consumer-related tests.

      In the system test I'm executing from consumer_test.py, there's a test that creates three consumers to read from a single topic with a single partition. It's a bit of an oddball test, but it demonstrates the issue.

      Here are the logs pulled from the test run when executed using the LegacyKafkaConsumer:

      Node 1:

      [2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)

      Node 2:

      [2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)

      Node 3:

      [2024-02-15 00:43:52,399] INFO Adding newly assigned partitions:  (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)

      Here are the logs when executing the same test using the AsyncKafkaConsumer:

      Node 1:

      [2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)

      Node 2:

      n/a

      Node 3:

      n/a

      As a result of this change, the existing system tests do not work with the new consumer. However, even more importantly, this change in behavior may adversely affect existing users.

      Attachments

        Issue Links

          Activity

            People

              lucasbru Lucas Brutschy
              kirktrue Kirk True
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: