Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-16361

Rack aware sticky assignor minQuota violations

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.5.1, 3.7.0, 3.6.1
    • None
    • clients
    • None

    Description

      In some low topic replication scenarios the rack aware assignment in the StickyAssignor fails to balance consumers to its own expectations and throws an IllegalStateException, commonly crashing the application (depending on application implementation). While uncommon the error is deterministic, and so persists until the replication state changes. 

       

      We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely would also be reproducible there) 

       

      Here is the error and stack from our test case against 3.7.0

      We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
      java.lang.IllegalStateException: We haven't reached the expected number of members with more than the minQuota partitions, but no more partitions to be assigned
          at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
          at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
          at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
          at org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) 

      Here is a specific test case from 3.7.0 that fails when passed to StickyAssignor.assign:

      Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 8, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 33, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 66, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 4, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 29, leader = 3, replicas = [3,1,2], isr = [3,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 62, leader = 3, replicas = [3,2,1], isr = [3,2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 95, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 0, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 25, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 58, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 91, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 21, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 54, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 87, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 17, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 50, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 83, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 13, leader = 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = topic_name, partition = 46, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 79, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 9, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 42, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 75, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 5, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 38, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 71, leader = 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 1, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 34, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 67, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 30, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 63, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 26, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 59, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 92, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 22, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 55, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 88, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 18, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 51, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 84, leader = 4, replicas = [4,2,1], isr = [4,2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 14, leader = 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = topic_name, partition = 47, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 80, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 10, leader = 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 43, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 76, leader = 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = topic_name, partition = 6, leader = 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 39, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 72, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 2, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 35, leader = 4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 68, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 93, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 31, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 64, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 89, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 27, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 60, leader = 4, replicas = [4,1,2], isr = [4,1,2], offlineReplicas = []), Partition(topic = topic_name, partition = 85, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 23, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 56, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 81, leader = 4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = topic_name, partition = 19, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 52, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 77, leader = 4, replicas = [4,1], isr = [4,1], offlineReplicas = []), Partition(topic = topic_name, partition = 15, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 48, leader = 3, replicas = [3,2], isr = [3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 73, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 11, leader = 3, replicas = [3,1], isr = [3,1], offlineReplicas = []), Partition(topic = topic_name, partition = 44, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 69, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 7, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = topic_name, partition = 40, leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, partition = 65, leader = 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = topic_name, partition = 3, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 36, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = topic_name, partition = 61, leader = 4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = topic_name, partition = 94, leader = 4, replicas = [4,3], isr = [4,3], offlineReplicas = []), Partition(topic = topic_name, partition = 32, leader = 2, replicas = [2,1], isr = [2,1], offlineReplicas = [])], controller = host-1:1 (id: 1 rack: rack-1))
       
      
      GroupSubscription(subscriptions={Consumer-12=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-8=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), Consumer-10=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-7=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-11=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), Consumer-9=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-0=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-2=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3), Consumer-1=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-4=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-2), Consumer-3=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-6=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-1), Consumer-5=Subscription(topics=[topic_name], userDataSize=4, ownedPartitions=[], groupInstanceId=null, generationId=-1, rackId=rack-3)}) 
      

      A more general reproduction we have performed showed ~1/500 failure rate using a 96 partition topic in a cluster with 4 nodes across 3 hosts on 3 racks being consumed by 13 consumers spread evenly across the 3 racks . Then randomly replicating each partition to 1-3 nodes (all fully in sync). The expectation here would be that any number of replicas >= 1 should be able to be correctly assigned out, even if not rack sympathetic in 100% of scenarios. 

       

      An actual assignment from one of the above sampled scenarios from 3.6.1 ended with these assignments:

      2 Consumers with 7 partitions

      3 Consumers with 6 partitions

      8 Consumers with 8 partitions

       

      So, at least on that version, the assignment seems to be over assigning some (3) consumers leaving some (3) consumers under-assigned and failing the minQuota check. 

       

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            luked Luke D
            Votes:
            2 Vote for this issue
            Watchers:
            7 Start watching this issue

            Dates

              Created:
              Updated: