Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.5.1, 3.7.0, 3.6.1
-
None
-
None
Description
In some low topic replication scenarios the rack aware assignment in the StickyAssignor fails to balance consumers to its own expectations and throws an IllegalStateException, commonly crashing the application (depending on application implementation). While uncommon the error is deterministic, and so persists until the replication state changes.
We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely would also be reproducible there)
Here is the error and stack from our test case against 3.7.0
We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned java.lang.IllegalStateException: We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
Here is a specific test case from 3.7.0 that fails when passed to StickyAssignor.assign:
Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 66, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 4, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 29, leader = 3, replicas = [3,1,2], isr = [3,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 62, leader = 3, replicas = [3,2,1], isr = [3,2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 95, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 0, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 25, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 58, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 91, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 21, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 54, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 87, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 17, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 50, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 83, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 13, leader = 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = topic_name, partition = 46, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 79, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 9, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 42, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 75, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 5, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 38, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 71, leader = 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 1, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 34, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 67, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 30, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 63, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 26, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 59, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 92, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 22, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 55, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 88, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 18, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 51, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 84, leader = 4, replicas = [4,2,1], isr = [4,2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 14, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 47, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 80, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 10, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 43, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 76, leader = 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = topic_name, partition = 6, leader = 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 39, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 72, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 2, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 35, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 68, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 93, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 31, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 64, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 89, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 27, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 60, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 85, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 23, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 56, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 81, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 19, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 52, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 77, leader = 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = topic_name, partition = 15, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 48, leader = 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 73, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 11, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 44, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 69, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 7, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 40, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 65, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 3, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 36, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 61, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 94, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 32, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = [])], controller = host-1:1 (id: 1 rack: rack-1)) GroupSubscription(subscriptions={Consumer-12=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-8=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), Consumer-10=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-7=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-11=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), Consumer-9=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-0=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-2=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), Consumer-1=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-4=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-3=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-6=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-5=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3)})
A more general reproduction we have performed showed ~1/500 failure rate using a 96 partition topic in a cluster with 4 nodes across 3 hosts on 3 racks being consumed by 13 consumers spread evenly across the 3 racks . Then randomly replicating each partition to 1-3 nodes (all fully in sync). The expectation here would be that any number of replicas >= 1 should be able to be correctly assigned out, even if not rack sympathetic in 100% of scenarios.
An actual assignment from one of the above sampled scenarios from 3.6.1 ended with these assignments:
2 Consumers with 7 partitions
3 Consumers with 6 partitions
8 Consumers with 8 partitions
So, at least on that version, the assignment seems to be over assigning some (3) consumers leaving some (3) consumers under-assigned and failing the minQuota check.