Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Fixed
Description
I'm getting an error when reading from Kinesis in my pipeline. Using Beam v2.3, running on Google Cloud Dataflow.
I'm constructing the source via:
KinesisIO.Read read = KinesisIO
    .read()
    .withAWSClientsProvider(
        configuration.getAwsAccessKeyId(),
        configuration.getAwsSecretAccessKey(),
        region)
    .withStreamName(configuration.getKinesisStream())
    .withUpToDateThreshold(Duration.standardMinutes(30))
    .withInitialTimestampInStream(configuration.getStartTime());
The exception is:
Mar 19, 2018 12:54:41 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
SEVERE: 2018-03-19T19:54:53.010Z: (2896b8774de760ec): java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:223)
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
    org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArithmeticException: Value cannot fit in an int: 153748225435
    org.joda.time.field.FieldUtils.safeToInt(FieldUtils.java:206)
    org.joda.time.field.BaseDurationField.getDifference(BaseDurationField.java:141)
    org.joda.time.base.BaseSingleFieldPeriod.between(BaseSingleFieldPeriod.java:72)
    org.joda.time.Minutes.minutesBetween(Minutes.java:101)
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getBacklogBytes$3(SimplifiedKinesisClient.java:163)
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:205)
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:161)
    org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getBacklogBytes(SimplifiedKinesisClient.java:150)
    org.apache.beam.sdk.io.kinesis.KinesisReader.getTotalBacklogBytes(KinesisReader.java:200)
    com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:398)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1199)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:137)
    com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:940)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
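For anyone reading the trace: the "Caused by" frames show the failure coming from Minutes.minutesBetween, where the minute count (153748225435) is narrowed to an int. Below is a minimal sketch of the same failure mode. It is an illustration only: it uses java.time and Math.toIntExact from the JDK as stand-ins for Joda-Time so it runs without extra dependencies, the class and method names are hypothetical, and it is not the code or the fix from Beam.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration; JDK classes stand in for Joda-Time here.
public class MinutesOverflowSketch {

    // Narrows the minute count to an int, as an int-backed period type must.
    // Throws ArithmeticException when the gap exceeds Integer.MAX_VALUE minutes.
    static int minutesBetweenAsInt(Instant start, Instant end) {
        return Math.toIntExact(Duration.between(start, end).toMinutes());
    }

    // Keeps the minute count as a long, so no narrowing step can fail.
    static long minutesBetweenAsLong(Instant start, Instant end) {
        return Duration.between(start, end).toMinutes();
    }

    public static void main(String[] args) {
        // Timestamp taken from the log line in the report.
        Instant end = Instant.parse("2018-03-19T19:54:53Z");
        // Start chosen so the gap equals the value reported in the exception.
        Instant start = end.minus(Duration.ofMinutes(153748225435L));

        System.out.println("as long: " + minutesBetweenAsLong(start, end));
        try {
            minutesBetweenAsInt(start, end);
        } catch (ArithmeticException e) {
            System.out.println("as int failed: " + e.getMessage());
        }
    }
}
```

The gap in the trace is far larger than Integer.MAX_VALUE (2147483647) minutes, so any int-based minute computation between those two instants has to fail; computing the difference as a long avoids that particular exception.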