  1. Beam
  2. BEAM-7496

Python SDK fails to read data from HDFS

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.12.0
    • Fix Version/s: None
    • Component/s: io-py-hadoop
    • Labels: None

    Description

      There is an off-by-one index bug in hadoopfilesystem.py: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/hadoopfilesystem.py#L72

      According to the description of the inherited function, the end index is exclusive and should not be included in the returned range: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystemio.py#L51
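To make the difference concrete, here is a minimal sketch of an inclusive versus an exclusive end index. It assumes the linked line derives the read length from `start` and `end`. The helper `read_at` and the sample `DATA` are hypothetical stand-ins for the HDFS client and are not part of Beam.

```python
DATA = b"0123456789"


def read_at(offset, length):
    """Hypothetical stand-in for an HDFS read taking an offset and a byte count."""
    return DATA[offset:offset + length]


def get_range_inclusive(start, end):
    # Assumed buggy form: treats `end` as inclusive, so one extra byte is returned.
    return read_at(start, end - start + 1)


def get_range_exclusive(start, end):
    # Form matching the inherited description: `end` is not part of the range.
    return read_at(start, end - start)


inclusive = get_range_inclusive(2, 6)
exclusive = get_range_exclusive(2, 6)
print(len(inclusive), len(exclusive))
```

A caller that asks for the range [2, 6) expects `end - start` bytes, so the inclusive variant hands back one byte more than the caller's buffer was sized for.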

      This leads to the following error at runtime:

        File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 406, in run
          self._options).run(False)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 419, in run
          return self.runner.run_pipeline(self, self._options)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.py", line 132, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 276, in run_pipeline
          default_environment=self._default_environment))
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 280, in run_via_runner_api
          return self.run_stages(*self.create_stages(pipeline_proto))
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 356, in run_stages
          stage_context.safe_coders)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 511, in run_stage
          data_input, data_output)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 1217, in process_bundle
          result_future = self._controller.control_handler.push(process_bundle)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.py", line 832, in push
          response = self.worker.do_instruction(request)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 312, in do_instruction
          request.instruction_id)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 331, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 554, in process_bundle
          ].process_encoded(data.data)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 140, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 245, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 246, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 396, in apache_beam.runners.worker.operations.ImpulseReadOperation.process
        File "apache_beam/runners/worker/operations.py", line 398, in apache_beam.runners.worker.operations.ImpulseReadOperation.process
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/concat_source.py", line 86, in read
          range_tracker.sub_range_tracker(source_ix)):
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/tfrecordio.py", line 183, in read_records
          record = _TFRecordUtil.read_record(file_handle)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/tfrecordio.py", line 123, in read_record
          buf = file_handle.read(buf_length_expected)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystem.py", line 261, in read
          self._fetch_to_internal_buffer(num_bytes)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystem.py", line 210, in _fetch_to_internal_buffer
          buf = self._file.read(self._read_size)
        File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystemio.py", line 112, in readinto
          b[:len(data)] = data
      ValueError: cannot modify size of memoryview object
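The last frame of the traceback copies the downloaded bytes into a fixed-size buffer with a slice assignment. The sketch below imitates that one statement outside Beam to show why a single extra byte is enough to fail. The function `readinto_like` is a hypothetical name, not Beam's API, and the exact wording of the `ValueError` message may differ between Python versions.

```python
def readinto_like(buffer, data):
    """Copy `data` into `buffer` with the same slice assignment as the last frame."""
    view = memoryview(buffer)
    view[:len(data)] = data  # fails when data is longer than the buffer
    return len(data)


# Data that exactly fills the buffer is copied without problems.
buf = bytearray(4)
copied = readinto_like(buf, b"abcd")

# One extra byte, as an inclusive end index would produce, cannot be stored.
try:
    readinto_like(bytearray(4), b"abcde")
    overflow_error = None
except ValueError as exc:
    overflow_error = exc

print(copied, type(overflow_error).__name__)
```

The slice on the left-hand side is clamped to the buffer length, so the assignment would have to resize the memoryview, which Python does not allow.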
      

       


          People

            Assignee: Unassigned
            Reporter: Mingliang Qi (qmlmoon)
            Votes: 0
            Watchers: 2
