Details
-
Type: Bug
-
Status: Resolved
-
Priority: P2
-
Resolution: Duplicate
-
None
-
None
Description
Code to reproduce (slightly modified from https://lists.apache.org/thread.html/e910bfe702a3c8c5b0902f5f1c2c51fb7b2574f1b4abc4d9efab4e0f@%3Cdev.beam.apache.org%3E):
import apache_beam as beam
import argparse
from apache_beam import transforms
from apache_beam import pvalue
from apache_beam.options import pipeline_options
import logging
def _copy_number(number, side=None):
print '_copy_number:', number, side
yield number
def fn_sum(values):
    """Return the sum of ``values``.

    Passed to ``beam.CombineGlobally`` in ``run`` to total each PCollection.
    An empty input sums to ``0``.
    """
    return sum(values)
def run(argv=None):
    """Build and run the reproduction pipeline.

    The parser defines no arguments of its own, so every entry in ``argv``
    is forwarded to ``PipelineOptions`` (e.g. ``--runner``).  Leaving
    the ``with`` block runs the pipeline and waits for it to finish.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    options = pipeline_options.PipelineOptions(pipeline_args)
    # Uncomment to try the same pipeline in streaming mode.
    # options.view_as(pipeline_options.StandardOptions).streaming = True
    numbers = [1, 2]
    with beam.Pipeline(options=options) as p:
        sum_1 = (p
                 | 'ReadNumber1' >> transforms.Create(numbers)
                 | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
        # sum_1 is consumed twice: as a singleton side input here and as a
        # Flatten input below.
        sum_2 = (p
                 | 'ReadNumber2' >> transforms.Create(numbers)
                 | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
                 | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
        _ = ((sum_1, sum_2)
             | beam.Flatten()
             | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
             | beam.io.WriteToText('gs://foo/sum'))
if __name__ == '__main__':
    # Only configure logging and launch the pipeline when executed as a
    # script (`python test.py`), not when the module is imported.
    logging.getLogger().setLevel(logging.INFO)
    run()
Console:
$ python test.py
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at 0x7f2b7088e758> ====================
INFO:root:==================== <function fix_side_input_pcoll_coders at 0x7f2b7088e7d0> ====================
INFO:root:==================== <function lift_combiners at 0x7f2b7088e5f0> ====================
INFO:root:==================== <function expand_gbk at 0x7f2b7088e668> ====================
INFO:root:==================== <function sink_flattens at 0x7f2b7088e6e0> ====================
INFO:root:==================== <function greedily_fuse at 0x7f2b7088e848> ====================
INFO:root:==================== <function impulse_to_input at 0x7f2b7088e578> ====================
INFO:root:==================== <function inject_timer_pcollections at 0x7f2b7088e8c0> ====================
INFO:root:==================== <function sort_stages at 0x7f2b7088e938> ====================
INFO:root:Running ((ref_AppliedPTransform_ReadNumber1/Read_3)((ref_AppliedPTransform_CalculateSum1/KeyWithVoid_5)(CalculateSum1/CombinePerKey/Precombine)))+(CalculateSum1/CombinePerKey/Group/Write)
INFO:root:Running ((CalculateSum1/CombinePerKey/Group/Read)(CalculateSum1/CombinePerKey/Merge))((CalculateSum1/CombinePerKey/ExtractOutputs)((ref_AppliedPTransform_CalculateSum1/UnKey_13)(ref_PCollection_PCollection_7/Write)))
INFO:root:Running ((ref_AppliedPTransform_CalculateSum1/DoOnce/Read_15)(((ref_AppliedPTransform_CalculateSum1/InjectDefault_16)(ref_PCollection_PCollection_9/Write))(Flatten/Transcode/0)))(Flatten/Write/0)
INFO:root:Running ((ref_AppliedPTransform_ReadNumber2/Read_18)((ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_19)((ref_AppliedPTransform_CalculateSum2/KeyWithVoid_21)(CalculateSum2/CombinePerKey/Precombine))))(CalculateSum2/CombinePerKey/Group/Write)
Traceback (most recent call last):
  File "test.py", line 41, in <module>
    run()
  File "test.py", line 38, in run
    beam.io.WriteToText('gs://foo/sum'))
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 425, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 405, in run
    self._options).run(False)
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 418, in run
    return self.runner.run_pipeline(self)
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 139, in run_pipeline
    return runner.run_pipeline(pipeline)
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 238, in run_pipeline
    return self.run_via_runner_api(pipeline.to_runner_api())
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 241, in run_via_runner_api
    return self.run_stages(*self.create_stages(pipeline_proto))
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1020, in run_stages
    pcoll_buffers, safe_coders)
  File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1096, in run_stage
    pipeline_components.pcollections[actual_pcoll_id].coder_id]]
KeyError: u'coder_4'
Attachments
Issue Links
- is duplicated by: BEAM-6404 "FnAPI translation error" (Resolved)