Beam / BEAM-12860

apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT.test_aggregation is flaky

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Cannot Reproduce
    • Affects Version/s: None
    • Fix Version/s: Missing
    • Component/s: sdk-py-core

    Description

      Sample error from:
      https://ci-beam.apache.org/job/beam_PostCommit_Python37/4245/

      Error Message
      apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Workflow failed.
      Stacktrace
      self = <apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT testMethod=test_aggregation>
      
          @pytest.mark.it_postcommit
          def test_aggregation(self):
            taxiride.run_aggregation_pipeline(
                self.test_pipeline,
                'gs://apache-beam-samples/nyc_taxi/2018/*.csv',
      >         self.output_path)
      
      apache_beam/examples/dataframe/taxiride_it_test.py:52: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      apache_beam/examples/dataframe/taxiride.py:42: in run_aggregation_pipeline
          agg.to_csv(output_path)
      apache_beam/pipeline.py:586: in __exit__
          self.result = self.run()
      apache_beam/testing/test_pipeline.py:114: in run
          False if self.not_use_test_runner_api else test_runner_api))
      apache_beam/pipeline.py:541: in run
          self._options).run(False)
      apache_beam/pipeline.py:565: in run
          return self.runner.run_pipeline(self, self._options)
      apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline
          self.result.wait_until_finish(duration=wait_duration)
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <DataflowPipelineResult <Job
       clientRequestId: '20210908003059981848-7794'
       createTime: '2021-09-08T00:31:08.694724Z'
      ...021-09-08T00:31:08.694724Z'
       steps: []
       tempFiles: []
       type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f3f4bcf06d8>
      duration = None
      
          def wait_until_finish(self, duration=None):
            if not self.is_in_terminal_state():
              if not self.has_job:
                raise IOError('Failed to get the Dataflow job id.')
          
              thread = threading.Thread(
                  target=DataflowRunner.poll_for_job_completion,
                  args=(self._runner, self, duration))
          
              # Mark the thread as a daemon thread so a keyboard interrupt on the main
              # thread will terminate everything. This is also the reason we will not
              # use thread.join() to wait for the polling thread.
              thread.daemon = True
              thread.start()
              while thread.is_alive():
                time.sleep(5.0)
          
              # TODO: Merge the termination code in poll_for_job_completion and
              # is_in_terminal_state.
              terminated = self.is_in_terminal_state()
              assert duration or terminated, (
                  'Job did not reach to a terminal state after waiting indefinitely.')
          
              if terminated and self.state != PipelineState.DONE:
                # TODO(BEAM-1290): Consider converting this to an error log based on
                # the resolution of the issue.
                raise DataflowRuntimeException(
                    'Dataflow pipeline failed. State: %s, Error:\n%s' %
                    (self.state, getattr(self._runner, 'last_error_msg', None)),
      >             self)
      E         apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
      E         Workflow failed.
      
      apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException
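
      For readers unfamiliar with the waiting pattern visible in the traceback, here is a minimal standalone sketch of it, using only the Python standard library. It is not Beam code: wait_until_finished, fake_poll, and the state dict are hypothetical stand-ins, and the poll interval is shortened from the 5.0 seconds in the traceback so the sketch finishes quickly. It only illustrates why the failure surfaces after the polling thread exits, not why the Dataflow job itself failed.

```python
import threading
import time


def wait_until_finished(poll, poll_interval=0.01):
    """Run `poll` on a daemon thread and wait for it without join()."""
    worker = threading.Thread(target=poll)
    # Daemon threads do not keep the interpreter alive, so a
    # KeyboardInterrupt on the main thread can end the whole process.
    worker.daemon = True
    worker.start()
    # Sleep in short slices instead of calling join(), mirroring the
    # loop shown in the traceback.
    while worker.is_alive():
        time.sleep(poll_interval)


def fake_poll(state, steps=3, delay=0.01):
    """Hypothetical stand-in for job polling; ends in a FAILED state."""
    for _ in range(steps):
        time.sleep(delay)
    state["value"] = "FAILED"


state = {"value": "RUNNING"}
wait_until_finished(lambda: fake_poll(state))

# Same message template as in the traceback; the error text is the one
# reported in this issue, hard-coded here for illustration.
message = None
if state["value"] != "DONE":
    message = "Dataflow pipeline failed. State: %s, Error:\n%s" % (
        state["value"], "Workflow failed.")
```

      The newline in the message template is why the exception text appears on two lines in the pytest output, while the Error Message summary shows it on one.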
      

          People

            bhulette Brian Hulette
            tvalentyn Valentyn Tymofieiev