Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Workaround
Affects Version/s: 1.12.0
Fix Version/s: None
Description
When a savepoint is triggered for a regular Flink job with a keyed function, the key is serialized with
org.apache.flink.api.common.typeutils.base.IntSerializer
and the value is serialized with
org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializerSnapshot
When the savepoint is loaded with the State Processor API, transformed, and written back to disk, a different serializer is used for the key:
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot
When the savepoint written by the State Processor API is then loaded, the following exception is thrown:
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer (org.apache.flink.api.common.typeutils.base.IntSerializer@11a7ba62) must be compatible with the previous key serializer (org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@3b56cc30).
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:147)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
The State Processor API should use the same key serializer as the Flink job, since the key type is exactly the same.
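To make the failure mode concrete, here is a minimal toy model of the check that appears to be failing. It is not Flink code: `KeySerializerSnapshot`, `IncompatibleKeySerializerException`, `isCompatible`, and `restore` are hypothetical names, and comparing serializer names for equality is a simplifying assumption standing in for Flink's real compatibility resolution. It only shows that a savepoint whose keys were recorded with a Kryo serializer is rejected when the restoring job brings an `IntSerializer`.

```java
import java.util.Objects;

// Toy model only: none of these types are Flink classes.
final class SerializerCompatibilitySketch {

    /** Hypothetical stand-in for a snapshot recording which serializer wrote the keys. */
    static final class KeySerializerSnapshot {
        final String serializerName;

        KeySerializerSnapshot(String serializerName) {
            this.serializerName = Objects.requireNonNull(serializerName);
        }
    }

    /** Hypothetical stand-in for the exception raised on an incompatible restore. */
    static final class IncompatibleKeySerializerException extends RuntimeException {
        IncompatibleKeySerializerException(String message) {
            super(message);
        }
    }

    /** Assumption: keys are compatible only if the same serializer is used on both sides. */
    static boolean isCompatible(KeySerializerSnapshot previous, String newSerializerName) {
        return previous.serializerName.equals(newSerializerName);
    }

    /** Rejects the restore when the job's key serializer differs from the recorded one. */
    static void restore(KeySerializerSnapshot previous, String newSerializerName) {
        if (!isCompatible(previous, newSerializerName)) {
            throw new IncompatibleKeySerializerException(
                "The new key serializer (" + newSerializerName
                    + ") must be compatible with the previous key serializer ("
                    + previous.serializerName + ").");
        }
    }

    public static void main(String[] args) {
        // Savepoint written by the job itself: key recorded with IntSerializer.
        KeySerializerSnapshot original = new KeySerializerSnapshot("IntSerializer");
        // Savepoint rewritten by the tool: key recorded with KryoSerializer.
        KeySerializerSnapshot rewritten = new KeySerializerSnapshot("KryoSerializer");

        restore(original, "IntSerializer"); // accepted, no exception
        try {
            restore(rewritten, "IntSerializer");
        } catch (IncompatibleKeySerializerException e) {
            System.out.println("Restore rejected: " + e.getMessage());
        }
    }
}
```

In this model the only way to make the rewritten savepoint restorable is for the rewriting side to record the same key serializer as the job, which is the behavior requested above.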
I have attached a zip file that contains the code to reproduce the issue.
The zipped project includes both the source and the rewritten savepoints.
Note:
I have tried toggling the enableX/disableX serializer settings, but without success so far.