Details
-
Bug
-
Status: Open
-
P2
-
Resolution: Unresolved
-
2.30.0
-
None
-
None
Description
Please see Stack Overflow discussion:
When I create a GCS source & a Pub Source and try to flatten both, there is an error because of some incompatible transformation done by the direct runner.
Code example:
gcsEventsColl = p | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log") \ | 'convert to dict' >> beam.Map(lambda x: json.loads(x)) liveEventsColl = p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic") \ | 'convert to dict2' >> beam.Map(lambda x: json.loads(x)) input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()
Error:
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run return self.runner.run_pipeline(self, self._options) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline return runner.run_pipeline(pipeline, options) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline pipeline.replace_all(_get_transform_overrides(options)) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all self._check_replacement(override) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement self.visit(ReplacementValidator()) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit self._root_transform().visit(visitor, self, visited) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit part.visit(visitor, pipeline, visited) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit part.visit(visitor, pipeline, visited) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit part.visit(visitor, pipeline, visited) [Previous line repeated 4 more times] File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit visitor.visit_transform(self) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform transform_node) RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
The direct runner corrupts the pipeline when it rewrites the transforms.