SAMZA-3

BrokerProxy deadlocks if messages aren't polled from all streams


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: kafka
    • Labels: None

    Description

      Suppose a KafkaSystemConsumer is created and used like this:

      consumer.register(sp1, 0) // register two SystemStreamPartitions at offset 0
      consumer.register(sp2, 0)
      consumer.start
      while (true) {
        consumer.poll(sp2, 0)   // poll only sp2; sp1's queue is never drained
      }
      

      This code will eventually deadlock (assuming sp1 has messages) if sp1 and sp2 are both on the same broker.

      The current implementation of BrokerProxy/KafkaSystemConsumer puts messages into a BlockingEnvelopeMap. The BlockingEnvelopeMap has a per-SystemStreamPartition max queue size (defaults to 1000, I believe). This means that, if a SystemStreamPartition is registered with the KafkaSystemConsumer but messages are not read off its queue for some reason, the BrokerProxy/KafkaSystemConsumer will eventually block on BlockingEnvelopeMap.add. This prevents the BrokerProxy from fetching any more messages from ANY topic/partition on the broker, so code trying to read messages from another SystemStreamPartition will never receive any new messages.
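
      To make the failure mode concrete, here is a minimal, self-contained sketch of the queueing behavior (the names and types are illustrative stand-ins, not Samza's actual classes):

      import java.util.concurrent.ArrayBlockingQueue

      object BoundedQueueSketch {
        // One bounded queue per registered partition (default capacity 1000),
        // mirroring BlockingEnvelopeMap's per-SystemStreamPartition queues.
        val queues = Map(
          "sp1" -> new ArrayBlockingQueue[String](1000),
          "sp2" -> new ArrayBlockingQueue[String](1000))

        // The fetch thread adds every fetched message with a blocking put. Once
        // sp1 holds 1000 unread messages, put blocks forever, and no further
        // fetches are issued for ANY partition served by the same BrokerProxy.
        def add(partition: String, message: String): Unit =
          queues(partition).put(message)
      }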

      This is not currently a problem because Samza reads messages in only two ways:

      1) During state store restore.
      2) During the process loop (feeding messages to StreamTask.process).

      The current SamzaContainer implementation uses a new SystemConsumer for each SystemStreamPartition when it restores (#1), and each such consumer registers ONLY one SystemStreamPartition, so no deadlock is possible there. The current DefaultChooser round-robins between streams, so every stream with available messages is always polled; no starvation is currently possible, and therefore no deadlock.

      Nevertheless, this should be fixed. For one thing, if we changed the DefaultChooser's behavior, this problem would surface.

      The simplest solution would be to stop fetching messages in the BrokerProxy for queues that are full. An alternative would be to stop fetching messages for any queue that already has messages in it, regardless of whether it's "full".

      One nuance to the stop-fetching-on-queue-full solution is that FetchRequest takes a fetchSize, which is in bytes. This means a single FetchRequest might return more messages than would fit into the BlockingEnvelopeMap queue. We could drop these excess messages and re-fetch them later.

      I think the best solution is just:

      1) Stop fetching messages for any queue that's not empty.
      2) Make KafkaSystemConsumer override newBlockingQueue with an unbounded LinkedBlockingQueue.
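
      Here is a self-contained sketch of item 2, using stand-in types rather than Samza's real signatures:

      import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

      // Stand-in for BlockingEnvelopeMap: builds one queue per partition
      // through an overridable factory method.
      abstract class EnvelopeMapSketch[T] {
        protected def newBlockingQueue(): BlockingQueue[T] =
          new LinkedBlockingQueue[T](1000) // default: bounded per-partition queue
      }

      // Item 2: the Kafka consumer overrides the factory to return an
      // unbounded queue, so a blocking add can never stall the fetch thread.
      class KafkaConsumerSketch[T] extends EnvelopeMapSketch[T] {
        override protected def newBlockingQueue(): BlockingQueue[T] =
          new LinkedBlockingQueue[T]() // no capacity bound
      }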

      The new memory semantics for the KafkaSystemConsumer would be that each unbounded LinkedBlockingQueue holds at most N elements, where N is the maximum number of elements that can theoretically be returned by a single FetchRequest for a given TopicAndPartition. For example, if a KafkaSystemConsumer's fetchSize were 1 megabyte and messages could be as small as 1 byte, the theoretical maximum for any single queue would be 1 million messages. Once a queue is drained to zero, a new fetch is issued.
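
      Item 1 could then look something like the following sketch, where each fetch round only asks the broker for partitions whose queues have been fully drained (again, illustrative names only):

      import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

      object FetchGatingSketch {
        // One unbounded queue per TopicAndPartition (stand-in keys).
        val queues: Map[String, BlockingQueue[Array[Byte]]] = Map(
          "tp1" -> new LinkedBlockingQueue[Array[Byte]](),
          "tp2" -> new LinkedBlockingQueue[Array[Byte]]())

        // Only partitions whose queue is empty are included in the next
        // FetchRequest, so each partition buffers at most one fetch's worth
        // of messages at a time.
        def partitionsToFetch: Set[String] =
          queues.collect { case (tp, q) if q.isEmpty => tp }.toSet
      }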

      Attachments

        1. SAMZA-3.0.patch
          14 kB
          Chris Riccomini


            People

              Assignee: Chris Riccomini (criccomini)
              Reporter: Chris Riccomini (criccomini)
              Votes: 0
              Watchers: 2
