Flink / FLINK-34625

TTL doesn't seem to work in pyflink


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.18.1
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None
    • Environment: Image used: flink:1.18.1-scala_2.12-java11

    Description

      I wrote a simple example to test state TTL and could not get the expected results. I then replicated the same example in Java, where it works as expected. Since the behavior differs between the two APIs, something is wrong either in PyFlink or in my PyFlink setup.

      Here is code to reproduce the problem. The example creates a value state with a TTL of 1 second, then processes one event every 1.5 seconds and prints the current state before updating it. I expect it to print `None, None, None, ...` (1.5 seconds pass between writes, so the 1-second TTL has always expired by the next read), but instead it prints `None, 'state', 'state', ...`. In Java it works as expected and prints `null, null, ...`.
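To make the expected timeline concrete, here is a toy, pure-Python model of a value state configured like the one below (1-second TTL, `OnCreateAndWrite`, `NeverReturnExpired`), driven by a fake clock instead of real sleeps. It is a hypothetical sketch of the documented semantics, not Flink's implementation; `ToyTtlValueState`, `simulate` and the explicit `now` arguments are invented for illustration.

```python
from typing import List, Optional


class ToyTtlValueState:
    """Hypothetical in-memory model of a value state with a processing-time TTL.

    Mimics UpdateType.OnCreateAndWrite (only writes refresh the timestamp) and
    StateVisibility.NeverReturnExpired (expired values read as None).
    """

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl = ttl_seconds
        self._value: Optional[str] = None
        self._last_write: Optional[float] = None

    def value(self, now: float) -> Optional[str]:
        # Reads do not refresh the timestamp under OnCreateAndWrite.
        if self._last_write is None or now - self._last_write >= self.ttl:
            return None  # never written, or expired: never returned
        return self._value

    def update(self, new_value: str, now: float) -> None:
        # Writes (create or update) restart the TTL window.
        self._value = new_value
        self._last_write = now


def simulate(n_events: int, interval: float, ttl: float) -> List[Optional[str]]:
    """Replay the report's loop: read and print, write, then wait `interval` seconds."""
    state = ToyTtlValueState(ttl)
    printed = []
    for i in range(n_events):
        now = i * interval  # fake clock: event i is processed at t = i * interval
        printed.append(state.value(now))
        state.update("state", now)
    return printed


print(simulate(10, interval=1.5, ttl=1.0))
# [None, None, None, None, None, None, None, None, None, None]
print(simulate(3, interval=0.5, ttl=1.0))
# [None, 'state', 'state']
```

Under this model, the sequence observed in PyFlink (`None, 'state', 'state', ...`) only appears when events arrive faster than the TTL, as in the 0.5-second case; with a 1.5-second interval every read should return `None`.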

      ```python

      import time

      from pyflink.common import Time, Types
      from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment
      from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

      class Processor(KeyedProcessFunction):
          def open(self, runtime_context: RuntimeContext):
              state_descriptor = ValueStateDescriptor(
                  name="my_state",
                  value_type_info=Types.STRING(),
              )

              state_descriptor.enable_time_to_live(
                  ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
                  .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
                  .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                  .build()
              )

              self.state = runtime_context.get_state(state_descriptor)

          def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
              # Print current state
              print(self.state.value())
              # expect to print `None` all the time, but prints: `None, 'state', 'state', ...` instead

              # Update state
              self.state.update("state")

              # Sleep longer than the 1-second TTL so the state should expire
              time.sleep(1.5)

      if __name__ == "__main__":
          # Init environment
          environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

          # Setup pipeline
          (
              environment.from_collection(
                  collection=list(range(10)),
              )
              .key_by(lambda value: 0)
              .process(Processor())
          )

          # Execute pipeline
          environment.execute("ttl_test")

      ```

      Java version of the same example:

      ```java

      import org.apache.flink.api.common.state.StateTtlConfig;
      import org.apache.flink.api.common.state.ValueState;
      import org.apache.flink.api.common.state.ValueStateDescriptor;
      import org.apache.flink.api.common.time.Time;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
      import org.apache.flink.util.Collector;

      import java.io.IOException;
      import java.time.LocalDateTime;

      public class Processor extends KeyedProcessFunction<Integer, String, String> {

          private transient ValueState<String> state;

          @Override
          public void open(Configuration parameters) {
              var stateTtlConfig = StateTtlConfig
                      .newBuilder(Time.seconds(1))
                      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                      .build();

              var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
              stateDescriptor.enableTimeToLive(stateTtlConfig);

              state = getRuntimeContext().getState(stateDescriptor);
          }

          @Override
          public void processElement(String event, Context context, Collector<String> collector)
                  throws IOException, InterruptedException {
              // Print current state (use a different local name so it does not shadow the field)
              String current = state.value();
              System.out.println(current); // prints `null, null, ...`

              // Update state
              state.update(LocalDateTime.now().toString());

              // Sleep longer than the 1-second TTL so the state should expire
              Thread.sleep(1500);
          }
      }

      ```
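One possible workaround, offered as an untested assumption rather than a verified fix, is to skip the built-in TTL and check expiry by hand: store the write time next to the value and ignore stale entries on read. The pure-Python helpers below sketch only that logic. `stamp`, `read_if_fresh`, `TTL_MS` and the `(value, written_at_ms)` tuple layout are hypothetical; in a real job the tuple would be kept in the `ValueState` (for example declared with `Types.TUPLE([Types.STRING(), Types.LONG()])`) and `now_ms` would come from `ctx.timer_service().current_processing_time()`.

```python
from typing import Optional, Tuple

TTL_MS = 1_000  # same 1-second TTL as the descriptor in the report

# Hypothetical layout of the stored value: (payload, processing time of the write in ms)
Stamped = Tuple[str, int]


def stamp(value: str, now_ms: int) -> Stamped:
    """Pair a value with its write time before storing it."""
    return (value, now_ms)


def read_if_fresh(stored: Optional[Stamped], now_ms: int, ttl_ms: int = TTL_MS) -> Optional[str]:
    """Return the payload only if it was written less than ttl_ms ago (NeverReturnExpired-like)."""
    if stored is None:
        return None
    value, written_at_ms = stored
    return value if now_ms - written_at_ms < ttl_ms else None


# Replay the report's timeline with a fake clock: one element every 1500 ms.
stored = None
for now_ms in range(0, 15_000, 1_500):
    print(read_if_fresh(stored, now_ms))  # None on every iteration
    stored = stamp("state", now_ms)
```

This only hides stale values on read; it does not free the stored entry, so a processing-time timer that calls `clear()` would still be needed to reclaim state.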

People

    • Assignee: Unassigned
    • Reporter: Mark Lidenberg (marklidenberg)
    • Votes: 0
    • Watchers: 3
