Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-33188

PyFlink MapState with Types.ROW() throws exception

    XMLWordPrintableJSON

Details

    Description

      I'm trying to use MapState where each value is a list of elements of type <class 'pyflink.common.types.Row'>.
       
      I wanted to check whether anyone else has hit the same issue when using MapState in PyFlink with complex types.
       
      Here is the code:
       
      from pyflink.common import Time
      from pyflink.common.typeinfo import Types
      from pyflink.datastream import StreamExecutionEnvironment
      from pyflink.datastream.functions import (
          KeyedCoProcessFunction,
          KeySelector,
          RuntimeContext,
      )
      from pyflink.datastream.state import (
          MapStateDescriptor,
          StateTtlConfig,
          ValueStateDescriptor,
          ListStateDescriptor
      )
      from pyflink.table import DataTypes, StreamTableEnvironment

      class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
          """Keyed co-process function holding a TTL-bounded MapState.

          The state maps a SQL timestamp key to a list of 12-field rows
          (8 STRING fields, 3 SQL_TIMESTAMP fields, 1 BIG_INT field).
          """

          def __init__(self):
              # Must be ``__init__`` (double underscores): a method named
              # ``_init_`` is an ordinary method that Python never calls on
              # construction, so this initialisation would silently not run.
              super().__init__()
              self.my_map_state = None

          def open(self, runtime_context: RuntimeContext):
              """Build the map-state descriptor and obtain the state handle.

              :param runtime_context: context used to register the MapState.
              """
              # Entries expire 1 second after their last read or write;
              # background cleanup is disabled.
              state_ttl_config = (
                  StateTtlConfig.new_builder(Time.seconds(1))
                  .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
                  .disable_cleanup_in_background()
                  .build()
              )

              # NOTE(review): this unnamed Types.ROW(...) is what fails. While
              # building the state coder, RowTypeInfo.get_field_names() falls
              # back to the Java type info, which needs the Java gateway inside
              # the Python worker (see the traceback in this report). Supplying
              # explicit names via Types.ROW_NAMED(...) may sidestep that
              # lookup; confirm against the PyFlink version in use.
              my_map_state_descriptor = MapStateDescriptor(
                  "my_map_state",
                  Types.SQL_TIMESTAMP(),
                  Types.LIST(Types.ROW([
                      Types.STRING(),
                      Types.STRING(),
                      Types.STRING(),
                      Types.STRING(),
                      Types.STRING(),
                      Types.STRING(),
                      Types.STRING(),
                      Types.STRING(),
                      Types.SQL_TIMESTAMP(),
                      Types.SQL_TIMESTAMP(),
                      Types.SQL_TIMESTAMP(),
                      Types.BIG_INT()
                  ]))
              )
              my_map_state_descriptor.enable_time_to_live(state_ttl_config)
              self.my_map_state = runtime_context.get_map_state(my_map_state_descriptor)
       
      But when I run this code, it fails at job startup with the exception below, raised from runtime_context.get_map_state(my_map_state_descriptor), even before anything has been added to the state.
       
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
      File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 127, in open
        self.open_func()
      File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 296, in open_func
        process_function.open(runtime_context)
      File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
      File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", line 125, in get_map_state
        map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
      File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 812, in from_type_info
        from_type_info(type_info._key_type_info), from_type_info(type_info._value_type_info))
      File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 809, in from_type_info
        return GenericArrayCoder(from_type_info(type_info.elem_type))
      File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 819, in from_type_info
        [f for f in type_info.get_field_names()])
      File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 377, in get_field_names
        j_field_names = self.get_java_type_info().getFieldNames()
      File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 391, in get_java_type_info
        j_types_array = get_gateway()\
      File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 62, in get_gateway
        _gateway = launch_gateway()
      File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 86, in launch_gateway
        raise Exception("It's launching the PythonGatewayServer during Python UDF execution "
      Exception: It's launching the PythonGatewayServer during Python UDF execution which is unexpected. It usually happens when the job codes are in the top level of the Python script file and are not enclosed in a `if __name__ == '__main__'` statement.
       
      If I switch from Types.ROW() to Types.TUPLE(), it works without any exception.
       
      This works:
       
      # Same 12 field types as the failing example (8 STRING, 3 SQL_TIMESTAMP,
      # 1 BIG_INT), but wrapped in Types.TUPLE instead of Types.ROW.
      string_fields = [Types.STRING() for _ in range(8)]
      timestamp_fields = [Types.SQL_TIMESTAMP() for _ in range(3)]
      tuple_type = Types.TUPLE(string_fields + timestamp_fields + [Types.BIG_INT()])
      my_map_state_descriptor = MapStateDescriptor(
          "my_map_state",
          Types.SQL_TIMESTAMP(),
          Types.LIST(tuple_type),
      )

      Attachments

        Activity

          People

            Unassigned Unassigned
            elkhand Elkhan Dadashov
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: