Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.18.1
-
None
-
None
-
Image used: flink:1.18.1-scala_2.12-java11
Description
I wrote a simple example to test state TTL and could not get the expected results. I then replicated the same example in Java, where it works as expected. Because the behavior differs between the two APIs, something is wrong either in PyFlink or in my PyFlink setup.
Here is code to reproduce the problem. The example creates a state with a TTL of 1 second, processes an event every 1.5 seconds, and prints the current state value for each event. I expect it to print `None, None, None, ...`, because the 1-second TTL has already elapsed by the time the next event is processed 1.5 seconds later. Instead it prints `None, 'state', 'state', ...`. The equivalent Java job works as expected and prints `null, null, ...`.
```python
import time
from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
class Processor(KeyedProcessFunction):
    """Keyed process function that exercises a ValueState with a 1-second TTL.

    For every element it prints the current state value, overwrites the state
    with ``"state"`` and then sleeps 1.5 seconds, so the value written for one
    element is expected to be expired when the next element is processed.
    """

    def open(self, runtime_context: RuntimeContext):
        """Register the ``my_state`` string ValueState with a 1-second TTL.

        The TTL is refreshed only on create/write (``OnCreateAndWrite``) and
        expired values must never be returned (``NeverReturnExpired``).
        """
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )
        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        """Print the current state, overwrite it, then wait longer than the TTL."""
        # Print current state
        print(self.state.value())
        # expect to print `None` all the time, but prints: `None, 'state', 'state', ...` instead
        # Update state
        self.state.update("state")
        # Sleep longer than the 1-second TTL so the value written above should
        # be expired by the time the next element arrives.
        time.sleep(1.5)
if __name__ == "__main__":
    # Init environment (parallelism 1 so all elements go through one Processor instance)
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
    # Setup pipeline: ten elements, all routed to the same key so they share one state entry
    (
        environment.from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )
    # Execute pipeline
    environment.execute("ttl_test")
```
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.time.LocalDateTime;
public class Processor extends KeyedProcessFunction<Integer, String, String> {
private transient ValueState<String> state;
@Override
public void open(Configuration parameters)
@Override
public void processElement(String event, Context context, Collector<String> collector) throws IOException, InterruptedException
}
```