Details
-
Bug
-
Status: Resolved
-
P2
-
Resolution: Cannot Reproduce
-
None
Description
Sample error from:
https://ci-beam.apache.org/job/beam_PostCommit_Python37/4245/
Error Message apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Workflow failed. Stacktrace self = <apache_beam.examples.dataframe.taxiride_it_test.TaxirideIT testMethod=test_aggregation> @pytest.mark.it_postcommit def test_aggregation(self): taxiride.run_aggregation_pipeline( self.test_pipeline, 'gs://apache-beam-samples/nyc_taxi/2018/*.csv', > self.output_path) apache_beam/examples/dataframe/taxiride_it_test.py:52: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/examples/dataframe/taxiride.py:42: in run_aggregation_pipeline agg.to_csv(output_path) apache_beam/pipeline.py:586: in __exit__ self.result = self.run() apache_beam/testing/test_pipeline.py:114: in run False if self.not_use_test_runner_api else test_runner_api)) apache_beam/pipeline.py:541: in run self._options).run(False) apache_beam/pipeline.py:565: in run return self.runner.run_pipeline(self, self._options) apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline self.result.wait_until_finish(duration=wait_duration) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <DataflowPipelineResult <Job clientRequestId: '20210908003059981848-7794' createTime: '2021-09-08T00:31:08.694724Z' ...021-09-08T00:31:08.694724Z' steps: [] tempFiles: [] type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f3f4bcf06d8> duration = None def wait_until_finish(self, duration=None): if not self.is_in_terminal_state(): if not self.has_job: raise IOError('Failed to get the Dataflow job id.') thread = threading.Thread( target=DataflowRunner.poll_for_job_completion, args=(self._runner, self, duration)) # Mark the thread as a daemon thread so a keyboard interrupt on the main # thread will terminate everything. This is also the reason we will not # use thread.join() to wait for the polling thread. 
thread.daemon = True thread.start() while thread.is_alive(): time.sleep(5.0) # TODO: Merge the termination code in poll_for_job_completion and # is_in_terminal_state. terminated = self.is_in_terminal_state() assert duration or terminated, ( 'Job did not reach to a terminal state after waiting indefinitely.') if terminated and self.state != PipelineState.DONE: # TODO(BEAM-1290): Consider converting this to an error log based on # the resolution of the issue. raise DataflowRuntimeException( 'Dataflow pipeline failed. State: %s, Error:\n%s' % (self.state, getattr(self._runner, 'last_error_msg', None)), > self) E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: E Workflow failed. apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException