
Intermittent empty accumulator values in extractOutput of Combine.perKey on Dataflow

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.10.0
    • Fix Version/s: None
    • Component/s: runner-dataflow
    • Labels: None

    Description

      We are using Spotify’s scio 0.7.2, which is built with Apache Beam 2.10.0, on Google Dataflow in streaming mode with fixed windows.

      Using the above versions we have observed strange and, unfortunately, intermittent behaviour with the Combine.perKey transform used to perform a reduce operation, e.g. emitting the maximum value per key, or the value of the last element seen for a key in a window.
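
      For reference, a minimal fragment of this kind of pipeline, written against Beam's Java SDK rather than scio (the `events` collection and the 1-minute window size are illustrative assumptions, not taken from the job described above):

      ```java
      import org.apache.beam.sdk.transforms.Max;
      import org.apache.beam.sdk.transforms.windowing.FixedWindows;
      import org.apache.beam.sdk.transforms.windowing.Window;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.joda.time.Duration;

      // `events` is an assumed PCollection<KV<String, Long>> read from PubSub.
      // Fixed windows with the default trigger: a pane should only fire for
      // keys that actually received at least one element in the window.
      PCollection<KV<String, Long>> maxPerKey =
          events
              .apply(Window.<KV<String, Long>>into(
                  FixedWindows.of(Duration.standardMinutes(1))))
              .apply(Max.longsPerKey());
      ```

      The scio job uses a custom Combine.CombineFn instead of the built-in Max transform; that pattern is sketched after the next paragraph.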

      Such reductions are implemented in scio as a Combine.CombineFn whose accumulator is created as an empty ArrayList, with extractOutput performing the actual reduction and returning the output value.
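
      A minimal Java sketch of this pattern, assuming a max-per-key reduction (MaxFn and the Long element type are illustrative; the real implementation lives in scio). Because the reduction happens only in extractOutput, an unexpectedly empty accumulator surfaces as an exception at trigger time:

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import org.apache.beam.sdk.transforms.Combine;

      class MaxFn extends Combine.CombineFn<Long, List<Long>, Long> {
        @Override
        public List<Long> createAccumulator() {
          return new ArrayList<>();  // accumulator starts empty
        }

        @Override
        public List<Long> addInput(List<Long> accum, Long input) {
          accum.add(input);
          return accum;
        }

        @Override
        public List<Long> mergeAccumulators(Iterable<List<Long>> accums) {
          List<Long> merged = new ArrayList<>();
          for (List<Long> a : accums) {
            merged.addAll(a);
          }
          return merged;
        }

        @Override
        public Long extractOutput(List<Long> accum) {
          // The actual reduction happens only here; if the runner hands over an
          // empty accumulator at trigger time, get() throws NoSuchElementException.
          return accum.stream().max(Long::compare).get();
        }
      }
      ```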

      This works well when the combine accumulator is non-empty at trigger time, and my understanding is that no trigger should fire if no input messages were processed for a given key in a given window. Conversely, if a trigger does fire, I think we may assume that there was at least one element with the given key in the given window, and that it should be present in the accumulator.

      The transform is part of a job consisting of 40-50 transforms that consumes messages from two different PubSub topics, transforming, windowing, and combining them, and then joining them to emit output messages to a PubSub topic. Messages in the input topics are pulled at a rate of 5-300 per second, depending on the time of day.

      We ran this workload split into 3 separate jobs for 6+ months and observed no similar problems, but that setup was not optimal, as each of those jobs was using only 10-30% of worker CPU. It was after we combined those separate jobs into one that we started observing exceptions in the step using this specific transform, the direct cause being an empty accumulator at the time the window triggers fired. Those errors happened on subscriptions that had a 1-hour retention set, and the CPUs were quite stressed at the time.

      We tried changing the machine type to a larger one (“-n2” -> “-n4”), and an hour of retention was then consumed without errors. After another successful try with a retention of 3 hours, we tried consuming 6 hours of retention, which again failed.

      We have found similar issues in scio's bug tracker:

      https://github.com/spotify/scio/issues/778

      https://github.com/spotify/scio/issues/1620

      The workaround proposed there is to use a custom `aggregateByKey` transform, which is also based on Combine.perKey but uses a `zero` value that is output when the accumulator is empty. We used this workaround, but it is not optimal, as in some cases there is no good default value, e.g. for the last/first message in a window.
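
      In Beam Java terms the workaround amounts to something like the following sketch (MaxWithZeroFn and its `zero` field are illustrative names; scio's actual `aggregateByKey` differs in detail): extractOutput falls back to the supplied zero instead of failing on an empty accumulator.

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import org.apache.beam.sdk.transforms.Combine;

      class MaxWithZeroFn extends Combine.CombineFn<Long, List<Long>, Long> {
        private final Long zero;

        MaxWithZeroFn(Long zero) {
          this.zero = zero;
        }

        @Override
        public List<Long> createAccumulator() {
          return new ArrayList<>();
        }

        @Override
        public List<Long> addInput(List<Long> accum, Long input) {
          accum.add(input);
          return accum;
        }

        @Override
        public List<Long> mergeAccumulators(Iterable<List<Long>> accums) {
          List<Long> merged = new ArrayList<>();
          for (List<Long> a : accums) {
            merged.addAll(a);
          }
          return merged;
        }

        @Override
        public Long extractOutput(List<Long> accum) {
          // Emit the zero value instead of failing when the accumulator is empty.
          return accum.stream().max(Long::compare).orElse(zero);
        }
      }
      ```

      This masks the failure for reductions that have a natural zero (max, sum, etc.), but as noted above there is no sensible zero for first/last-element-in-window semantics.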

      While searching through Beam's JIRA I have found an issue that may be similar to ours: https://issues.apache.org/jira/browse/BEAM-7614

      I assume that this issue happens when CPU, memory, or both are stressed during the catch-up phase after starting a job with some retention backlog to consume.


    People

      Assignee: Unassigned
      Reporter: Piotr Szczepanik (piter)
      Votes: 0
      Watchers: 4
