Beam / BEAM-12861

apache_beam.ml.gcp.recommendations_ai_test_it.RecommendationAIIT.test_create_catalog_item is flaky

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • None
    • Missing
    • Component/s: io-py-gcp

    Description

      apache_beam.ml.gcp.recommendations_ai_test_it.RecommendationAIIT.test_create_catalog_item

      Sample error (from a beam_PostCommit_Python37 run on the Dataflow test runner):

      self = <apache_beam.ml.gcp.recommendations_ai_test_it.RecommendationAIIT testMethod=test_create_catalog_item>
      
          def test_create_catalog_item(self):
          
            CATALOG_ITEM = {
                "id": str(int(random.randrange(100000))),
                "title": "Sample laptop",
                "description": "Indisputably the most fantastic laptop ever created.",
                "language_code": "en",
                "category_hierarchies": [{
                    "categories": ["Electronic", "Computers"]
                }]
            }
          
            with TestPipeline(is_integration_test=True) as p:
              output = (
                  p | 'Create data' >> beam.Create([CATALOG_ITEM])
                  | 'Create CatalogItem' >>
                  recommendations_ai.CreateCatalogItem(project=GCP_TEST_PROJECT)
                  | beam.ParDo(extract_id) | beam.combiners.ToList())
          
      >       assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))
      
      apache_beam/ml/gcp/recommendations_ai_test_it.py:80: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      apache_beam/pipeline.py:586: in __exit__
          self.result = self.run()
      apache_beam/testing/test_pipeline.py:114: in run
          False if self.not_use_test_runner_api else test_runner_api))
      apache_beam/pipeline.py:541: in run
          self._options).run(False)
      apache_beam/pipeline.py:565: in run
          return self.runner.run_pipeline(self, self._options)
      apache_beam/runners/dataflow/test_dataflow_runner.py:65: in run_pipeline
          self.result.wait_until_finish(duration=wait_duration)
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <DataflowPipelineResult <Job
       createTime: '2021-08-25T07:09:43.529173Z'
       currentState: CurrentStateValueValuesEnum(JOB...021-08-25T07:09:43.529173Z'
       steps: []
       tempFiles: []
       type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f74c484b6d8>
      duration = None
      
          def wait_until_finish(self, duration=None):
            if not self.is_in_terminal_state():
              if not self.has_job:
                raise IOError('Failed to get the Dataflow job id.')
          
              thread = threading.Thread(
                  target=DataflowRunner.poll_for_job_completion,
                  args=(self._runner, self, duration))
          
              # Mark the thread as a daemon thread so a keyboard interrupt on the main
              # thread will terminate everything. This is also the reason we will not
              # use thread.join() to wait for the polling thread.
              thread.daemon = True
              thread.start()
              while thread.is_alive():
                time.sleep(5.0)
          
              # TODO: Merge the termination code in poll_for_job_completion and
              # is_in_terminal_state.
              terminated = self.is_in_terminal_state()
              assert duration or terminated, (
                  'Job did not reach to a terminal state after waiting indefinitely.')
          
              if terminated and self.state != PipelineState.DONE:
                # TODO(BEAM-1290): Consider converting this to an error log based on
                # theresolution of the issue.
                raise DataflowRuntimeException(
                    'Dataflow pipeline failed. State: %s, Error:\n%s' %
                    (self.state, getattr(self._runner, 'last_error_msg', None)),
      >             self)
      E         apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
      E         Traceback (most recent call last):
      E           File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      E           File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/transforms/core.py", line 1560, in <lambda>
      E             wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
      E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/testing/util.py", line 190, in _equal
      E             raise BeamAssertException(msg)
      E         apache_beam.testing.util.BeamAssertException: Failed assert: [['4709']] == [[]], unexpected elements [[]], missing elements [['4709']]
      E         
      E         During handling of the above exception, another exception occurred:
      E         
      E         Traceback (most recent call last):
      E           File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
      E             work_executor.execute()
      E           File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
      E             op.start()
      E           File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      E           File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      E           File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      E           File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      E           File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      E           File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
      E           File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      E           File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      E           File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      E           File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
      E           File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      E           File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      E           File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
      E           File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
      E           File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      E           File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      E           File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
      E           File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
      E           File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      E           File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      E           File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
      E           File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
      E           File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      E           File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
      E           File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      E           File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
      E           File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      E           File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/transforms/core.py", line 1560, in <lambda>
      E             wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
      E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python37/src/sdks/python/apache_beam/testing/util.py", line 190, in _equal
      E             raise BeamAssertException(msg)
      E         apache_beam.testing.util.BeamAssertException: Failed assert: [['4709']] == [[]], unexpected elements [[]], missing elements [['4709']] [while running 'assert_that/Match']
      
      apache_beam/runners/dataflow/dataflow_runner.py:1635: DataflowRuntimeException
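
      The failing assertion compares [['4709']] against [[]]: the ToList output was empty, i.e. CreateCatalogItem emitted no response element for the item that was just created on that run. As a sanity check of the test plumbing itself, the same pipeline shape can be run locally on the DirectRunner with a stub in place of the live API call. The sketch below is only illustrative: StubCreateCatalogItem and this extract_id are hypothetical stand-ins, not the integration test's real code.

      import random

      import apache_beam as beam
      from apache_beam.testing.test_pipeline import TestPipeline
      from apache_beam.testing.util import assert_that, equal_to


      class StubCreateCatalogItem(beam.DoFn):
        # Hypothetical stand-in for recommendations_ai.CreateCatalogItem:
        # echoes the catalog item back, as a successful API response would.
        def process(self, item):
          yield item


      def extract_id(response):
        # Hypothetical version of the test's extract_id helper: pulls the
        # catalog item id out of the (stubbed) response.
        yield response["id"]


      CATALOG_ITEM = {
          "id": str(random.randrange(100000)),
          "title": "Sample laptop",
          "description": "Indisputably the most fantastic laptop ever created.",
          "language_code": "en",
          "category_hierarchies": [{"categories": ["Electronic", "Computers"]}]
      }

      with TestPipeline() as p:  # DirectRunner by default; no GCP project needed.
        output = (
            p | 'Create data' >> beam.Create([CATALOG_ITEM])
            | 'Create CatalogItem (stub)' >> beam.ParDo(StubCreateCatalogItem())
            | beam.ParDo(extract_id) | beam.combiners.ToList())
        assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))

      If the stubbed pipeline passes consistently (it should), the assertion logic is sound, which would suggest the flakiness originates in the live Recommendations AI call rather than in the test plumbing.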
      
      

    People

      Assignee: Unassigned
      Reporter: tvalentyn (Valentyn Tymofieiev)
      Votes: 0
      Watchers: 1
