Details
Type: Improvement
Status: Open
Priority: P3
Resolution: Unresolved
Description
This issue was surfaced by the errors in https://github.com/apache/beam/pull/9077.
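Specifically, in the following pipelines, note that Map(lambda x: x) is needed before the transform actually under test. This is because the PCollection output by Create does not treat TimestampedValue or WindowedValue inputs the way a DoFn output does: Create emits the wrapper objects themselves as elements, without assigning their timestamps or windows. Passing the PCollection through an identity Map forces it through a DoFn, which unwraps them.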
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import util
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import TimestampedValue


class ReifyTest(unittest.TestCase):  # class name assumed for illustration

  def test_timestamp(self):
    l = [TimestampedValue('a', 100),
         TimestampedValue('b', 200),
         TimestampedValue('c', 300)]
    expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
                TestWindowedValue('b', 200, [GlobalWindow()]),
                TestWindowedValue('c', 300, [GlobalWindow()])]
    with TestPipeline() as p:
      # Map(lambda x: x) PTransform is added after Create here, because when
      # a PCollection of TimestampedValues is created with Create PTransform,
      # the timestamps are not assigned to it. Adding a Map forces the
      # PCollection to go through a DoFn so that the PCollection consists of
      # the elements with timestamps assigned to them instead of a PCollection
      # of TimestampedValue(element, timestamp).
      pc = p | beam.Create(l) | beam.Map(lambda x: x)
      reified_pc = pc | util.Reify.Timestamp()
      assert_that(reified_pc, equal_to(expected), reify_windows=True)

  def test_window(self):
    l = [GlobalWindows.windowed_value('a', 100),
         GlobalWindows.windowed_value('b', 200),
         GlobalWindows.windowed_value('c', 300)]
    expected = [
        TestWindowedValue(('a', 100, GlobalWindow()), 100, [GlobalWindow()]),
        TestWindowedValue(('b', 200, GlobalWindow()), 200, [GlobalWindow()]),
        TestWindowedValue(('c', 300, GlobalWindow()), 300, [GlobalWindow()])]
    with TestPipeline() as p:
      pc = p | beam.Create(l)
      # Map(lambda x: x) PTransform is added after Create here, because when
      # a PCollection of WindowedValues is created with Create PTransform,
      # the windows are not assigned to it. Adding a Map forces the
      # PCollection to go through a DoFn so that the PCollection consists of
      # the elements with windows assigned to them instead of a PCollection
      # of WindowedValue(element, timestamp, window).
      pc = pc | beam.Map(lambda x: x)
      reified_pc = pc | util.Reify.Window()
      assert_that(reified_pc, equal_to(expected), reify_windows=True)


if __name__ == '__main__':
  unittest.main()
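To make the workaround easier to reason about, here is a deliberately simplified toy model of the behavior described above, in plain Python with no Beam dependency. Everything in it (ToyTimestampedValue, toy_create, toy_map, DEFAULT_TIMESTAMP) is hypothetical and is not Beam's API or implementation; it only encodes the two rules stated in the test comments: Create emits wrappers verbatim, while a DoFn's output step unwraps a returned TimestampedValue and assigns its timestamp.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple

# Hypothetical stand-ins for illustration only; these are NOT Apache Beam names.
DEFAULT_TIMESTAMP = float("-inf")  # placeholder for "no timestamp assigned yet"


@dataclass(frozen=True)
class ToyTimestampedValue:
    """Wrapper asking for `value` to be emitted at `timestamp`."""
    value: Any
    timestamp: float


# In this model a PCollection is a list of (element, timestamp) pairs.
ToyPCollection = List[Tuple[Any, float]]


def toy_create(values: Iterable[Any]) -> ToyPCollection:
    """Emits each input verbatim at the default timestamp.

    Models the reported Create behavior: wrappers are NOT unpacked.
    """
    return [(v, DEFAULT_TIMESTAMP) for v in values]


def toy_map(fn: Callable[[Any], Any], pcoll: ToyPCollection) -> ToyPCollection:
    """Applies fn per element, modelling how a DoFn's output is handled.

    A returned ToyTimestampedValue is unwrapped and its timestamp becomes the
    element's timestamp; any other result keeps the input element's timestamp.
    """
    out: ToyPCollection = []
    for element, ts in pcoll:
        result = fn(element)
        if isinstance(result, ToyTimestampedValue):
            out.append((result.value, result.timestamp))
        else:
            out.append((result, ts))
    return out


inputs = [ToyTimestampedValue("a", 100), ToyTimestampedValue("b", 200)]
created = toy_create(inputs)            # elements are still wrapper objects
mapped = toy_map(lambda x: x, created)  # the identity "Map" unwraps them
print(created[0])  # (ToyTimestampedValue(value='a', timestamp=100), -inf)
print(mapped)      # [('a', 100), ('b', 200)]
```

In this model the identity function does nothing to the value; the effect comes entirely from the output-handling step in toy_map. The WindowedValue case in test_window follows the same pattern, with windows in place of timestamps.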