Flink / FLINK-34171

Cannot restore from savepoint when increasing parallelism of operator using reinterpretAsKeyedStream and RichAsyncFunction


Details

    Description

      We recently upgraded from Flink 1.14.2 to 1.17.0. Apart from a few feature changes (enabling snapshot compression and unaligned checkpoints), our job has not materially changed, but we now see the following exception whenever we try to adjust the job's parallelism up or down:

      java.lang.RuntimeException: Exception occurred while setting the current key context.
          at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
          at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
          at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
          at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
          at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
          at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
          at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
          at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)
          at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)
          at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:393)
          at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$1800(AsyncWaitOperator.java:92)
          at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:621)
          at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:602)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
          at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:712)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
          at java.lang.Thread.run(Thread.java:750)
      Caused by: java.lang.IllegalArgumentException: Key group 30655 is not in KeyGroupRange{startKeyGroup=19346, endKeyGroup=19360}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
          at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
          at org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77)
          at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250)
          at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
          at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
          ... 29 more
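The "Caused by" line is a range check. Flink assigns every key to one of maxParallelism key groups by hashing the key's hashCode(), and each parallel subtask owns one contiguous, inclusive range of key groups. Here the failing subtask owns key groups 19346 through 19360 but was handed a record in key group 30655. The sketch below mirrors, as we understand it, the range arithmetic in Flink's KeyGroupRangeAssignment (computeKeyGroupRangeForOperatorIndex and computeOperatorIndexForKeyGroup). KeyGroupMath and its methods are hypothetical names. The sketch leaves out the key-to-key-group hash, and the maxParallelism and parallelism values are illustrative because the report does not state them.

```java
import java.util.Arrays;

/**
 * Hypothetical helper that mirrors (to our understanding) the key-group
 * arithmetic of Flink's KeyGroupRangeAssignment. Illustration only.
 */
public final class KeyGroupMath {

    private KeyGroupMath() {}

    /** Inclusive {start, end} key-group range owned by one subtask. */
    public static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || maxParallelism < parallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Index of the subtask whose range contains the given key group. */
    public static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** The condition behind "Key group X is not in KeyGroupRange{...}". */
    public static boolean owns(int[] range, int keyGroup) {
        return keyGroup >= range[0] && keyGroup <= range[1];
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical; not stated in the report
        int parallelism = 3;      // hypothetical
        for (int i = 0; i < parallelism; i++) {
            // prints [0, 42], then [43, 85], then [86, 127]
            System.out.println("subtask " + i + " owns "
                    + Arrays.toString(rangeForSubtask(maxParallelism, parallelism, i)));
        }
        // The check from the log: key group 30655 vs. the failing subtask's range.
        System.out.println(owns(new int[] {19346, 19360}, 30655)); // false
    }
}
```

Because the range boundaries are derived from the parallelism, rescaling moves them. An element that reaches a subtask by any route other than the key-group mapping (for example, restored non-keyed state) can therefore fail this check.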

       
      We're seeing this in an operator where we use DataStreamUtils::reinterpretAsKeyedStream to collect multiple tasks into a single operator chain. However, the shuffle key should be deterministic: every task takes the same data structure as input, with an immutable key (a String), and all of them use the exact same KeySelector instance.

      One pattern we use here is the following chain:
      KeyedProcessFunction --> RichAsyncFunction --> reinterpretAsKeyedStream(KeyedProcessFunction)

      ...and I suspect this may be related to the way the buffered in-flight data from the RichAsyncFunction is redistributed during rescaling. The failure appears to be non-deterministic during rescaling. From our admittedly anecdotal and limited testing, disabling unaligned checkpoints reduces the probability of hitting it but does not eliminate it. (To keep persisted in-flight data out of the savepoint, we first take a savepoint, restore from it with unaligned checkpoints disabled, and then take a second savepoint, which we use to change the parallelism.)
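The suspicion above can be illustrated with a toy model. It assumes, without confirmation in this report, that the async operator keeps its buffered elements in non-keyed operator state, that this state is split across the new subtasks on rescale without regard to key groups, and that the restored elements are re-emitted straight into the chained keyed operator created by reinterpretAsKeyedStream. The stack trace is at least consistent with that: the failing setCurrentKey call comes from AsyncWaitOperator emitting results while StreamTask.restoreInternal is still running. The class and method names, the round-robin placement, and all numbers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model: buffered async results carry a key group but are
 * redistributed on rescale without regard to key-group ownership.
 */
public final class InFlightRedistribution {

    private InFlightRedistribution() {}

    /** Subtask that owns a key group (same rule as Flink's assignment, to our understanding). */
    public static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * Spreads buffered elements (represented by their key groups) round-robin
     * across newParallelism subtasks and counts those that land on a subtask
     * that does not own their key group. In a keyed downstream operator, such
     * an element would trip the "Key group X is not in KeyGroupRange" check.
     */
    public static int misplacedAfterRoundRobin(List<Integer> bufferedKeyGroups,
                                               int maxParallelism, int newParallelism) {
        int misplaced = 0;
        for (int i = 0; i < bufferedKeyGroups.size(); i++) {
            int assignedSubtask = i % newParallelism; // key-agnostic placement (assumption)
            int keyGroup = bufferedKeyGroups.get(i);
            if (ownerOf(keyGroup, maxParallelism, newParallelism) != assignedSubtask) {
                misplaced++;
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical
        List<Integer> buffered = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg += 8) {
            buffered.add(kg); // one buffered element per sampled key group
        }
        System.out.println("buffered elements: " + buffered.size()); // 16
        System.out.println("misplaced after rescale to 3: "
                + misplacedAfterRoundRobin(buffered, maxParallelism, 3)); // 10
    }
}
```

If this model holds, it would also fit the observation that disabling unaligned checkpoints does not eliminate the failure: the async operator's buffer would be part of the operator's own state rather than the channel in-flight data that unaligned checkpoints persist.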

      We never had this issue under 1.14, so we're wondering whether it is caused by unaligned checkpointing or by a regression or behavior change since then. And if unaligned checkpointing is the cause, any thoughts on why disabling it hasn't seemed to fix the problem?

      Update: I took a savepoint and confirmed that it could not be restored with a new parallelism. I then removed the RichAsyncFunction and successfully restarted the job (using --allowNonRestoredState). After that, I was able to take another savepoint and restart the job with the RichAsyncFunction re-inserted into the DataStream.


          People

            Assignee: Unassigned
            Reporter: Ken Burford (burfordk)
            Votes: 0
            Watchers: 2