Flink / FLINK-15719

Exceptions when using Scala types directly with the State Processor API


Details

    Description

      I followed these steps to generate and read state:

      1. Implemented the example[1] `CountWindowAverage` in Scala (exactly the same) and ran jobA => this worked as expected.
      2. Executed `flink cancel -s ${JobID}` => the savepoint was generated as expected.
      3. Implemented the example[2] `StatefulFunctionWithTime` in Scala (code below) and ran jobB => it failed with the exception "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."

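      The ReaderFunction code is as follows:

        import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
        import org.apache.flink.api.scala.createTypeInformation
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.state.api.functions.KeyedStateReaderFunction
        import org.apache.flink.util.Collector

        class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
          var countState: ValueState[(Long, Long)] = _

          // Register the state descriptor under the same name ("average") that jobA used.
          override def open(parameters: Configuration): Unit = {
            val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
            countState = getRuntimeContext().getState(stateDescriptor)
          }

          // Emit the stored (count, sum) tuple for each key found in the savepoint.
          override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
            out.collect(countState.value())
          }
        }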
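
      For context, the sketch below shows one way jobB might load the savepoint and invoke the reader. It is not the exact code from jobB. The savepoint path, the operator uid "my-uid", and the choice of `MemoryStateBackend` are placeholders assumed for illustration. It uses the `readKeyedState` overload that takes explicit `TypeInformation` for the key and output types, on the assumption that the two-argument overload infers the key type from the function's Java generic signature and therefore does not see Scala's `Long` as a long. Whether this avoids the `StateMigrationException` has not been verified here.

```scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

object ReadSavepointSketch {
  def main(args: Array[String]): Unit = {
    // The State Processor API is built on the batch DataSet API (Java environment).
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical savepoint path; use the one printed by `flink cancel -s`.
    val savepoint = Savepoint.load(env, "file:///tmp/savepoints/savepoint-xxxx", new MemoryStateBackend())

    // "my-uid" is a hypothetical operator uid; it must match the uid set in jobA.
    // The key and output type information are passed explicitly from the Scala API
    // instead of being extracted from ReaderFunction by reflection.
    val counts = savepoint.readKeyedState(
      "my-uid",
      new ReaderFunction,
      createTypeInformation[Long],
      createTypeInformation[(Long, Long)])

    // print() triggers execution of the batch job.
    counts.print()
  }
}
```

      `ReaderFunction` here is the class shown above; it must be on the classpath together with the `flink-state-processor-api` dependency.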
      

      1: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state 

      2: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state 

            People

              Assignee: Unassigned
              Reporter: Ying Z (yingz)