Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-16514

Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • 3.7.0
    • None
    • streams
    • None

    Description

      Working with Kafka Streams 3.7.0, but may affect earlier versions as well.

      When attempting to shutdown a streams application and leave the associated consumer group, the supplied `leaveGroup` option seems to have no effect. Sample code:

      CloseOptions options = new CloseOptions().leaveGroup(true);
      stream.close(options);

      The expected behavior here is that the group member would shutdown and leave the group, immediately triggering a consumer group rebalance. In practice, the rebalance happens after the appropriate timeout configuration has expired.

      I understand the default behavior in that there is an assumption that any associated StateStores would be persisted to disk and that in the case of a rolling restart/deployment, the rebalance delay may be preferable. However, in our application we are using in-memory state stores and standby replicas. There is no benefit in delaying the rebalance in this setup and we are in need of a way to force a member to leave the group when shutting down.

      The workaround we found is to set an undocumented internal StreamConfig to enforce this behavior:

      props.put("internal.leave.group.on.close", true);
      

      To state the obvious, this is less than ideal.

      Additional configuration details:

      Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
      props.put(
              StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
              "localhost:9092,localhost:9093,localhost:9094");
      props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
      props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
      props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
      props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              sal.sorrentino Sal Sorrentino
              Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

                Created:
                Updated: