Details
Type: New Feature
Status: Open
Priority: P3
Resolution: Unresolved
Description
Currently, the only way for the Python SDK to make Flink use a StateBackend other than the default (MemoryStateBackend) is to set state.backend in flink-conf.yaml. This forces every job running on the same Flink cluster to use the same state backend. Ideally, we should be able to pass the state backend to the Flink runner through PipelineOptions. Here is the error I get when I pass --state_backend=RocksDBStateBackend:
RuntimeError: Pipeline failed in state FAILED: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.apache.flink.runtime.state.StateBackend` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information at [Source: (String)""RocksDBStateBackend""; line: 1, column: 1]
Acceptance Criteria:
The Flink StateBackend is configurable via command-line options from Python.
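One way the requested behavior could look is sketched below: a plain string flag is resolved to a concrete backend class name, instead of being deserialized directly into the abstract StateBackend type, which is what the Jackson error above complains about. This is only an illustration, not the runner's actual implementation. The helper name resolve_state_backend and the STATE_BACKENDS table are hypothetical; the fully qualified class names are Flink's built-in backends.

```python
import argparse

# Hypothetical lookup table: short flag value -> concrete Flink backend class.
STATE_BACKENDS = {
    "MemoryStateBackend":
        "org.apache.flink.runtime.state.memory.MemoryStateBackend",
    "FsStateBackend":
        "org.apache.flink.runtime.state.filesystem.FsStateBackend",
    "RocksDBStateBackend":
        "org.apache.flink.contrib.streaming.state.RocksDBStateBackend",
}


def resolve_state_backend(argv):
    """Return the concrete backend class name selected by --state_backend.

    Hypothetical helper: unrelated flags (for example --runner) are ignored,
    and the default mirrors the MemoryStateBackend default described above.
    """
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--state_backend", default="MemoryStateBackend")
    known, _unknown = parser.parse_known_args(argv)
    try:
        return STATE_BACKENDS[known.state_backend]
    except KeyError:
        # Fail early with a readable message rather than a deserialization error.
        raise ValueError(
            "Unknown state backend %r, expected one of: %s"
            % (known.state_backend, ", ".join(sorted(STATE_BACKENDS)))
        )


if __name__ == "__main__":
    print(resolve_state_backend(
        ["--runner=FlinkRunner", "--state_backend=RocksDBStateBackend"]))
```

Resolving the name to a concrete class before it reaches the Java side would let each job choose its own backend without touching flink-conf.yaml. Backends that need extra settings, such as a checkpoint directory, would need additional options that this sketch does not cover.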