Details
Bug
Status: Open
Major
Resolution: Unresolved
1.17.0
None
None
Description
We recently upgraded from Flink 1.14.2 to 1.17.0. Our job has not materially changed beyond a few feature changes (enabling snapshot compression, unaligned checkpoints), but we're seeing the following exception when attempting to adjust the parallelism of our job up or down:
java.lang.RuntimeException: Exception occurred while setting the current key context.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:373)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:508)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:503)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:478)
    at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:94)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
    at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272)
    at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:393)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$1800(AsyncWaitOperator.java:92)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:621)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:602)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:712)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Key group 30655 is not in KeyGroupRange{startKeyGroup=19346, endKeyGroup=19360}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
    at org.apache.flink.runtime.state.heap.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:77)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:250)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:371)
    ... 29 more
We're seeing this in an operator where we use DataStreamUtils::reinterpretAsKeyedStream to collect multiple tasks into a single operator chain. Each task takes the same data structure as input, with an immutable key (represented as a string), and all of them use the exact same KeySelector instance.
However, one pattern we're using here is a chain of:
KeyedProcessFunction --> RichAsyncFunction --> reinterpretAsKeyedStream(KeyedProcessFunction)
...and I suspect that this might have something to do with the way the buffered in-flight data from the RichAsyncFunction is redistributed during re-scaling. We've observed that this failure is seemingly non-deterministic during re-scaling, and that the probability of encountering it (from our admittedly anecdotal and limited testing) is reduced, but not eliminated, when we disable unaligned checkpoints. (Note that we first take a savepoint, restore with unaligned checkpoints disabled, then take another savepoint, which we then use to adjust the parallelism, to keep "persisted in-flight data" out of the savepoint.)
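To make the suspected mechanism concrete, here is a minimal, self-contained sketch of how a key group's owning subtask shifts when parallelism changes. The arithmetic is modeled on what Flink's KeyGroupRangeAssignment is understood to do, but it is written here from scratch as an assumption, not copied from Flink. The class name KeyGroupSketch, the max parallelism of 128, and key group 100 are all hypothetical; the real job's values are not known. The real assignment also runs the key's hashCode through a murmur hash to get the key group, which is omitted here.

```java
public class KeyGroupSketch {

    // Subtask index that owns a key group (assumed formula: keyGroup * parallelism / maxParallelism).
    static int operatorIndex(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First key group owned by a subtask (assumed formula, ceiling division).
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by a subtask, inclusive (assumed formula).
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // True if the subtask's key group range contains the key group.
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical value
        int keyGroup = 100;       // hypothetical key group of one buffered record

        // Show which subtask owns the key group before and after rescaling.
        for (int parallelism : new int[] {4, 6}) {
            int owner = operatorIndex(maxParallelism, parallelism, keyGroup);
            System.out.printf("parallelism=%d: key group %d owned by subtask %d (range %d..%d)%n",
                    parallelism, keyGroup, owner,
                    rangeStart(maxParallelism, parallelism, owner),
                    rangeEnd(maxParallelism, parallelism, owner));
        }

        // If a record for this key group were restored onto the subtask index that
        // owned it before rescaling, that subtask's new range would no longer contain it.
        System.out.println("subtask 3 still owns it after rescaling: "
                + owns(maxParallelism, 6, 3, keyGroup));
    }
}
```

If elements buffered inside the async operator are restored onto a subtask chosen by anything other than their key group, the downstream reinterpretAsKeyedStream operator in the same chain would see a key outside its own range, which would match the "Key group ... is not in KeyGroupRange" failure above. This only illustrates the hypothesis; it does not confirm that this is what happens in 1.17.0.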
We've never had any issues in the past with this under 1.14, so we're wondering if this is due to unaligned checkpointing, or possibly a regression/change in behavior since then. And if it is due to unaligned checkpointing, any thoughts on why disabling it hasn't seemed to address the problem?
Update: I took a savepoint and confirmed that it could not be restored under a new parallelism. I then removed the RichAsyncFunction and restarted the job (using --allowNonRestoredState) successfully. I was then able to take another savepoint and restart the job with the RichAsyncFunction re-inserted into the DS.