Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-11807

SDK Worker with multithreading causes boto3 to raise KeyError('endpoint_resolver')

Details

    Description

      Related upstream botocore issue: https://github.com/boto/botocore/issues/1776

      Traceback (most recent call last):
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
          response = task()
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
          return getattr(self, request_type)(
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
          input_op_by_transform_id[element.transform_id].process_encoded(
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded
          self.output(decoded_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 359, in output
          cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 221, in receive
          self.consumer.process(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 719, in process
          delayed_application = self.dofn_runner.process(o)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1241, in process
          self._reraise_augmented(exn)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1239, in process
          return self.do_fn_invoker.invoke_process(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 587, in invoke_process
          self.output_processor.process_outputs(
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1401, in process_outputs
          self.main_receivers.receive(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 221, in receive
          self.consumer.process(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 719, in process
          delayed_application = self.dofn_runner.process(o)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1241, in process
          self._reraise_augmented(exn)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1321, in _reraise_augmented
          raise_with_traceback(new_exn)
        File "/home/local/.local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
          raise exc.with_traceback(traceback)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1239, in process
          return self.do_fn_invoker.invoke_process(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 587, in invoke_process
          self.output_processor.process_outputs(
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1374, in process_outputs
          for result in results:
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
          initial_restriction = self.restriction_provider.initial_restriction(
        File "/tmp/beam/sdks/python/apache_beam/io/iobase.py", line 1545, in initial_restriction
          range_tracker = element_source.get_range_tracker(None, None)
        File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
          return self._get_concat_source().get_range_tracker(
        File "/tmp/beam/sdks/python/apache_beam/options/value_provider.py", line 200, in _f
          return fnc(self, *args, **kwargs)
        File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
          match_result = FileSystems.match([pattern])[0]
        File "/tmp/beam/sdks/python/apache_beam/io/filesystems.py", line 209, in match
          return filesystem.match(patterns, limits)
        File "/tmp/beam/sdks/python/apache_beam/io/filesystem.py", line 765, in match
          raise BeamIOError("Match operation failed", exceptions)
      apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'s3://bucket_name/pipeline_output/pipe-name/ImportExampleGen/examples/7/train/*': BeamIOError("List operation failed with exceptions {'s3://bucket_name/pipeline_output/pipe-name/ImportExampleGen/examples/7/train/': KeyError('endpoint_resolver')}")} [while running 'TFXIOReadAndDecode[TransformIndex1]/RawRecordBeamSource/ReadRawRecords/ReadFromTFRecord[0]/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction0'] with exceptions None
      Traceback (most recent call last):
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
          response = task()
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
          return getattr(self, request_type)(
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
          input_op_by_transform_id[element.transform_id].process_encoded(
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded
          self.output(decoded_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 359, in output
          cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 221, in receive
          self.consumer.process(windowed_value)
        File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 838, in process
          delayed_applications = self.dofn_runner.process_with_sized_restriction(
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1247, in process_with_sized_restriction
          return self.do_fn_invoker.invoke_process(
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 748, in invoke_process
          residual = self._invoke_process_per_window(
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 886, in _invoke_process_per_window
          self.output_processor.process_outputs(
        File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1374, in process_outputs
          for result in results:
        File "/tmp/beam/sdks/python/apache_beam/io/tfrecordio.py", line 187, in read_records
          with self.open_file(file_name) as file_handle:
        File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 177, in open_file
          return FileSystems.open(
        File "/tmp/beam/sdks/python/apache_beam/io/filesystems.py", line 249, in open
          return filesystem.open(path, mime_type, compression_type)
        File "/tmp/beam/sdks/python/apache_beam/io/aws/s3filesystem.py", line 190, in open
          return self._path_open(path, 'rb', mime_type, compression_type)
        File "/tmp/beam/sdks/python/apache_beam/io/aws/s3filesystem.py", line 154, in _path_open
          raw_file = s3io.S3IO(options=self._options).open(
        File "/tmp/beam/sdks/python/apache_beam/io/aws/s3io.py", line 67, in __init__
          self.client = boto3_client.Client(options=options)
        File "/tmp/beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py", line 60, in __init__
          self.client = boto3.client(
        File "/home/local/.local/lib/python3.8/site-packages/boto3/__init__.py", line 93, in client
          return _get_default_session().client(*args, **kwargs)
        File "/home/local/.local/lib/python3.8/site-packages/boto3/session.py", line 258, in client
          return self._session.create_client(
        File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py", line 827, in create_client
          endpoint_resolver = self._get_internal_component('endpoint_resolver')
        File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py", line 700, in _get_internal_component
          return self._internal_components.get_component(name)
        File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py", line 928, in get_component
          del self._deferred[name]
      KeyError: 'endpoint_resolver'

      Attachments

        Issue Links

          Activity

            People

              Assignee: Unassigned
              Reporter: ferryvg
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 1h 50m
                  1h 50m