Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-14048 The Next Generation of the Consumer Rebalance Protocol
  3. KAFKA-16194

KafkaConsumer.groupMetadata() should be correct when first records are returned

    XMLWordPrintableJSON

Details

    Description

      The following code returns records before the group metadata is updated. This fails the first transactions ever run by the Producer/Consumer.

       

      Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
      Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
      
      txnProducer.initTransactions();
      System.out.println("Init transactions called");
      
      try {
          txnProducer.beginTransaction();
          System.out.println("Begin transactions called");
      
          consumer.subscribe(Collections.singletonList("input"));
          System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
      
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
          System.out.println("Returned " + records.count() + " records.");
      
          // Process and send txn messages.
          for (ConsumerRecord<String, String> processedRecord : records) {
              txnProducer.send(new ProducerRecord<>("output", processedRecord.key(), "Processed: " + processedRecord.value()));
          }
      
          ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
          System.out.println("Group metadata inside test" + groupMetadata);
      
          Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
          for (ConsumerRecord<String, String> record : records) {
              offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                  new OffsetAndMetadata(record.offset() + 1));
          }
          System.out.println("Offsets to commit" + offsetsToCommit);
          // Send offsets to transaction with ConsumerGroupMetadata.
          txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
          System.out.println("Send offsets to transaction done");
      
          // Commit the transaction.
          txnProducer.commitTransaction();
          System.out.println("Commit transaction done");
      } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
          e.printStackTrace();
          txnProducer.close();
      } catch (KafkaException e) {
          e.printStackTrace();
          txnProducer.abortTransaction();
      } finally {
          txnProducer.close();
          consumer.close();
      } 

      The issue seems to be that while it waits in `poll`, the event to update the group metadata is not processed.

      Attachments

        Issue Links

          Activity

            People

              cadonna Bruno Cadonna
              dajac David Jacot
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: