Kafka / KAFKA-15848

Design solution for inconsistency between ConsumerDelegate timeout policies


Details

    Description

      The two ConsumerDelegate implementations (LegacyKafkaConsumer and AsyncKafkaConsumer) differ fundamentally in how they use and interpret the supplied Timer.

      tl;dr

      AsyncKafkaConsumer interprets the timeout literally, whereas LegacyKafkaConsumer allows a little leeway.

      LegacyKafkaConsumer is structured so that it checks whether its operations succeeded before it checks the timer:

      1. Submit operation asynchronously
      2. Wait for operation to complete using NetworkClient.poll()
      3. Check for result
        1. If successful, return success
        2. If fatal failure, return failure
      4. Check timer
        1. If timer expired, return failure
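      The four steps above can be modeled with plain JDK types. This is a minimal, hypothetical sketch, not Kafka code: LegacyStyleWait.await, the submit supplier, and the driveIo runnable (a stand-in for NetworkClient.poll(), which performs its I/O on the caller's thread) are names invented for illustration. Because the result is checked before the timer, a timeout of 0 still gets one complete submit/poll/check pass.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class LegacyStyleWait {

    /**
     * Hypothetical helper mirroring the LegacyKafkaConsumer ordering:
     * submit, drive I/O on the calling thread, check the result, and only then check the timer.
     *
     * @param submit    starts the operation and returns its future (step 1)
     * @param driveIo   stand-in for NetworkClient.poll(); runs on the caller's thread (step 2)
     * @param timeoutMs time budget; 0 still allows one full submit/poll/check pass
     * @param onTimeout value returned when the timer expires without a result (step 4.1)
     */
    static <T> T await(Supplier<CompletableFuture<T>> submit, Runnable driveIo,
                       long timeoutMs, T onTimeout) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        do {
            CompletableFuture<T> future = submit.get();   // 1. submit asynchronously
            driveIo.run();                                 // 2. drive network I/O
            if (future.isDone())
                // 3.1 success returns the value; 3.2 a failure is rethrown by join()
                // (wrapped in a CompletionException)
                return future.join();
        } while (System.nanoTime() - deadline < 0);        // 4. only now consult the timer
        return onTimeout;                                  // 4.1 timer expired
    }

    public static void main(String[] args) {
        // Hypothetical "network": futures queued by submit are completed by driveIo.
        Queue<CompletableFuture<Integer>> inFlight = new ArrayDeque<>();
        Supplier<CompletableFuture<Integer>> submit = () -> {
            CompletableFuture<Integer> f = new CompletableFuture<>();
            inFlight.add(f);
            return f;
        };
        Runnable driveIo = () -> {
            CompletableFuture<Integer> f;
            while ((f = inFlight.poll()) != null)
                f.complete(42);   // the "response" arrives during the poll
        };
        // Even with a timeout of 0, one pass is made and the result is returned.
        System.out.println(await(submit, driveIo, 0, -1)); // prints 42
    }
}
```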

      AsyncKafkaConsumer uses Future.get() to wait for its operations:

      1. Submit operation asynchronously
      2. Wait for operation to complete using Future.get()
        1. If the operation does not complete within the timeout, Future.get() throws a TimeoutException
      3. Check for result
        1. If successful, return success
        2. Otherwise, return failure
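      The asynchronous ordering can be modeled the same way. Again this is a hypothetical sketch using only JDK classes: AsyncStyleWait.await and the eventQueue parameter are invented names, and the queue only approximates AsyncKafkaConsumer's application event queue. The calling thread does no I/O of its own, so with a timeout of 0 Future.get() returns a result only if the background thread has already completed the future.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncStyleWait {

    /**
     * Hypothetical helper mirroring the AsyncKafkaConsumer ordering: enqueue the operation
     * for a background thread, then block in Future.get() for at most timeoutMs.
     */
    static <T> T await(BlockingQueue<CompletableFuture<T>> eventQueue, long timeoutMs, T onTimeout) {
        CompletableFuture<T> future = new CompletableFuture<>();
        eventQueue.add(future);                                   // 1. submit asynchronously
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);  // 2. wait; 3.1 success
        } catch (TimeoutException e) {
            return onTimeout;                                     // 2.1 timed out
        } catch (ExecutionException e) {
            throw new IllegalStateException("Operation failed", e.getCause()); // 3.2 failure
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<CompletableFuture<Integer>> queue = new LinkedBlockingQueue<>();

        // No background thread is running yet, so get(0, ...) gives up immediately.
        System.out.println(await(queue, 0, -1));      // prints -1

        // Background thread that completes every enqueued operation with 42.
        Thread background = new Thread(() -> {
            try {
                while (true)
                    queue.take().complete(42);
            } catch (InterruptedException e) {
                // daemon thread: exit quietly if interrupted
            }
        });
        background.setDaemon(true);
        background.start();

        // With a generous timeout the background thread has time to answer.
        System.out.println(await(queue, 5_000, -1));  // prints 42
    }
}
```

      The first call in main returns -1 even though nothing failed; the operation simply never got a chance to run before Future.get() gave up.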

      How to reproduce

      This difference causes subtle timing issues, which can be easily reproduced with any of the KafkaConsumerTest unit tests that invoke the consumer.poll(0) API. The following code illustrates the difference between the two approaches.

      LegacyKafkaConsumer performs a lot of its network I/O operations in a manner similar to this:

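      public int getCount(Set<TopicPartition> partitions, Timer timer) {
          do {
              // 1. Submit the operation asynchronously
              final RequestFuture<Integer> future = sendSomeRequest(partitions);
              // 2. Perform network I/O until the future completes or the timer expires
              client.poll(future, timer);
      
              // 3. Check the result *before* checking the timer
              if (future.succeeded())
                  return future.value();
              if (future.failed() && !future.isRetriable())
                  throw future.exception();
          } while (timer.notExpired());  // 4. Only then consult the timer
      
          return -1;
      }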
      

      AsyncKafkaConsumer has similar logic, but it is structured like this:

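      private int getCount(Timer timer) {
          try {
              // 1. Enqueue the operation for the background thread
              CompletableFuture<Integer> future = new CompletableFuture<>();
              applicationEventQueue.add(future);
              // 2. Block for at most the remaining time; 3.1 return the result on success
              return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
          } catch (TimeoutException e) {
              // 2.1 With timer.remainingMs() == 0, this is reached immediately
              return -1;
          } catch (InterruptedException | ExecutionException e) {
              // 3.2 Any other failure
              throw new KafkaException(e);
          }
      }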
      

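      The call to add() enqueues the network operation, and the method then immediately invokes Future.get() with the remaining timeout to implement a time-bounded blocking call. When the method is called with a timeout of 0, Future.get() throws a TimeoutException immediately, before the background thread has had a chance to process the request. LegacyKafkaConsumer, by contrast, always makes at least one pass through NetworkClient.poll() and checks the result before it checks the timer.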

      Suggested fix

      This task is to design and document the timeout policy for the new Consumer implementation.

      The documentation lives here: https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts

            People

              kirktrue Kirk True