Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-7514 Support streaming on the Python fn_api_runner
  3. BEAM-14127

Timers with same family ids in same stage (but different transforms) are buffered together

Details

    • Sub-task
    • Status: Open
    • P2
    • Resolution: Unresolved
    • None
    • None
    • sdk-py-core, sdk-py-harness

    Description

      The following test case does not work properly:

       

      def test_dynamic_timer_clear_then_set_timer(self):
      
        class EmitTwoEvents(DoFn):
          EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
      
          def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
            yield ('1', 'set')
            emit.set(1)
      
          @on_timer(EMIT_CLEAR_SET_TIMER)
          def emit_clear(self):
            yield ('1', 'clear')
      
        class DynamicTimerDoFn(DoFn):
          EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
      
          def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
            if element[1] == 'set':
              emit.set(10, dynamic_timer_tag='emit1')
              emit.set(20, dynamic_timer_tag='emit2')
            if element[1] == 'clear':
              emit.set(30, dynamic_timer_tag='emit3')
              emit.clear(dynamic_timer_tag='emit3')
              emit.set(40, dynamic_timer_tag='emit3')
            return []
      
          @on_timer(EMIT_TIMER_FAMILY)
          def emit_callback(
              self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
            yield (tag, ts)
      
        with TestPipeline() as p:
          res = (
              p
              | beam.Create([('1', 'impulse')])
              | beam.ParDo(EmitTwoEvents())
              | beam.ParDo(DynamicTimerDoFn()))
          assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]) 

      Attachments

        Activity

          People

            Unassigned Unassigned
            pabloem Pablo Estrada
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: