Beam / BEAM-12586

Python Direct Runner doesn't support combining streaming and non-streaming sources in one pipeline

Details

    • Type: Bug
    • Status: Open
    • Priority: P2
    • Resolution: Unresolved
    • Affects Version/s: 2.30.0
    • Fix Version/s: None
    • Component/s: runner-direct, sdk-py-core
    • Labels: None

    Description

      Please see Stack Overflow discussion:

      https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir

      When I create a GCS source and a Pub/Sub source and try to flatten the two, the pipeline fails with an error that appears to come from an incompatible transform replacement performed by the direct runner.

      Code example:

      import json
      import apache_beam as beam

      # p is an existing beam.Pipeline.
      # Bounded source: text files on GCS.
      gcsEventsColl = (p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
                         | "convert to dict" >> beam.Map(json.loads))
      # Unbounded source: a Pub/Sub topic.
      liveEventsColl = (p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
                          | "convert to dict2" >> beam.Map(json.loads))

      # Flattening the two collections triggers the error below.
      input_rec = (gcsEventsColl, liveEventsColl) | "flatten" >> beam.Flatten()
      

      Error:

      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
        return self.runner.run_pipeline(self, self._options)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
        return runner.run_pipeline(pipeline, options)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
        pipeline.replace_all(_get_transform_overrides(options))
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
        self._check_replacement(override)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
        self.visit(ReplacementValidator())
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
        self._root_transform().visit(visitor, self, visited)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
        part.visit(visitor, pipeline, visited)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
        part.visit(visitor, pipeline, visited)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
        part.visit(visitor, pipeline, visited)
      [Previous line repeated 4 more times]
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
        visitor.visit_transform(self)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
        transform_node)
      RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
      

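      The failure occurs while the direct runner applies its transform overrides (pipeline.replace_all). The validation step in _check_replacement finds that the _GroupByKeyOnly node nested under "Read from GCS" was not replaced as expected, so the pipeline never starts running.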
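
      To make the "replace, then validate" pattern visible in the traceback easier to follow, here is a minimal toy model in plain Python. It is not Beam's implementation: Node, Override, and this replace_all are hypothetical stand-ins, and the reason the override skips the bounded read is an assumption of the toy model, not a diagnosis of the actual bug.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    """Hypothetical stand-in for an applied transform in a pipeline graph."""
    label: str
    kind: str
    parts: List["Node"] = field(default_factory=list)
    replaced: bool = False

    def walk(self, prefix: str = ""):
        """Yield (full_label, node) pairs depth-first."""
        full = f"{prefix}/{self.label}" if prefix else self.label
        yield full, self
        for part in self.parts:
            yield from part.walk(full)


@dataclass
class Override:
    """Hypothetical override: replace nodes of `kind` when `applies` agrees."""
    kind: str
    applies: Callable[[str], bool]


def replace_all(root: Node, override: Override) -> None:
    """Apply the override, then check that no matching node was left behind."""
    for full, node in root.walk():
        if node.kind == override.kind and override.applies(full):
            node.replaced = True
    # Validation pass: every node of the overridden kind must have been replaced.
    for full, node in root.walk():
        if node.kind == override.kind and not node.replaced:
            raise RuntimeError(
                f"Transform node {full} ({node.kind}) was not replaced as expected."
            )


# A tiny graph: a bounded read that contains a GroupByKey, plus a Flatten.
root = Node("root", "Composite", [
    Node("Read from GCS", "Read", [Node("GroupByKey", "_GroupByKeyOnly")]),
    Node("flatten", "Flatten"),
])

# Assumption for illustration only: the override skips nodes under the bounded read.
override = Override("_GroupByKeyOnly", applies=lambda full: "Read from GCS" not in full)

try:
    replace_all(root, override)
    message = ""
except RuntimeError as err:
    message = str(err)

print(message)
```

      In this toy model the validation pass raises because a node of the overridden kind survives the replacement pass, which has the same shape as the failure reported above.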

       


          People

            Assignee: Unassigned
            Reporter: Christophe Rodriguez (rodriguezc)
            Votes: 3
            Watchers: 8
