Details
- Bug
- Status: Triage Needed
- P2
- Resolution: Fixed
- 2.2.0
- None
Description
When the job is restored from a savepoint, the Kinesis reader almost always throws java.lang.IllegalArgumentException: Attempting to move backwards.
After a few job restarts caused by the same exception, the job finally starts up and continues to run with no further problems.
The Beam job reads from 32 shards with parallelism set to 32, running on Flink 1.3.2. I have also seen this exception with Beam 2.2, in which the Kinesis client was refactored to use MovingFunction. I think this is a serious regression introduced in Beam 2.2.
java.lang.IllegalArgumentException: Attempting to move backwards
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
    at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
    at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
    at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
    at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
Kinesis Reader transformation configuration:
pipeline.apply("KINESIS READER", KinesisIO.read()
    .withStreamName(streamName)
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1));
When testing locally I managed to catch this exception. Just before executing the line that threw the exception, I captured the state of the class so that you can reproduce the issue:
org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]
The add function of MovingFunction was then called with nowMsSinceEpoch = 1519315583591, which is 1409 ms earlier than the captured currentMsSinceEpoch (1519315585000).
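To illustrate why these captured values trip the check, here is a minimal sketch. It assumes the guard simply rejects any timestamp earlier than the last one the buckets were advanced to. MovingMinSketch is a hypothetical, simplified stand-in written for this report, not Beam's MovingFunction; only the bucket width (sampleUpdateMs=5000), the bucket count (12), and the two timestamps come from the dump above.

```java
import java.util.Arrays;

/** Hypothetical, simplified moving-minimum tracker; NOT Beam's MovingFunction. */
final class MovingMinSketch {
    private final long bucketWidthMs;
    private final long[] buckets;
    private long currentMsSinceEpoch;
    private int currentIndex;

    MovingMinSketch(long bucketWidthMs, int numBuckets, long startMsSinceEpoch) {
        this.bucketWidthMs = bucketWidthMs;
        this.buckets = new long[numBuckets];
        // Long.MAX_VALUE (9223372036854775807) marks an empty bucket.
        Arrays.fill(buckets, Long.MAX_VALUE);
        this.currentMsSinceEpoch = startMsSinceEpoch;
        this.currentIndex = 0;
    }

    /** Records a sample taken at nowMsSinceEpoch. */
    void add(long nowMsSinceEpoch, long value) {
        flush(nowMsSinceEpoch);
        buckets[currentIndex] = Math.min(buckets[currentIndex], value);
    }

    /** Returns the minimum over all buckets, or Long.MAX_VALUE if empty. */
    long get() {
        long min = Long.MAX_VALUE;
        for (long b : buckets) {
            min = Math.min(min, b);
        }
        return min;
    }

    /** Advances the current bucket up to nowMsSinceEpoch. */
    private void flush(long nowMsSinceEpoch) {
        // Assumed guard: time must never go backwards.
        if (nowMsSinceEpoch < currentMsSinceEpoch) {
            throw new IllegalArgumentException("Attempting to move backwards");
        }
        // Move forward one bucket at a time, clearing each bucket that is reused.
        while (nowMsSinceEpoch - currentMsSinceEpoch >= bucketWidthMs) {
            currentMsSinceEpoch += bucketWidthMs;
            currentIndex = (currentIndex + 1) % buckets.length;
            buckets[currentIndex] = Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        // Values taken from the captured state in this report.
        long captured = 1519315585000L; // currentMsSinceEpoch
        long now = 1519315583591L;      // nowMsSinceEpoch passed to add
        MovingMinSketch min = new MovingMinSketch(5000L, 12, captured);
        try {
            min.add(now, now);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage() + " (" + (captured - now) + " ms behind)");
        }
    }
}
```

In this sketch any caller whose clock reading lags the tracker's internal time is rejected, which matches the symptom in the report: the call fails whenever the supplied timestamp is older than the stored one.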
Attachments
Issue Links
- is related to BEAM-3087 Extend lock scope in Flink UnboundedSourceWrapper (Resolved)