Beam / BEAM-5927

FnApiRunner KeyError

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: Not applicable
    • Component/s: sdk-py-core
    • Labels: None

    Description

      Code to reproduce (modified slightly from https://lists.apache.org/thread.html/e910bfe702a3c8c5b0902f5f1c2c51fb7b2574f1b4abc4d9efab4e0f@%3Cdev.beam.apache.org%3E):

      from __future__ import print_function

      import argparse
      import logging

      import apache_beam as beam
      from apache_beam import pvalue
      from apache_beam import transforms
      from apache_beam.options import pipeline_options


      def _copy_number(number, side=None):
        # Pass each element through unchanged; `side` receives the singleton sum_1.
        print('_copy_number:', number, side)
        yield number


      def fn_sum(values):
        # print('values', values)
        return sum(values)


      def run(argv=None):
        parser = argparse.ArgumentParser()
        _, pipeline_args = parser.parse_known_args(argv)
        options = pipeline_options.PipelineOptions(pipeline_args)
        # options.view_as(pipeline_options.StandardOptions).streaming = True
        numbers = [1, 2]
        with beam.Pipeline(options=options) as p:
          sum_1 = (p
                   | 'ReadNumber1' >> transforms.Create(numbers)
                   | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))

          # sum_1 is consumed as a side input here...
          sum_2 = (p
                   | 'ReadNumber2' >> transforms.Create(numbers)
                   | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
                   | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))

          # ...and again as a main input to Flatten.
          _ = ((sum_1, sum_2)
               | beam.Flatten()
               | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
               | beam.io.WriteToText('gs://foo/sum'))


      logging.getLogger().setLevel(logging.INFO)
      run()
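      For comparison, the following plain-Python sketch shows what the pipeline is expected to compute when it runs successfully. It is not Beam code. It assumes that each `CombineGlobally(fn_sum)` reduces its input to a single sum and that `Flatten` simply concatenates its inputs. The helper names `copy_number` and `expected_result` are hypothetical. With `numbers = [1, 2]`, both `sum_1` and `sum_2` are 3, so the value that should reach `WriteToText` is 6.

```python
def fn_sum(values):
    # Same combine function as in the reproduction.
    return sum(values)


def copy_number(number, side=None):
    # Hypothetical mirror of _copy_number: passes each element through;
    # the side input value is accepted but unused.
    yield number


def expected_result(numbers):
    """Plain-Python model of the reproduction pipeline (no Beam involved)."""
    sum_1 = fn_sum(numbers)                    # 'CalculateSum1'
    copied = [x for n in numbers for x in copy_number(n, side=sum_1)]
    sum_2 = fn_sum(copied)                     # 'CalculateSum2'
    flattened = [sum_1, sum_2]                 # beam.Flatten() of (sum_1, sum_2)
    return fn_sum(flattened)                   # 'CalculateSum3'


print(expected_result([1, 2]))  # 6
```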

      Console:
      $ python test.py
      INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
      INFO:root:==================== <function annotate_downstream_side_inputs at 0x7f2b7088e758> ====================
      INFO:root:==================== <function fix_side_input_pcoll_coders at 0x7f2b7088e7d0> ====================
      INFO:root:==================== <function lift_combiners at 0x7f2b7088e5f0> ====================
      INFO:root:==================== <function expand_gbk at 0x7f2b7088e668> ====================
      INFO:root:==================== <function sink_flattens at 0x7f2b7088e6e0> ====================
      INFO:root:==================== <function greedily_fuse at 0x7f2b7088e848> ====================
      INFO:root:==================== <function impulse_to_input at 0x7f2b7088e578> ====================
      INFO:root:==================== <function inject_timer_pcollections at 0x7f2b7088e8c0> ====================
      INFO:root:==================== <function sort_stages at 0x7f2b7088e938> ====================
      INFO:root:Running ((ref_AppliedPTransform_ReadNumber1/Read_3)((ref_AppliedPTransform_CalculateSum1/KeyWithVoid_5)(CalculateSum1/CombinePerKey/Precombine)))+(CalculateSum1/CombinePerKey/Group/Write)
      INFO:root:Running ((CalculateSum1/CombinePerKey/Group/Read)(CalculateSum1/CombinePerKey/Merge))((CalculateSum1/CombinePerKey/ExtractOutputs)((ref_AppliedPTransform_CalculateSum1/UnKey_13)(ref_PCollection_PCollection_7/Write)))
      INFO:root:Running ((ref_AppliedPTransform_CalculateSum1/DoOnce/Read_15)(((ref_AppliedPTransform_CalculateSum1/InjectDefault_16)(ref_PCollection_PCollection_9/Write))(Flatten/Transcode/0)))(Flatten/Write/0)
      INFO:root:Running ((ref_AppliedPTransform_ReadNumber2/Read_18)((ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_19)((ref_AppliedPTransform_CalculateSum2/KeyWithVoid_21)(CalculateSum2/CombinePerKey/Precombine))))(CalculateSum2/CombinePerKey/Group/Write)
      Traceback (most recent call last):
        File "test.py", line 41, in <module>
          run()
        File "test.py", line 38, in run
          | beam.io.WriteToText('gs://foo/sum'))
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 425, in __exit__
          self.run().wait_until_finish()
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 405, in run
          self._options).run(False)
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 418, in run
          return self.runner.run_pipeline(self)
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 139, in run_pipeline
          return runner.run_pipeline(pipeline)
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 238, in run_pipeline
          return self.run_via_runner_api(pipeline.to_runner_api())
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 241, in run_via_runner_api
          return self.run_stages(*self.create_stages(pipeline_proto))
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1020, in run_stages
          pcoll_buffers, safe_coders)
        File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1096, in run_stage
          pipeline_components.pcollections[actual_pcoll_id].coder_id]]
      KeyError: u'coder_4'
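      The failing line in `run_stage` appears to read the `coder_id` recorded on a PCollection (`actual_pcoll_id`) and then use that id as a key into a coder mapping. The traceback shows only the tail of the expression, so it does not reveal exactly which mapping lacked `coder_4`. The toy model below illustrates how a lookup of this shape fails when a PCollection references a coder id that was never registered. It uses plain dicts and a hypothetical `PCollection` tuple, not the runner's real protos. It also shows how a hypothetical wrapper, `lookup_coder`, could name both ids in the error. It does not diagnose why `coder_4` was missing in the runner.

```python
from collections import namedtuple

# Hypothetical stand-in for a PCollection proto: only the field the lookup reads.
PCollection = namedtuple("PCollection", ["coder_id"])


def lookup_coder(pcollections, coders, pcoll_id):
    """Resolve pcoll_id -> coder_id -> coder, naming the missing id on failure."""
    coder_id = pcollections[pcoll_id].coder_id
    try:
        return coders[coder_id]
    except KeyError:
        raise KeyError(
            "PCollection %r references coder %r, which is not registered"
            % (pcoll_id, coder_id))


# Toy data (hypothetical): 'coder_4' is referenced but never registered.
pcollections = {"pc_a": PCollection("coder_1"), "pc_b": PCollection("coder_4")}
coders = {"coder_1": "VarIntCoder"}

print(lookup_coder(pcollections, coders, "pc_a"))  # VarIntCoder
try:
    lookup_coder(pcollections, coders, "pc_b")
except KeyError as exc:
    print(exc)
```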


People

    • Assignee: Robert Bradshaw (robertwb)
    • Reporter: Udi Meiri (udim)
    • Votes: 2
    • Watchers: 3
