Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28999

Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments


    • Question
    • Status: Resolved
    • Major
    • Resolution: Invalid
    • 2.3.0
    • None
    • Structured Streaming
    • None


      Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;


      How can I chain two stateful operations, where the first maintains the per-order state and the second summarizes the state data produced by the first? Is there a good way to do this without writing the intermediate result to a Kafka sink and reading it back through a Kafka source into Spark Structured Streaming? Thank you.

      package org.roy.demo.streaming.bus
      import java.sql.Timestamp
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.current_timestamp
      import org.apache.spark.sql.streaming._
      import streaming.StreamingExamples
      /**
        * Structured Streaming job that parses comma-separated order lines into `NOEvent`s,
        * keeps per-order state with `mapGroupsWithState` keyed by `orderId`, and then applies a
        * second `mapGroupsWithState` keyed by `storeId` on the output of the first.
        *
        * NOTE(review): the snippet as shown is incomplete -- several closing braces, the bodies
        * of some `else` branches and the end of the `query` statement are not visible.
        */
      object StructuredOrderStateListRturn {
        def main(args: Array[String]): Unit = {
          // NOTE(review): the builder chain that would turn this into a SparkSession instance
          // is not visible here.
          val spark = SparkSession
          import spark.implicits._
          // Create DataFrame representing the stream of input lines from connection to host:port
          // NOTE(review): no `.format(...)` call is visible, and the host option is an empty string.
          val lines = spark.readStream
            .option("host", "")
            .option("port", 9998)
            .load().withColumn("current_timestamp", current_timestamp)
          // Split each line on "," and build a NOEvent from the first five fields plus the
          // timestamp column; lines with fewer than five fields take the `else` branch.
          // NOTE(review): the result of each branch is not visible (the `if` branch only binds
          // `objEvent`, the `else` body is missing); the later `filter` drops null elements.
          val events = lines
            .as[(String, Timestamp)]
            .map { case (line, timestamp) => {
              val orderInfo = line.split(",")
              if (orderInfo != null && orderInfo.size > 4) {
                val objEvent = NOEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)
              } else {
            }.filter(obj => obj != null)
            // NOTE(review): the next line is the remnant of a block comment whose delimiters are
            // missing. It reads roughly: "First pass: maintain the order state data, return the
            // latest order, and attach the previous order amount to the result."
            * -次维护订单状态数据,返回最新的一个订单,并把上一个订单金额附加返回
          // First stateful pass: one state entry (orderInfoStore) per orderId.
          val orderUpdates = events
            .groupByKey(event => event.orderId)
            .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
            case (orderId: String, events: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
              // If the timeout has fired, update the cache
              if (state.hasTimedOut) {
                // Timed out: remove the data from the store, set expired=true and return info such as
                // the online time. This window could be set to 0:00-0:10; no calculation, just drop the data.
                // NOTE(review): only the construction of `finalUpdate` is visible; removing the state
                // and returning `finalUpdate` are not shown.
                val finalUpdate =
                  orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, state.get.money, 0.0, state.get.orderDate, state.get.timestamp, expired = true)
              } else {
                var oldOrder = 0.0 // amount of the previous order
                // Only the last event of the batch for this order is used.
                val lastEnvent = events.toSeq.last
                val updatedSession = if (state.exists) {
                  oldOrder = state.get.money
                  orderInfoStore(orderId, lastEnvent.otype, lastEnvent.storeId, lastEnvent.money, oldOrder, lastEnvent.orderDate, lastEnvent.timestamp)
                } else {
                  orderInfoStore(orderId, lastEnvent.otype, lastEnvent.storeId, lastEnvent.money, 0, lastEnvent.orderDate, lastEnvent.timestamp)
                // NOTE(review): no call storing `updatedSession` into `state` is visible before
                // `state.get` is read below -- confirm against the full source.
                // Set timeout such that the state will time out if no data is received for 3600 seconds
                state.setTimeoutDuration("3600 seconds")
                orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, state.get.money, oldOrder, state.get.orderDate, state.get.timestamp, expired = false)
            // NOTE(review): the next line is the remnant of a block comment whose delimiters are
            // missing. It reads roughly: "Second pass: compute the per-store data."
            * 二次计算出门店的数据
          // Second stateful pass over the output of the first, keyed by storeId.
          // NOTE(review): this second mapGroupsWithState on the same streaming Dataset is
          // presumably what raises the quoted AnalysisException ("Multiple mapGroupsWithStates
          // are not supported on a streaming DataFrames/Datasets").
          val storeUpdate = orderUpdates.groupByKey(order => order.storeId).mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
            case (storeId: String, events: Iterator[orderInfoStoreUpdate], state: GroupState[g1InfoStore]) =>
              // If the timeout has fired, update the cache
              if (state.hasTimedOut) {
                // Timed out: remove the data from the store, set expired=true and return info such as
                // the online time. This window could be set to 0:00-0:10; no calculation, just drop the data.
                // NOTE(review): only the construction of `finalUpdate` is visible here as well.
                val finalUpdate =
                  g1InfoStoreUpdate(storeId, state.get.num, state.get.money, state.get.timestamp, expired = true)
              } else {
                // NOTE(review): `events` is an Iterator and is traversed several times below
                // (`.size`, then `.map(...).reduce`); an Iterator can only be consumed once, so
                // the later traversals would see no elements -- confirm and materialise it first.
                var storeNum = events.map(_.orderId).size
                var storeMoney = events.map(_.money).reduce(_ + _) // in practice there will only be one
                val updatedStore = if (state.exists) { // the store already exists
                  val old_order_moneys = events.map(_.oldMoney).reduce(_ + _) // in practice there will only be one
                  // New total = previous total + new amounts - the amounts those orders had before.
                  storeMoney = state.get.money + storeMoney - old_order_moneys
                  g1InfoStore(storeId, state.get.num + events.map(_.orderId).size, storeMoney, state.get.timestamp)
                } else {
                  // NOTE(review): `state.get` is read in the branch where `state.exists` is false.
                  g1InfoStore(storeId, storeNum, storeMoney, state.get.timestamp)
                // NOTE(review): no call storing `updatedStore` into `state` is visible.
                // Set timeout such that the state will time out if no data is received for 3600 seconds
                state.setTimeoutDuration("3600 seconds")
                // NOTE(review): `expired = true` in the non-timeout branch differs from the
                // first pass, which returns `expired = false` here -- confirm intent.
                g1InfoStoreUpdate(storeId, state.get.num, state.get.money, state.get.timestamp, expired = true)
      //    storeUpdate.createOrReplaceTempView("update_tmp")
      //    spark.sql("select storeId,otype, count(1) num,sum(money) as moneys ,sum(oldMoney) as oldMoney  from update_tmp group by storeId,otype ")
          // NOTE(review): the rest of the query definition (sink, output mode, start) is not visible.
          val query = storeUpdate
      /**
        * Input event parsed from one comma-separated line.
        *
        * @param orderId   first field of the line; used as the grouping key of the first pass
        * @param otype     second field of the line, parsed as an Int
        * @param storeId   third field of the line
        * @param money     fourth field of the line, parsed as a Double
        * @param orderDate fifth field of the line
        * @param timestamp value of the `current_timestamp` column attached to the line
        */
      case class NOEvent(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        orderDate: String,
        timestamp: Timestamp
      )

      /**
        * Per-order state kept between batches.
        *
        * @param oldMoney amount carried over from the previously stored state of the same
        *                 order, or 0 when there was none
        */
      case class orderInfoStore(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        oldMoney: Double,
        orderDate: String,
        timestamp: Timestamp
      )

      /**
        * Record emitted for an order by the first stateful pass.
        *
        * @param oldMoney amount of the previous version of the order
        * @param expired  flag set to true on the timeout path and false otherwise
        */
      case class orderInfoStoreUpdate(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        oldMoney: Double,
        orderDate: String,
        timestamp: Timestamp,
        expired: Boolean
      )
      /**
        * State of the first grouping level (one entry per store).
        *
        * An `otype: Int` field is commented out of the parameter list in the original.
        *
        * @param storeid   store identifier (note the lower-case `id`)
        * @param num       number of orders counted for the store
        * @param money     accumulated amount for the store
        * @param timestamp timestamp stored with the state
        */
      case class g1InfoStore(
        storeid: String,
        // otype: Int,
        num: Int,
        money: Double,
        timestamp: Timestamp
      )

      /**
        * Record emitted for a store by the second stateful pass.
        *
        * An `otype: Int` field is commented out of the parameter list in the original.
        *
        * @param expired flag carried on the emitted record
        */
      case class g1InfoStoreUpdate(
        storeid: String,
        // otype: Int,
        num: Int,
        money: Double,
        timestamp: Timestamp,
        expired: Boolean
      )




      I changed the approach: the store state now maintains a list of its orders, and the totals are recalculated from that list. However, this means keeping the detail of every individual order in state, which temporarily uses a lot of resources. Is there any other way to handle this kind of data?

      package org.roy.demo.streaming.bus
      import java.sql.Timestamp
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.current_timestamp
      import org.apache.spark.sql.streaming._
      import streaming.StreamingExamples
        * Created by Roy 2019/09/06
        * Counts the daily number of orders and their total amount
      /**
        * Structured Streaming job that parses comma-separated order lines into `dataEvent`s and
        * keeps one state entry per store with a single `mapGroupsWithState` keyed by `storeId`.
        * The state holds an orderId -> amount map so that a repeated order adjusts the store
        * total instead of being counted again.
        *
        * NOTE(review): the snippet as shown is incomplete -- several closing braces, the bodies
        * of some `else` branches and the end of the `query` statement are not visible.
        */
      object StructuredStoreOrderState {
        def main(args: Array[String]): Unit = {
          // NOTE(review): the builder chain that would turn this into a SparkSession instance
          // is not visible here.
          val spark = SparkSession
          import spark.implicits._
          // Create DataFrame representing the stream of input lines from connection to host:port
          // NOTE(review): no `.format(...)` call is visible, and the host option is an empty string.
          val lines = spark.readStream
            .option("host", "")
            .option("port", 9998)
            .load().withColumn("current_timestamp", current_timestamp)
          // Split each line on "," and build a dataEvent from the first five fields plus the
          // timestamp column; lines with fewer than five fields take the `else` branch.
          // NOTE(review): the result of each branch is not visible; the later `filter` drops
          // null elements.
          val events = lines
            .as[(String, Timestamp)]
            .map { case (line, timestamp) => {
              val orderInfo = line.split(",")
              if (orderInfo != null && orderInfo.size > 4) {
                val objEvent = dataEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)
              } else {
            }.filter(obj => obj != null)
          // Single stateful pass: one state entry (storeOrderInfoState) per storeId.
          val orderUpdates = events
            .groupByKey(event => event.storeId)
            .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
            case (key: String, values: Iterator[dataEvent], state: GroupState[storeOrderInfoState]) =>
              // Materialise the iterator once so it can be traversed several times below.
              val seqs = values.toSeq
              // NOTE(review): `times` is not used anywhere in the visible code.
              val times = seqs.map(_.timestamp).seq
              // Despite its name, this is the current wall-clock time, not the maximum event time.
              val max_time = new Timestamp(System.currentTimeMillis())
              if (state.hasTimedOut) {
                // NOTE(review): only the construction of `finalUpdate` is visible; removing the
                // state and returning `finalUpdate` are not shown.
                val finalUpdate =
                  storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, max_time, expired = true)
              } else {
                val updatedSession = if (state.exists) {
                  val stateMap = state.get.orderInfoStoreMap
                  // Orders seen in this batch (orderId -> amount), merged into the state map below.
                  var norderMap2: Map[String, Double] = Map()
                  // Deltas to apply to the stored order count and total amount.
                  var num = 0
                  var money = 0.0
                  seqs.foreach(e => {
                    if (stateMap.contains(e.orderId)) {
                      // Known order: add only the difference to its previously stored amount.
                      money += e.money - stateMap.get(e.orderId).get
                    } else {
                      // New order: count it and add its full amount.
                      num += 1
                      money += e.money
                    norderMap2 += (e.orderId -> e.money)
                  storeOrderInfoState(key, state.get.orderNum + num, state.get.orderMoney + money, stateMap ++ norderMap2, max_time)
                } else {
                  // No state yet for this store: build it from the batch alone.
                  var norderMap2: Map[String, Double] = Map()
                  var money = 0.0
                  seqs.foreach(e => {
                    money += e.money
                    norderMap2 += (e.orderId -> e.money)
                  storeOrderInfoState(key, norderMap2.size, money, norderMap2, max_time)
                println("updatedSession" + updatedSession)
                // NOTE(review): no call storing `updatedSession` into `state` is visible before
                // `state.get` is read below -- confirm against the full source.
                // Set timeout such that the state will time out if no data is received for 3600 seconds
                state.setTimeoutDuration("3600 seconds")
                storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, max_time, expired = false)
          // NOTE(review): the rest of the query definition (sink, output mode, start) is not visible.
          val query = orderUpdates
      /**
        * Input event parsed from one comma-separated line.
        *
        * @param orderId   first field of the line
        * @param otype     second field of the line, parsed as an Int
        * @param storeId   third field of the line; used as the grouping key
        * @param money     fourth field of the line, parsed as a Double
        * @param orderTime fifth field of the line
        * @param timestamp value of the `current_timestamp` column attached to the line
        */
      case class dataEvent(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        orderTime: String,
        timestamp: Timestamp
      )

      /** Order detail record with the same fields as `dataEvent`. */
      case class orderEnventInfo(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        orderTime: String,
        timestamp: Timestamp
      )

      // Within a store, maintain a table of all its orders. Alternatives noted by the author
      // for the store's orderInfoStoreMap: Map[String, orderEnventInfo] or Seq[orderEnventInfo].
      /**
        * Per-store state kept between batches.
        *
        * @param orderNum          number of distinct orders counted for the store
        * @param orderMoney        accumulated amount for the store
        * @param orderInfoStoreMap orderId -> latest amount of that order
        * @param timestamp         time at which the state was last built
        */
      case class storeOrderInfoState(
        storeId: String,
        orderNum: Int,
        orderMoney: Double,
        orderInfoStoreMap: Map[String, Double],
        timestamp: Timestamp
      )

      /**
        * Record emitted for a store by the stateful pass.
        *
        * @param expired flag set to true on the timeout path and false otherwise
        */
      case class storeOrderInfoStateUpdate(
        storeId: String,
        orderNum: Int,
        orderMoney: Double,
        timestamp: Timestamp,
        expired: Boolean
      )

      The final output I need is this




          This comment will be Viewable by All Users Viewable by All Users


            Unassigned Unassigned
            ruilaing ruiliang
            0 Vote for this issue
            1 Start watching this issue




                Issue deployment