Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-6179

Batch size estimation failing

Details

    • Bug
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • None
    • 2.10.0
    • None

    Description

      Batch size estimation fails on Flink when running a pipeline with a 13 MB input, raising the following error:
      ValueError: On entry to DLASCL parameter number 4 had an illegal value
      java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 48: Traceback (most recent call last):
      File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
      response = task()
      File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
      self._execute(lambda: worker.do_instruction(work), work)
      File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
      request.instruction_id)
      File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 237, in process_bundle
      bundle_processor.process_bundle(instruction_id)
      File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 480, in process_bundle
      ].process_encoded(data.data)
      File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 125, in process_encoded
      self.output(decoded_value)
      File "apache_beam/runners/worker/operations.py", line 182, in apache_beam.runners.worker.operations.Operation.output
      def output(self, windowed_value, output_index=0):
      File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
      cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 709, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 420, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 794, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 709, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 420, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 794, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 709, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 420, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 794, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 709, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 420, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 794, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise_with_traceback(new_exn)
      File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 420, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
      for result in results:
      File "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 398, in process
      self._batch_size = self._batch_size_estimator.next_batch_size()
      File "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 351, in next_batch_size
      a, b = self.linear_regression(xs, ys)
      File "/usr/local/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 321, in linear_regression_numpy
      b, a = np.polyfit(xs, ys, 1, w=weight)
      File "/usr/local/lib/python2.7/site-packages/numpy/lib/polynomial.py", line 585, in polyfit
      c, resids, rank, s = lstsq(lhs, rhs, rcond)
      File "/usr/local/lib/python2.7/site-packages/numpy/linalg/linalg.py", line 1957, in lstsq
      0, work, lwork, iwork, 0)
      ValueError: On entry to DLASCL parameter number 4 had an illegal value [while running 'Analyze/RunPhase[0]/BatchAnalyzerInputs/BatchElements/ParDo(_GlobalWindowsBatchingDoFn)']

      at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
      at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:263)
      at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.$closeResource(FlinkExecutableStageFunction.java:188)
      at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:188)
      at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
      at java.lang.Thread.run(Thread.java:748)
       

      Attachments

        Issue Links

          Activity

            People

              angoenka Ankur Goenka
              angoenka Ankur Goenka
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 2h 40m
                  2h 40m