Details
-
Bug
-
Status: Resolved
-
Not a Priority
-
Resolution: Fixed
-
1.9.1
-
None
Description
I followed these steps to generate and read states:
- implements the example[1] `CountWindowAverage` in Scala(exactly same), and run jobA => that makes good.
- execute `flink cancel -s ${JobID}` => savepoints was generated as expected.
- implements the example[2] `StatefulFunctionWithTime` in Scala(code below), and run jobB => failed, exceptions shows that "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."
ReaderFunction code as below:
// code placeholder class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] { var countState: ValueState[(Long, Long)] = _ override def open(parameters: Configuration): Unit = { val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)]) countState = getRuntimeContext().getState(stateDescriptor) } override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = { out.collect(countState.value()) } }
Attachments
Issue Links
- links to