Beam / BEAM-6177

AfterProcessingTime not firing

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.8.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

      Hi,

      The documentation says that an AfterProcessingTime(X) trigger should fire X seconds after the first element is processed, but the trigger appears never to fire when using a Global window with a steady influx of elements.

      Here is my pipeline:

       

      import apache_beam as beam
      from apache_beam import window
      from apache_beam.transforms.trigger import (
          AccumulationMode, AfterProcessingTime, Repeatedly)

      # converter, PubSubRow, ListCombineFn and ClickHouseSink are not shown here.
      (p
       | 'pubsub' >> beam.io.ReadFromPubSub(topic=self._pubsub_topic).with_output_types(bytes)
       | 'window' >> beam.WindowInto(
             window.GlobalWindows(),
             trigger=Repeatedly(AfterProcessingTime(5)),
             accumulation_mode=AccumulationMode.DISCARDING
             )
       | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
       | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
       | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
       | 'clickhouse' >> ClickHouseSink(self._clickhouse_host, self._clickhouse_port, self._clickhouse_database)
      )

      I expect that every 5 seconds, as long as elements are pouring in, the trigger fires and my data is combined. The idea of this pipeline is simply to read messages from PubSub, transform them into ClickHouse ORM models, and batch-save them into ClickHouse with as much parallelism as possible. We do not care about order: elements can be inserted in any order and are not correlated with one another.

      The potential issue is in AfterProcessingTime.on_element(self, element, window, context) in trigger.py:

      context.set_timer(
          '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
      

      This overwrites the previously set timer every time a new element comes in. With a constant influx of elements, the trigger therefore fires only once no element has arrived for X seconds.
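      To illustrate the suspected behaviour, here is a minimal pure-Python simulation. It does not use Beam; the function firing_times and its parameter reset_on_each_element are hypothetical names made up for this sketch. It compares a timer that every element overwrites (as in the set_timer call quoted above) with a timer that is set only when none is pending, which is how I read the documentation.

```python
def firing_times(arrivals, delay, reset_on_each_element):
    """Return the processing times at which a simulated trigger fires.

    arrivals: sorted element arrival times, in seconds.
    delay: the X in AfterProcessingTime(X), in seconds.
    reset_on_each_element: True lets every element overwrite the pending
        timer; False sets a timer only when none is pending.
    """
    fires = []
    deadline = None  # pending timer deadline, or None when no timer is set
    for t in arrivals:
        # A pending timer whose deadline has been reached fires first.
        if deadline is not None and deadline <= t:
            fires.append(deadline)
            deadline = None
        # Set (or overwrite) the timer for the current element.
        if reset_on_each_element or deadline is None:
            deadline = t + delay
    if deadline is not None:
        # No more input: the last pending timer eventually fires.
        fires.append(deadline)
    return fires


# One element per second for 20 seconds, with a 5-second delay.
arrivals = list(range(20))
overwriting = firing_times(arrivals, 5, reset_on_each_element=True)
set_once = firing_times(arrivals, 5, reset_on_each_element=False)
print(overwriting)
print(set_once)
```

      With this input, the overwriting variant fires only once, at t=24 (5 seconds after the last element), while the set-once variant fires at t=5, 10, 15 and 20.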

       

       

      Please let me know whether I understood the documentation correctly, and whether I can help further.

      Thank you,

          People

            Assignee: Unassigned
            Reporter: Arnaud T (atellier)
            Votes: 0
            Watchers: 2
