Flink / FLINK-33404

on_timer method is missing in ProcessFunction and CoProcessFunction of Pyflink

Details

    • Type: Bug
    • Status: Open
    • Priority: Not a Priority
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: API / Python
    • Labels: None

    Description

      Hello,

      The `on_timer` method is not defined in `ProcessFunction` and `CoProcessFunction` in PyFlink. If I register a timer in a function that does not define `on_timer` itself, the job fails with an error when the timer fires, e.g.:

       ```
        ...
        File "/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/input_handler.py", line 101, in process_timer
          yield from _emit_results(
        File "/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/input_handler.py", line 131, in _emit_results
          for result in results:
        File "/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/input_handler.py", line 114, in _on_processing_time
          yield from self._on_processing_time_func(timestamp, key, namespace)
        File "/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 308, in on_processing_time
          return _on_timer(TimeDomain.PROCESSING_TIME, timestamp, key)
        File "/home/jaehyeon/personal/flink-demos/venv/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 317, in _on_timer
          return process_function.on_timer(timestamp, on_timer_ctx)
      AttributeError: 'ReadingFilter' object has no attribute 'on_timer'

              at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
              at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
              at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
              ... 3 more
      ```
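
The last Python frame of the traceback can be reproduced without a cluster. The following is a minimal plain-Python sketch, not PyFlink code: `BaseFunction`, `fire_timer`, and `try_fire` are hypothetical stand-ins, and only the `process_function.on_timer(timestamp, on_timer_ctx)` call is taken from the traceback.

```python
class BaseFunction:
    """Hypothetical stand-in for a base class that declares no on_timer."""

    def process_element(self, value, ctx):
        raise NotImplementedError


class ReadingFilter(BaseFunction):
    """User function that handles elements but never defines on_timer."""

    def process_element(self, value, ctx):
        yield value


def fire_timer(process_function, timestamp, ctx=None):
    # Mirrors the call in the traceback:
    # process_function.on_timer(timestamp, on_timer_ctx)
    return process_function.on_timer(timestamp, ctx)


def try_fire(process_function, timestamp):
    """Return the error message if firing the timer fails, else None."""
    try:
        fire_timer(process_function, timestamp)
    except AttributeError as exc:
        return str(exc)
    return None


error_message = try_fire(ReadingFilter(), 1000)
print(error_message)
```

Because the lookup only happens when a timer fires, the function can process elements without any error until then.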

      I'm using PyFlink 1.17.1, but this probably applies to other versions as well.

      Could an `on_timer` method be added to `ProcessFunction` and `CoProcessFunction`?
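
One possible shape for this, sketched in plain Python rather than against the PyFlink source: a base class with a default `on_timer` that emits nothing, which subclasses override when they need timers. `BaseFunctionWithTimer`, `AlertingFilter`, and `fire_timer` are hypothetical names, and whether a no-op default is the right behaviour is an assumption for the maintainers to confirm.

```python
class BaseFunctionWithTimer:
    """Hypothetical base class; not PyFlink's actual ProcessFunction."""

    def process_element(self, value, ctx):
        raise NotImplementedError

    def on_timer(self, timestamp, ctx):
        # Assumed default: emit nothing when a timer fires.
        return []


class ReadingFilter(BaseFunctionWithTimer):
    """Inherits the no-op on_timer, so a firing timer does not fail."""

    def process_element(self, value, ctx):
        yield value


class AlertingFilter(BaseFunctionWithTimer):
    """Overrides on_timer to emit a record when a timer fires."""

    def process_element(self, value, ctx):
        yield value

    def on_timer(self, timestamp, ctx):
        yield f"timer fired at {timestamp}"


def fire_timer(process_function, timestamp, ctx=None):
    # Collect whatever the function emits for this timer.
    return list(process_function.on_timer(timestamp, ctx))


default_results = fire_timer(ReadingFilter(), 1000)
custom_results = fire_timer(AlertingFilter(), 1000)
```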

      Cheers,
      Jaehyeon

          People

            Assignee: Unassigned
            Reporter: Jaehyeon Kim (dottami)