Beam / BEAM-3726

Kinesis Reader: java.lang.IllegalArgumentException: Attempting to move backwards

Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.5.0
    • Component/s: io-java-kinesis
    • Labels: None

    Description

      When the job is restored from a savepoint, the Kinesis Reader almost always throws java.lang.IllegalArgumentException: Attempting to move backwards

      After a few restarts caused by the same exception, the job finally starts up and continues to run with no further problems.

      The Beam job reads from 32 shards with parallelism set to 32, running on Flink 1.3.2. I have also seen this exception with Beam 2.2, when the Kinesis client was refactored to use MovingFunction. I think this is a serious regression introduced in Beam 2.2.

       

      java.lang.IllegalArgumentException: Attempting to move backwards
      at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
      at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
      at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
      at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
      at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
      at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
      at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
      at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)

       

      Kinesis Reader transformation configuration:

      pipeline.apply("KINESIS READER", KinesisIO.read()
          .withStreamName(streamName)
          .withInitialPositionInStream(InitialPositionInStream.LATEST)
          .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1));

       

      When testing locally I managed to catch this exception. Just before executing the line that threw the exception, I captured the state of the class so that you can replicate the issue:

      org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]

       

      The add function of MovingFunction was then called with nowMsSinceEpoch = 1519315583591, which is 1409 ms earlier than the currentMsSinceEpoch = 1519315585000 in the captured state.
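
      To make the captured values easier to reason about, here is a minimal sketch of how such a call can fail. MovingMinSketch is a hypothetical, simplified stand-in written for this report; it is not Beam's MovingFunction, and it assumes the failing precondition rejects any timestamp earlier than currentMsSinceEpoch. Only the numbers (sampleUpdateMs=5000, 12 buckets, and the two timestamps) are taken from the state above.

```java
import java.util.Arrays;

/** Hypothetical, simplified stand-in for a time-bucketed moving minimum (not Beam's MovingFunction). */
public class MovingMinSketch {
    private final long sampleUpdateMs;
    private final long[] buckets;
    private long currentMsSinceEpoch;
    private int currentIndex;

    public MovingMinSketch(long sampleUpdateMs, int numBuckets, long currentMsSinceEpoch) {
        this.sampleUpdateMs = sampleUpdateMs;
        this.buckets = new long[numBuckets];
        this.currentMsSinceEpoch = currentMsSinceEpoch;
        Arrays.fill(buckets, Long.MAX_VALUE); // Long.MAX_VALUE marks an empty bucket
    }

    /** Advances the window to the given time, clearing buckets that expire on the way. */
    private void flush(long nowMsSinceEpoch) {
        // Assumed precondition: the caller's clock may never go backwards.
        if (nowMsSinceEpoch < currentMsSinceEpoch) {
            throw new IllegalArgumentException("Attempting to move backwards");
        }
        long steps = (nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs;
        long toClear = Math.min(steps, buckets.length);
        for (long i = 0; i < toClear; i++) {
            currentIndex = (currentIndex + 1) % buckets.length;
            buckets[currentIndex] = Long.MAX_VALUE;
        }
        currentMsSinceEpoch += steps * sampleUpdateMs;
    }

    /** Records a value observed at the given time. */
    public void add(long nowMsSinceEpoch, long value) {
        flush(nowMsSinceEpoch);
        buckets[currentIndex] = Math.min(buckets[currentIndex], value);
    }

    /** Returns the minimum over all buckets, or Long.MAX_VALUE if all are empty. */
    public long get() {
        return Arrays.stream(buckets).min().getAsLong();
    }

    /** Returns "ok" if the add succeeds, otherwise the exception message. */
    public static String tryAdd(MovingMinSketch f, long nowMsSinceEpoch, long value) {
        try {
            f.add(nowMsSinceEpoch, value);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        long current = 1519315585000L; // currentMsSinceEpoch from the captured state
        long now = 1519315583591L;     // nowMsSinceEpoch passed to add
        MovingMinSketch f = new MovingMinSketch(5000L, 12, current);
        System.out.println("now is " + (current - now) + " ms behind current");
        System.out.println(tryAdd(f, now, now));
    }
}
```

      Under that assumption, any reader clock that lags the window's current time after a restore would trip the check, while a later call with a timestamp at or past currentMsSinceEpoch would succeed. That would be consistent with the job recovering after a few restarts, but it is a guess, not a confirmed root cause.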

       

       

      Attachments

        1. KinesisIO-state.txt
          3 kB
          Pawel Bartoszek

            People

              Assignee: Alexey Romanenko (aromanenko)
              Reporter: Pawel Bartoszek (pawelbartoszek)
              Votes: 0
              Watchers: 4
