Flume / FLUME-3333

KafkaSource will reprocess the same events through the Interceptor chain multiple times, if anything throws an Exception during doProcess


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Versions: 1.8.0, 1.9.0
    • Fix Versions: None
    • Components: Sinks+Sources
    • Labels: None
    • Environment: Observed on Apache Flume 1.8.0, Java 8 (various), Apache Kafka (1.1.1)

      (But I think it's clear it's not an Environment issue.)

    Description

      A batch of events, of size batchSize, read from a Kafka topic was observed to have been processed multiple times (in our case, over 400 times) by the configured Interceptors before finally being completed and written into the Channel.

      Upon reading the KafkaSource code in 1.8 (the version that was running) and version 1.9 (the latest release version), it was noticed that:

      1. There is a try/catch Exception around most of the processing loop in doProcess().
      2. eventList, the list of inbound events that can be mutated by ChannelProcessor.processEventBatch() or the configured Interceptors, is cleared only after processing completes successfully and before the Kafka Consumer offsets are committed. As a result:
         1. If an Exception is thrown somewhere during the loop, before eventList.clear(), then eventList is not cleared and has the same contents on the next call to doProcess().
         2. No more records are read from Kafka, because eventList already contains batchUpperLimit (set from the "batchSize" config property) records.
         3. The same eventList contents, already potentially mutated, are then passed through the ChannelProcessor again.
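      The failure mode above can be reduced to a small, self-contained sketch. This is not the actual KafkaSource code; all names (doProcess, intercept, the String events) are simplified stand-ins that only mirror the reuse pattern described in the report:

```java
import java.util.ArrayList;
import java.util.List;

public class EventListReuseDemo {
    static final int BATCH_UPPER_LIMIT = 2;
    // Field re-used across invocations, mirroring KafkaSource's eventList.
    static final List<String> eventList = new ArrayList<>();
    static int interceptorInvocations = 0;
    static int doubleTagged = 0; // events that passed through the interceptor more than once

    // Simulated interceptor: mutates each event in place by appending a tag.
    static void intercept(List<String> events) {
        for (int i = 0; i < events.size(); i++) {
            if (events.get(i).contains("|tagged")) {
                doubleTagged++; // already-mutated event is being reprocessed
            }
            events.set(i, events.get(i) + "|tagged");
            interceptorInvocations++;
        }
    }

    // Simplified doProcess(): fill the batch, intercept, "write", then clear.
    static void doProcess(boolean channelFails) {
        try {
            while (eventList.size() < BATCH_UPPER_LIMIT) {
                eventList.add("event" + eventList.size()); // stand-in for consumer poll()
            }
            intercept(eventList);
            if (channelFails) {
                throw new RuntimeException("ChannelException from processEventBatch");
            }
            eventList.clear(); // only reached on success
        } catch (Exception e) {
            // Swallowed, like the try/catch around the processing loop:
            // eventList keeps its mutated contents.
        }
    }

    public static void main(String[] args) {
        doProcess(true);  // failure: eventList not cleared
        doProcess(false); // the same two events go through the interceptor again
        System.out.println(interceptorInvocations); // 4
        System.out.println(doubleTagged);           // 2
    }
}
```

      With a real interceptor chain and repeated channel failures, the same batch keeps cycling this way, which is consistent with the 400+ repetitions observed.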

      Some possible ways to fix:

      • Don't re-use eventList across invocations of doProcess().
      • If eventList is declared this way to allow object re-use, then just clear it when an Exception is thrown: if the KafkaConsumer offsets aren't committed because of a failure, or even just because processEventBatch() raises a ChannelException, it isn't valid to keep the list's contents around once they may have been changed. (I think the records will/should be provided again on a subsequent poll()? But I'm not 100% clear on what the Kafka Consumer APIs guarantee here.)
      • Don't allow processEventBatch() to mutate the List<Event> events, or processEvent() to mutate the Event object. NB: many (all?) of the core Interceptor implementations mutate the individual Event objects too, even if they sometimes build a new List.
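      The second suggested fix can be sketched as follows. Again this is an illustrative simplification, not the Flume source: clearing the list in the catch block means a retried doProcess() re-polls the consumer instead of re-running stale, already-mutated events through the interceptors (assuming the uncommitted records are re-delivered on a later poll()):

```java
import java.util.ArrayList;
import java.util.List;

public class EventListClearOnFailureDemo {
    static final int BATCH_UPPER_LIMIT = 2;
    static final List<String> eventList = new ArrayList<>();
    static int interceptorInvocations = 0;
    static int doubleTagged = 0; // events re-run through the interceptor

    static void intercept(List<String> events) {
        for (int i = 0; i < events.size(); i++) {
            if (events.get(i).contains("|tagged")) {
                doubleTagged++;
            }
            events.set(i, events.get(i) + "|tagged");
            interceptorInvocations++;
        }
    }

    static void doProcess(boolean channelFails) {
        try {
            while (eventList.size() < BATCH_UPPER_LIMIT) {
                eventList.add("event" + eventList.size()); // stand-in for consumer poll()
            }
            intercept(eventList);
            if (channelFails) {
                throw new RuntimeException("ChannelException");
            }
            eventList.clear();
        } catch (Exception e) {
            // Fix: drop the failed batch. The offsets were never committed,
            // so the records should be re-delivered as fresh, unmutated
            // ConsumerRecords on a later poll().
            eventList.clear();
        }
    }

    public static void main(String[] args) {
        doProcess(true);  // failure: batch dropped, list cleared
        doProcess(false); // a fresh "poll" refills the list
        System.out.println(doubleTagged); // 0: no event was intercepted twice
    }
}
```

      The trade-off is that any interceptor work done on the failed batch is discarded and redone from the re-delivered records, which is the safer behaviour given that interceptors mutate events in place.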


          People

            Assignee: Unassigned
            Reporter: James Grinter (jrg)
