Beam / BEAM-7759

beam.Create([...]) does not create a properly shaped PCollection until the pipeline executes

Details

    • Type: Improvement
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

      This issue is surfaced by the errors from https://github.com/apache/beam/pull/9077

       

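      Specifically, in the following pipelines, note that Map(lambda x: x) is needed before the transform under test. This is because the PCollection output by Create holds the TimestampedValue or WindowedValue objects themselves as elements; their timestamps and windows are not assigned until the PCollection passes through a DoFn.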

        def test_timestamp(self):
          l = [TimestampedValue('a', 100),
               TimestampedValue('b', 200),
               TimestampedValue('c', 300)]
          expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
                      TestWindowedValue('b', 200, [GlobalWindow()]),
                      TestWindowedValue('c', 300, [GlobalWindow()])]
          with TestPipeline() as p:
            # Map(lambda x: x) PTransform is added after Create here, because when
            # a PCollection of TimestampedValues is created with Create PTransform,
            # the timestamps are not assigned to it. Adding a Map forces the
            # PCollection to go through a DoFn so that the PCollection consists of
            # the elements with timestamps assigned to them instead of a PCollection
            # of TimestampedValue(element, timestamp).
            pc = p | beam.Create(l) | beam.Map(lambda x: x)
            reified_pc = pc | util.Reify.Timestamp()
            assert_that(reified_pc, equal_to(expected), reify_windows=True)
      
        def test_window(self):
          l = [GlobalWindows.windowed_value('a', 100),
               GlobalWindows.windowed_value('b', 200),
               GlobalWindows.windowed_value('c', 300)]
          expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
                                        [GlobalWindow()]),
                      TestWindowedValue(('b', 200, GlobalWindow()), 200,
                                        [GlobalWindow()]),
                      TestWindowedValue(('c', 300, GlobalWindow()), 300,
                                        [GlobalWindow()])]
          with TestPipeline() as p:
            pc = p | beam.Create(l)
            # Map(lambda x: x) PTransform is added after Create here, because when
            # a PCollection of WindowedValues is created with Create PTransform,
            # the windows are not assigned to it. Adding a Map forces the
            # PCollection to go through a DoFn so that the PCollection consists of
            # the elements with windows assigned to them instead of a PCollection
            # of WindowedValue(element, timestamp, window).
            pc = pc | beam.Map(lambda x: x)
            reified_pc = pc | util.Reify.Window()
            assert_that(reified_pc, equal_to(expected), reify_windows=True)
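
The mechanism described in the test comments above can be sketched with a toy model in plain Python. This is not Beam code and does not mirror Beam internals: every name below (ToyTimestampedValue, ToyElement, toy_create, toy_map) is hypothetical, and the model simply assumes that Create behaves as this issue describes, while a DoFn step unpacks the wrapper.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List

# Stand-in for "no timestamp assigned yet"; an assumption of this toy model.
MIN_TIMESTAMP = float("-inf")


@dataclass(frozen=True)
class ToyTimestampedValue:
    """Hypothetical stand-in for a TimestampedValue wrapper object."""
    value: Any
    timestamp: float


@dataclass(frozen=True)
class ToyElement:
    """Hypothetical stand-in for an element together with its timestamp."""
    value: Any
    timestamp: float


def toy_create(values: Iterable[Any]) -> List[ToyElement]:
    # Models the behavior reported in this issue: each wrapper is emitted
    # as a plain element value and its timestamp is not applied.
    return [ToyElement(v, MIN_TIMESTAMP) for v in values]


def toy_map(fn: Callable[[Any], Any], pcoll: List[ToyElement]) -> List[ToyElement]:
    # Models a DoFn step: if the function returns a wrapper, the wrapper is
    # unpacked and its timestamp is assigned to the output element.
    out = []
    for element in pcoll:
        result = fn(element.value)
        if isinstance(result, ToyTimestampedValue):
            out.append(ToyElement(result.value, result.timestamp))
        else:
            out.append(ToyElement(result, element.timestamp))
    return out


raw = toy_create([ToyTimestampedValue('a', 100), ToyTimestampedValue('b', 200)])
fixed = toy_map(lambda x: x, raw)  # identity map, as in the tests above
```

In this model, raw still carries the wrapper objects as element values, while fixed holds the bare values with their timestamps applied, which is the effect the tests rely on when they add Map(lambda x: x).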
      

       

          People

            Assignee: Unassigned
            Reporter: Pablo Estrada (pabloem)
