Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-7499

ReifyTest.test_window fails in DirectRunner due to 'assign_context.window should not be None.'

Details

    • Improvement
    • Status: Resolved
    • P3
    • Resolution: Fixed
    • None
    • 2.15.0
    • sdk-py-core, test-failures
    • None

    Description

       

      PR 8717 added ReifyWindow.test_window which fails on the DirectRunner.

      ERROR:root:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f087ab31248>, due to an exception.
       Traceback (most recent call last):
       File "apache_beam/runners/direct/executor.py", line 343, in call
       finish_state)
       File "apache_beam/runners/direct/executor.py", line 380, in attempt_call
       evaluator.process_element(value)
       File "apache_beam/runners/direct/transform_evaluator.py", line 636, in process_element
       self.runner.process(element)
       File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
       def process(self, windowed_value):
       File "apache_beam/runners/common.py", line 784, in apache_beam.runners.common.DoFnRunner.process
       self._reraise_augmented(exn)
       File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.DoFnRunner._reraise_augmented
       raise_with_traceback(new_exn)
       File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
       return self.do_fn_invoker.invoke_process(windowed_value)
       File "apache_beam/runners/common.py", line 453, in apache_beam.runners.common.SimpleInvoker.invoke_process
       output_processor.process_outputs(
       File "apache_beam/runners/common.py", line 915, in apache_beam.runners.common._OutputProcessor.process_outputs
       self.window_fn.assign(assign_context))
       File "apache_beam/transforms/util.py", line 557, in assign
       'assign_context.window should not be None. '
      ValueError: assign_context.window should not be None. This might be due to a DoFn returning a TimestampedValue. [while running 'add_timestamps2']
      Traceback (most recent call last):
       File "apache_beam/transforms/util_test.py", line 501, in test_window
       assert_that(reified_pc, equal_to(expected), reify_windows=True)
       File "apache_beam/pipeline.py", line 426, in __exit__
       self.run().wait_until_finish()
       File "apache_beam/testing/test_pipeline.py", line 109, in run
       state = result.wait_until_finish()
       File "apache_beam/runners/direct/direct_runner.py", line 430, in wait_until_finish
       self._executor.await_completion()
       File "apache_beam/runners/direct/executor.py", line 400, in await_completion
       self._executor.await_completion()
       File "apache_beam/runners/direct/executor.py", line 446, in await_completion
       raise_(t, v, tb)
       File "apache_beam/runners/direct/executor.py", line 343, in call
       finish_state)
       File "apache_beam/runners/direct/executor.py", line 380, in attempt_call
       evaluator.process_element(value)
       File "apache_beam/runners/direct/transform_evaluator.py", line 636, in process_element
       self.runner.process(element)
       File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
       def process(self, windowed_value):
       File "apache_beam/runners/common.py", line 784, in apache_beam.runners.common.DoFnRunner.process
       self._reraise_augmented(exn)
       File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.DoFnRunner._reraise_augmented
       raise_with_traceback(new_exn)
       File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
       return self.do_fn_invoker.invoke_process(windowed_value)
       File "apache_beam/runners/common.py", line 454, in apache_beam.runners.common.SimpleInvoker.invoke_process
       windowed_value, self.process_method(windowed_value.value))
       File "apache_beam/transforms/core.py", line 1292, in <lambda>
       wrapper = lambda x: [fn(x)]
       File "apache_beam/testing/util.py", line 129, in _equal
       'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
      BeamAssertException: Failed assert: [TestWindowedValue(value=('a', 100, GlobalWindow), timestamp=100, windows=[GlobalWindow]), TestWindowedValue(value=('b', 200, GlobalWindow), timestamp=200, windows=[GlobalWindow]), TestWindowedValue(value=('c', 300, GlobalWindow), timestamp=300, windows=[GlobalWindow])] == [TestWindowedValue(value=(('a', 100.0, (GlobalWindow,), PaneInfo(first: True, last: True, timing: 3, index: 0, nonspeculative_index: 0)), Timestamp(-9223372036854.775000), GlobalWindow), timestamp=Timestamp(-9223372036854.775000), windows=[GlobalWindow]), TestWindowedValue(value=(('c', 300.0, (GlobalWindow,), PaneInfo(first: True, last: True, timing: 3, index: 0, nonspeculative_index: 0)), Timestamp(-9223372036854.775000), GlobalWindow), timestamp=Timestamp(-9223372036854.775000), windows=[GlobalWindow]), TestWindowedValue(value=(('b', 200.0, (GlobalWindow,), PaneInfo(first: True, last: True, timing: 3, index: 0, nonspeculative_index: 0)), Timestamp(-9223372036854.775000), GlobalWindow), timestamp=Timestamp(-9223372036854.775000), windows=[GlobalWindow])] [while running 'assert_that/Match']
      

       

      Attachments

        Activity

          People

            pabloem Pablo Estrada
            lcwik Luke Cwik
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 3h 50m
                3h 50m