SPARK-35897

Support user defined initial state with flatMapGroupsWithState in Structured Streaming


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.2
    • Fix Version/s: 3.2.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Structured Streaming supports arbitrary stateful processing through the mapGroupsWithState and flatMapGroupsWithState operators. Today the state for each key is built up only from the data that arrives in each batch. This API improvement lets users supply an initial state, which is applied when the first batch is executed.
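      The intended behaviour can be modelled without Spark. The sketch below is only an illustration of the semantics described above, not Spark's implementation: RunningCount, InitialStateModel, processBatch and run are hypothetical names, each micro-batch is modelled as a plain Seq of keys, and timeouts and output are ignored.

```scala
// Plain-Scala model of seeding per-key state before the first batch.
// No Spark dependency; every name in this block is hypothetical.
final case class RunningCount(count: Long)

object InitialStateModel {
  type StateStore = Map[String, RunningCount]

  // Fold one micro-batch into the state store: each record increments
  // the running count of its key, starting from zero if the key is absent.
  def processBatch(state: StateStore, batch: Seq[String]): StateStore =
    batch.foldLeft(state) { (acc, key) =>
      val previous = acc.getOrElse(key, RunningCount(0L))
      acc.updated(key, RunningCount(previous.count + 1))
    }

  // Process all batches in order, starting from the user-supplied state.
  // Passing an empty map models the behaviour without an initial state.
  def run(initialState: StateStore, batches: Seq[Seq[String]]): StateStore =
    batches.foldLeft(initialState)(processBatch)
}
```

      With an initial state of a -> 1 and b -> 1, running the batches Seq("a", "a", "c") and Seq("b") through InitialStateModel.run leaves a at 3, b at 2 and c at 1, whereas starting from an empty map would leave a at 2, b at 1 and c at 1.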

       

      Proposed new APIs (Scala)

       

       

        def mapGroupsWithState[S: Encoder, U: Encoder](
            timeoutConf: GroupStateTimeout,
            initialState: Dataset[(K, S)])(
            func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

        def flatMapGroupsWithState[S: Encoder, U: Encoder](
            outputMode: OutputMode,
            timeoutConf: GroupStateTimeout,
            initialState: Dataset[(K, S)])(
            func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

       

         Proposed new APIs (Java)

        

          def mapGroupsWithState[S, U](
            func: MapGroupsWithStateFunction[K, V, S, U],
            stateEncoder: Encoder[S],
            outputEncoder: Encoder[U],
            timeoutConf: GroupStateTimeout,
            initialState: Dataset[(K, S)]): Dataset[U]

          def flatMapGroupsWithState[S, U](
            func: FlatMapGroupsWithStateFunction[K, V, S, U],
            outputMode: OutputMode,
            stateEncoder: Encoder[S],
            outputEncoder: Encoder[U],
            timeoutConf: GroupStateTimeout,
            initialState: Dataset[(K, S)]): Dataset[U]

       

         

      Example Usage

         

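          // RunningCount, timeoutConf and stateFunc are assumed to be defined elsewhere.
          val initialState: Dataset[(String, RunningCount)] = Seq(
            ("a", new RunningCount(1)),
            ("b", new RunningCount(1))
          ).toDS()

          val inputData = MemoryStream[String]
          val result =
            inputData.toDS()
              .groupByKey(x => x)
              // Argument order follows the proposed signature: timeoutConf, then initialState.
              .mapGroupsWithState(timeoutConf, initialState)(stateFunc)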


          People

            Rahul Shivu Mahadev (rahulsmahadev)
            Tathagata Das