Description
Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
When attempting to shutdown a streams application and leave the associated consumer group, the supplied `leaveGroup` option seems to have no effect. Sample code:
CloseOptions options = new CloseOptions().leaveGroup(true); stream.close(options);
The expected behavior here is that the group member would shutdown and leave the group, immediately triggering a consumer group rebalance. In practice, the rebalance happens after the appropriate timeout configuration has expired.
I understand the default behavior in that there is an assumption that any associated StateStores would be persisted to disk and that in the case of a rolling restart/deployment, the rebalance delay may be preferable. However, in our application we are using in-memory state stores and standby replicas. There is no benefit in delaying the rebalance in this setup and we are in need of a way to force a member to leave the group when shutting down.
The workaround we found is to set an undocumented internal StreamConfig to enforce this behavior:
props.put("internal.leave.group.on.close", true);
To state the obvious, this is less than ideal.
Additional configuration details:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); props.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094"); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
Attachments
Issue Links
- links to