Beam / BEAM-9029

Two bugs in Python SDK S3 filesystem support

Details

    Labels: Patch

    Description

      Hi

      There seem to be two bugs in the S3 filesystem support.

      I tried to use S3 storage for a simple wordcount demo with the DirectRunner.

      The demo script:

      import re

      import apache_beam as beam
      from apache_beam.io import ReadFromText, WriteToText
      from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


      def main():
          options = PipelineOptions().view_as(StandardOptions)
          options.runner = 'DirectRunner'
          pipeline = beam.Pipeline(options=options)
          (
              pipeline
              | ReadFromText("s3://mx-machine-learning/panwenhai/beam_test/test_data")
              | "extract_words" >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
              | beam.combiners.Count.PerElement()
              | beam.MapTuple(lambda word, count: "%s: %s" % (word, count))
              | WriteToText("s3://mx-machine-learning/panwenhai/beam_test/output")
          )
          result = pipeline.run()
          result.wait_until_finish()


      if __name__ == '__main__':
          main()
      

       

      Error message 1:

      apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions {'s3://mx-machine-learning/panwenhai/beam_test/output-*-of-00001': BeamIOError("List operation failed with exceptions {'s3://mx-machine-learning/panwenhai/beam_test/output-': S3ClientError('Tried to list nonexistent S3 path: s3://mx-machine-learning/panwenhai/beam_test/output-', 404)}")} [while running 'WriteToText/Write/WriteImpl/PreFinalize'] with exceptions None

       

      After digging into the code, it seems the Boto3 client's list function raises an exception when asked to list a nonexistent S3 path (beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py, line 111), and the S3IO class does not handle this exception in its list_prefix function (beam/sdks/python/apache_beam/io/aws/s3io.py, line 121).

      Before writing output, the runner lists any existing output files so it can delete them. When no output file exists yet, that list targets a nonexistent S3 path and triggers the exception.

      This should not be treated as an error here: an empty listing simply means there is nothing to delete. I think we can safely ignore this exception in the S3IO list_prefix function.
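
      A minimal sketch of the proposed behavior, assuming the S3ClientError and ListResponse types in apache_beam/io/aws/clients/s3/messages.py have the shapes visible in the traceback above (the HTTP status as a "code" attribute); the paging details of the real list_prefix are omitted:

      from apache_beam.io.aws.clients.s3 import messages

      def list_page_or_empty(client, request):
          # A 404 from the underlying list call means the prefix does not
          # exist. For "delete any previous output" that simply means there
          # is nothing to delete, so return an empty listing instead of
          # letting the exception escape.
          try:
              return client.list(request)
          except messages.S3ClientError as e:
              if e.code == 404:
                  return messages.ListResponse(items=[])  # assumed constructor
              raise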

      Error message 2:

      File "/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py", line 272, in delete
      exceptions = {path: error for (path, error) in results
      File "/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py", line 272, in <dictcomp>
      exceptions = {path: error for (path, error) in results
      ValueError: too many values to unpack (expected 2) [while running 'WriteToText/Write/WriteImpl/FinalizeWrite']

       

      When the runner tries to delete the temporary output directory, it triggers this exception. The dict comprehension unpacks (path, error) pairs directly from "results", which is a dict (beam/sdks/python/apache_beam/io/aws/s3filesystem.py, line 272), so iterating it yields only the keys. I think we should use results.items() here.
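
      The bug reproduces in isolation (the dict contents below are made up for illustration):

      # Iterating a dict directly yields only its keys; each key here is a
      # long string, so unpacking one into (path, error) raises
      # "ValueError: too many values to unpack (expected 2)".
      results = {'s3://bucket/tmp/output-0000': IOError('delete failed')}

      # Fixed: .items() yields the (path, error) pairs the comprehension expects.
      exceptions = {path: error for (path, error) in results.items()}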

      I have submitted a patch for these two bugs: https://github.com/apache/beam/pull/10459

       

      Thank you.


            People

              Unassigned Unassigned
              icemoon1987 Wenhai Pan
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue


            Time Tracking

              Original Estimate: 24h
              Time Spent: 3h
              Remaining Estimate: 21h