Beam / BEAM-8441

Python 3 pipeline fails with an error in StockUnpickler.find_class() while loading the main session.

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Component/s: sdk-py-core

    Description

      When running Apache Beam with Python 3 on Google Cloud Dataflow, the pipeline fails during pickler.load_session(session_file):

      StockUnpickler.find_class(self, module, name)
      AttributeError: Can't get attribute 'SomeAttribute' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>

      Note that this is different from BEAM-8651, since this error occurs in a batch pipeline on the Dataflow runner and happens consistently.

      When testing locally with the DirectRunner, there appears to be no issue.
       
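
      For the local comparison, the runner can also be forced programmatically, assuming it is otherwise taken from pipeline_args; a minimal sketch (StandardOptions and SetupOptions are standard Beam option views, everything else mirrors the code below):

      from apache_beam.options.pipeline_options import (
          PipelineOptions,
          SetupOptions,
          StandardOptions,
      )

      pipeline_options = PipelineOptions(pipeline_args)
      # Same flag as in the Dataflow run, so the main session is pickled either way.
      pipeline_options.view_as(SetupOptions).save_main_session = True
      # Force the local runner for comparison; the failing runs use DataflowRunner.
      pipeline_options.view_as(StandardOptions).runner = "DirectRunner"

      The relevant transform and pipeline code from the failing job: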

      class FlattenCustomActions(beam.PTransform):
          """Transforms Facebook Day Actions.

          Only retains actions with custom_conversions, flattens the actions,
          and adds custom conversion names using a side input.
          """

          def __init__(self, conversions):
              super(FlattenCustomActions, self).__init__()
              self.conversions = conversions

          def expand(self, input_or_inputs):
              return (
                  input_or_inputs
                  | "FlattenActions" >> beam.ParDo(flatten_filter_actions)
                  | "AddConversionName" >> beam.Map(add_conversion_name, self.conversions)
              )
      
      # ...
      # in run():
          pipeline_options = PipelineOptions(pipeline_args)
          pipeline_options.view_as(SetupOptions).save_main_session = True
          p = beam.Pipeline(options=pipeline_options)
          conversions_output = (
              p
              | "ReadConversions" >> ReadFromText(known_args.input_conversions, coder=JsonCoder())
              | TransformConversionMetadata()
          )
          (
              conversions_output
              | "WriteConversions"
              >> WriteCoerced(
                  known_args.output_conversions,
                  known_args.output_type,
                  schema_path=BIGQUERY_SCHEMA_CONVERSIONS_PATH,
              )
          )
          (
              p
              | ReadFacebookJson(known_args.input, retain_root_fields=True)
              | FlattenCustomActions(beam.pvalue.AsList(conversions_output))
              | "WriteActions"
              >> WriteCoerced(
                  known_args.output, known_args.output_type, schema_path=BIGQUERY_SCHEMA_ACTIONS_PATH
              )
          )
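
      For context, save_main_session = True makes Beam pickle the contents of the launching script's __main__ module at submission time and reload it on each worker; per the traceback below, apache_beam.internal.pickler.load_session delegates to dill.load_session. A rough standalone sketch of that save/load pair (dill.dump_session and dill.load_session are real dill APIs; everything else here is illustrative only):

      import dill

      # At submission time: pickle the contents of __main__, which is the user's
      # pipeline script and therefore contains FlattenCustomActions and the
      # helper functions it references.
      dill.dump_session("session_file")

      # On the worker (per the traceback): unpickle the session again. There,
      # __main__ is the worker entry point, dataflow_worker.start, not the
      # user's script, so lookups of __main__-level names can fail.
      dill.load_session("session_file")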

       

      I receive the following Traceback in Dataflow:

      Traceback (most recent call last):
        File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 773, in run
          self._load_main_session(self.local_staging_directory)
        File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
          pickler.load_session(session_file)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 287, in load_session
          return dill.load_session(file_path)
        File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 410, in load_session
          module = unpickler.load()
        File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 474, in find_class
          return StockUnpickler.find_class(self, module, name)
      AttributeError: Can't get attribute 'FlattenCustomActions' on <module 'dataflow_worker.start' from '/usr/local/lib/python3.6/site-packages/dataflow_worker/start.py'>
      
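
      For reference, dill's StockUnpickler is the standard library pickle.Unpickler, and its find_class() essentially imports the module recorded in the pickle stream and fetches the named attribute from it. A simplified sketch of that lookup (illustrative only, not Beam or dill code):

      import importlib

      def find_class(module, name):
          # Simplified version of pickle.Unpickler.find_class: import the module
          # recorded for a pickled global reference, then resolve the attribute.
          mod = importlib.import_module(module)
          return getattr(mod, name)

      # On the Dataflow worker the recorded module resolves to the worker entry
      # point, so the failing call is effectively
      #   find_class("dataflow_worker.start", "FlattenCustomActions"),
      # which raises AttributeError because the class only exists in the
      # submitting process's __main__.

      A commonly suggested mitigation for this class of error (not verified against this particular report) is to define FlattenCustomActions and its helper functions in a separate module that is staged to the workers (for example via --setup_file) and import them in the pipeline script, so they are resolved by import rather than from the pickled main session.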


            People

              Assignee: Unassigned
              Reporter: Jannik Franz
