Beam / BEAM-14176

Beam dataflow hangs with requirements.txt

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.38.0
    • Component/s: sdk-py-core
    • Labels: None

    Description

      Similar to this question:

      https://stackoverflow.com/questions/62032382/dataflow-fails-when-i-add-requirements-txt-python

      Note: I could also work around this by using setup.py. However, it would be nice to get a clear error message instead of a hang.
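
      For readers who want to try the setup.py workaround, here is a minimal sketch of what it could look like. The package name, version, and helper function names are hypothetical and not taken from this report; only the py-midicsv dependency and the --setup_file pipeline option come from the issue and from Beam itself. The helper writes a small setup.py and returns the pipeline arguments that would be passed instead of a requirements file.

```python
from pathlib import Path

# Contents of a minimal setup.py. The name and version are placeholders
# (assumptions); only the py-midicsv dependency comes from the report.
SETUP_PY = '''\
import setuptools

setuptools.setup(
    name="midi-pipeline",  # hypothetical package name
    version="0.0.1",       # hypothetical version
    install_requires=["py-midicsv"],
    packages=setuptools.find_packages(),
)
'''


def write_setup_py(project_dir):
    """Write the minimal setup.py into project_dir and return its path."""
    path = Path(project_dir) / "setup.py"
    path.write_text(SETUP_PY)
    return path


def setup_file_args(project_dir):
    """Return pipeline args that use --setup_file instead of --requirements_file."""
    return ["--setup_file", str(write_setup_py(project_dir))]
```

      The returned list can be appended to the pipeline arguments given to PipelineOptions, in place of setting requirements_file on SetupOptions.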

       

      When I use a requirements.txt file and deploy to Dataflow, Beam hangs.

      This was the last message logged:

      INFO:apache_beam.runners.portability.stager:Executing command: 

      ['/Users/ryanthompson/.virtualenvs/hackathon/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/6j/0z_b3j512gd6_mszhyy5p5qc0037d6/T/dataflow-requirements-cache', '-r', '/var/folders/6j/0z_b3j512gd6_mszhyy5p5qc0037d6/T/tmp68jk51_9/tmp_requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
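
      To check whether the pip download step itself is what stalls, the same command can be rerun outside Beam with a timeout, so that a hang turns into an explicit error. The sketch below is a diagnostic aid only, not Beam's stager implementation; the function names and the timeout value are assumptions. The pip arguments mirror the logged command. Note that --no-binary :all: makes pip fetch source distributions rather than wheels, which can be slow for some dependency trees.

```python
import subprocess
import sys


def pip_download_cmd(requirements_file, dest_dir):
    """Build a pip command with the same arguments as the logged one."""
    return [
        sys.executable, '-m', 'pip', 'download',
        '--dest', dest_dir,
        '-r', requirements_file,
        '--exists-action', 'i',
        '--no-binary', ':all:',
    ]


def run_with_timeout(cmd, timeout_s):
    """Run cmd and return (returncode, combined output).

    Raises RuntimeError if the command does not finish within timeout_s
    seconds, so a stalled download is reported instead of blocking forever.
    """
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(
            f"command did not finish within {timeout_s}s: {' '.join(cmd)}"
        ) from exc
    return proc.returncode, proc.stdout
```

      For example, run_with_timeout(pip_download_cmd('pipelines/requirements.txt', '/tmp/req-cache'), 600) would either return pip's exit code and output or raise after ten minutes; the cache directory and the timeout here are placeholders.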

      Here is a program that reproduces the issue:

      import logging

      import argparse
      import apache_beam as beam
      from apache_beam import Create
      from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
      import apache_beam.io.gcp.gcsfilesystem as gcsfs

      import py_midicsv as pm

      def midi_to_csv(file_name) -> str:
          # Open the MIDI file on GCS and convert it to CSV.
          fs = gcsfs.GCSFileSystem(PipelineOptions())
          file = fs.open(file_name, 'rb')
          return pm.midi_to_csv(file)

      def run(argv=None):
          parser = argparse.ArgumentParser()
          known_args, pipeline_args = parser.parse_known_args(argv)

          # For gs testing.
          input_filenames = ['gs://clouddfe-ryanthompson/hackathon/classical/bach/bach_846.mid']
          output_name = 'gs://clouddfe-ryanthompson/hackathon/output/midi_out'

          options = PipelineOptions(pipeline_args)
          options.view_as(SetupOptions).save_main_session = True
          # Staging this requirements file is the step that hangs.
          options.view_as(SetupOptions).requirements_file = 'pipelines/requirements.txt'
          with beam.Pipeline(options=options) as p:
              input_pcol = p | Create(input_filenames)
              mapped = input_pcol | 'Read File from GCS' >> beam.Map(midi_to_csv)
              written = mapped | 'Write to output files' >> beam.Map(logging.info)

      if __name__ == '__main__':
          logging.getLogger().setLevel(logging.INFO)
          run()

       

      Here is my requirements.txt file:

      py-midicsv

       

      Other possibly relevant information:

      I tested with Python 3.6 on a MacBook, from the PyCharm console.

            People

              Assignee: Anand Inguva
              Reporter: Ryan Thompson