Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28999

Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

    XMLWordPrintableJSON

    Details

    • Type: Question
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

      Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

       

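      How can I carry out a two-stage stateful operation, where the first stage maintains per-order state and the second stage summarizes the state data produced by the first? Is there a good way to do this without writing the intermediate result to a Kafka sink and then reading it back from a Kafka source in a second Structured Streaming query? Thank you.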

      // Code placeholder
      package org.roy.demo.streaming.bus
      
      import java.sql.Timestamp
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.current_timestamp
      import org.apache.spark.sql.streaming._
      import streaming.StreamingExamples
      
      /**
        * Two-stage stateful streaming example.
        *
        * Stage 1 keeps the latest event per order id and emits it together with the
        * amount of the previously stored order. Stage 2 groups the stage-1 output by
        * store id and keeps a running order count and amount per store.
        *
        * NOTE: this query chains two `mapGroupsWithState` calls, which is the shape
        * that produced the `AnalysisException: Multiple mapGroupsWithStates are not
        * supported on a streaming DataFrames/Datasets` quoted in this report. The
        * fixes below address defects inside the two state functions; they do not
        * lift that restriction.
        */
      object StructuredOrderStateListRturn {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        StreamingExamples.setStreamingLogLevels()

        /**
          * Builds and runs the streaming query until it terminates.
          *
          * @param args command-line arguments (unused)
          */
        def main(args: Array[String]): Unit = {
          import scala.util.Try

          val spark = SparkSession
            .builder.master("local[*]")
            .appName("StructuredSessionization")
            .getOrCreate()
          import spark.implicits._
          // Create DataFrame representing the stream of input lines from connection to host:port
          val lines = spark.readStream
            .format("socket")
            .option("host", "10.200.102.192")
            .option("port", 9998)
            .load().withColumn("current_timestamp", current_timestamp)
          // Sample input lines:
          //100,1,10,20,2019-09-03
          //1001,1,10,200,2019-09-03
          //1001,1,10,2000,2019-09-03
          val events = lines
            .as[(String, Timestamp)]
            .flatMap { case (line, timestamp) =>
              val orderInfo = line.split(",")
              // Drop lines that are too short or whose numeric fields do not parse,
              // instead of emitting a null row or failing the query with a
              // NumberFormatException.
              val parsed: Option[NOEvent] =
                if (orderInfo.length > 4) {
                  Try(NOEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)).toOption
                } else {
                  None
                }
              parsed.toList
            }

          /**
            * Stage 1: maintain per-order state; return the latest order and attach
            * the amount of the previously stored order.
            */
          val orderUpdates = events
            .groupByKey(event => event.orderId)
            // orderInfoStore = state type, orderInfoStoreUpdate = output type
            .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
              (orderId: String, orderEvents: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
                if (state.hasTimedOut) {
                  // Timed out: emit a final record flagged as expired and drop the state.
                  val expiredOrder = state.get
                  val finalUpdate = orderInfoStoreUpdate(orderId, expiredOrder.otype, expiredOrder.storeId,
                    expiredOrder.money, 0.0, expiredOrder.orderDate, expiredOrder.timestamp, expired = true)
                  state.remove()
                  finalUpdate
                } else {
                  // Only the last event of the batch is kept as the current order.
                  val lastEvent = orderEvents.toList.last
                  // Amount of the previously stored order, or 0.0 for a new order id.
                  val oldMoney = state.getOption.map(_.money).getOrElse(0.0)
                  val updatedOrder = orderInfoStore(orderId, lastEvent.otype, lastEvent.storeId, lastEvent.money,
                    oldMoney, lastEvent.orderDate, lastEvent.timestamp)
                  state.update(updatedOrder)
                  // Expire the order state if no data is received for 3600 seconds.
                  state.setTimeoutDuration("3600 seconds")
                  orderInfoStoreUpdate(orderId, updatedOrder.otype, updatedOrder.storeId, updatedOrder.money,
                    oldMoney, updatedOrder.orderDate, updatedOrder.timestamp, expired = false)
                }
            }

          /**
            * Stage 2: aggregate the stage-1 output per store.
            */
          val storeUpdate = orderUpdates
            .groupByKey(order => order.storeId)
            .mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
              (storeId: String, updates: Iterator[orderInfoStoreUpdate], state: GroupState[g1InfoStore]) =>
                if (state.hasTimedOut) {
                  // Timed out: emit a final record flagged as expired and drop the state.
                  val expiredStore = state.get
                  val finalUpdate =
                    g1InfoStoreUpdate(storeId, expiredStore.num, expiredStore.money, expiredStore.timestamp, expired = true)
                  state.remove()
                  finalUpdate
                } else {
                  // An Iterator can be traversed only once, so materialize the batch
                  // before computing several aggregates over it.
                  val batch = updates.toList
                  val batchCount = batch.size
                  val batchMoney = batch.map(_.money).sum
                  val updatedStore = state.getOption match {
                    case Some(previous) =>
                      // store total += new order amounts - replaced (old) order amounts
                      val replacedMoney = batch.map(_.oldMoney).sum
                      // NOTE(review): every incoming update is counted as a new order,
                      // including updates to already-known order ids; confirm intent.
                      g1InfoStore(storeId, previous.num + batchCount, previous.money + batchMoney - replacedMoney,
                        previous.timestamp)
                    case None =>
                      // No prior state to read a timestamp from; use the last update's.
                      g1InfoStore(storeId, batchCount, batchMoney, batch.last.timestamp)
                  }
                  state.update(updatedStore)
                  // Expire the store state if no data is received for 3600 seconds.
                  state.setTimeoutDuration("3600 seconds")
                  // A live (non-timed-out) update is not expired.
                  g1InfoStoreUpdate(storeId, updatedStore.num, updatedStore.money, updatedStore.timestamp, expired = false)
                }
            }
          // Further roll-up of the per-store results (left disabled):
      //    storeUpdate.createOrReplaceTempView("update_tmp")
      //    spark.sql("select storeId,otype, count(1) num,sum(money) as moneys ,sum(oldMoney) as oldMoney  from update_tmp group by storeId,otype ")
          val query = storeUpdate
            .writeStream
            .outputMode("update")
            .format("console")
            .start()
          query.awaitTermination()
        }
      }
      
      /**
        * Input event built from one comma-separated line of the stream.
        *
        * @param orderId   order identifier (first field of the line)
        * @param otype     integer order type (second field)
        * @param storeId   store identifier (third field)
        * @param money     order amount (fourth field)
        * @param orderDate order date as text (fifth field)
        * @param timestamp timestamp attached to the line when it was read
        */
      case class NOEvent(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        orderDate: String,
        timestamp: Timestamp
      )
      
      /**
        * State kept per order id: the most recent order plus the amount of the
        * order it replaced.
        *
        * @param oldMoney amount of the previously stored order (0 when there was none)
        */
      case class orderInfoStore(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        oldMoney: Double,
        orderDate: String,
        timestamp: Timestamp
      )
      
      /**
        * Record emitted for an order id after its state has been processed.
        *
        * @param oldMoney amount of the previously stored order
        * @param expired  true when the record is the final one emitted on timeout
        */
      case class orderInfoStoreUpdate(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        oldMoney: Double,
        orderDate: String,
        timestamp: Timestamp,
        expired: Boolean
      )
      
      /**
        * State of the first grouping level: running totals kept per store.
        *
        * @param storeid   store identifier
        * @param num       number of orders counted for the store
        * @param money     accumulated order amount for the store
        * @param timestamp timestamp stored with the state
        */
      case class g1InfoStore(
        storeid: String,
        /* otype: Int,*/
        num: Int,
        money: Double,
        timestamp: Timestamp
      )
      
      /**
        * Record emitted per store after its state has been processed.
        *
        * @param expired true when the record is the final one emitted on timeout
        */
      case class g1InfoStoreUpdate(
        storeid: String,
        /* otype: Int,*/
        num: Int,
        money: Double,
        timestamp: Timestamp,
        expired: Boolean
      )
      
      
      

       

       

       

      I changed the approach: each store's state now maintains a list of its orders, and the totals are recalculated from that list whenever new data arrives. However, this means the state has to keep the detail of every individual order, which uses a lot of resources. I wonder whether there is any other way to deal with this kind of data.

      package org.roy.demo.streaming.bus
      
      import java.sql.Timestamp
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.current_timestamp
      import org.apache.spark.sql.streaming._
      import streaming.StreamingExamples
      
      /**
        * create by Roy 2019/09/06
        * Counts the number of orders and the total order amount per store, using a
        * single `mapGroupsWithState` whose state keeps the latest amount of every
        * order id seen for the store.
        */
      object StructuredStoreOrderState {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        StreamingExamples.setStreamingLogLevels()

        /**
          * Builds and runs the streaming query until it terminates.
          *
          * @param args command-line arguments (unused)
          */
        def main(args: Array[String]): Unit = {
          import scala.util.Try

          val spark = SparkSession
            .builder.master("local[*]")
            .appName("StructuredSessionization")
            .getOrCreate()
          import spark.implicits._
          // Create DataFrame representing the stream of input lines from connection to host:port
          val lines = spark.readStream
            .format("socket")
            .option("host", "10.200.102.192")
            .option("port", 9998)
            .load().withColumn("current_timestamp", current_timestamp)
          // Sample input lines:
          //100,1,10,20,2019-09-03
          //1001,1,10,200,2019-09-03
          //1001,1,10,2000,2019-09-03
          val events = lines
            .as[(String, Timestamp)]
            .flatMap { case (line, timestamp) =>
              val orderInfo = line.split(",")
              // Drop lines that are too short or whose numeric fields do not parse,
              // instead of emitting a null row or failing the query with a
              // NumberFormatException.
              val parsed: Option[dataEvent] =
                if (orderInfo.length > 4) {
                  Try(dataEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)).toOption
                } else {
                  None
                }
              parsed.toList
            }
          val orderUpdates = events
            .groupByKey(event => event.storeId)
            .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
              (key: String, values: Iterator[dataEvent], state: GroupState[storeOrderInfoState]) =>
                // Wall-clock time of this invocation, stamped on the state and the output.
                val now = new Timestamp(System.currentTimeMillis())
                if (state.hasTimedOut) {
                  // Timed out: emit a final record flagged as expired and drop the state.
                  val finalUpdate =
                    storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, now, expired = true)
                  state.remove()
                  finalUpdate
                } else {
                  val previous = state.getOption
                  val knownOrders = previous.map(_.orderInfoStoreMap).getOrElse(Map.empty[String, Double])
                  // Fold over the batch with a *running* order map so that an order id
                  // appearing more than once in the same batch replaces its earlier
                  // amount instead of being counted and summed twice.
                  val (orders, addedOrders, moneyDelta) =
                    values.foldLeft((knownOrders, 0, 0.0)) { case ((seen, added, delta), e) =>
                      seen.get(e.orderId) match {
                        case Some(oldMoney) =>
                          // Known order: add the new amount and subtract the replaced one.
                          (seen.updated(e.orderId, e.money), added, delta + e.money - oldMoney)
                        case None =>
                          (seen.updated(e.orderId, e.money), added + 1, delta + e.money)
                      }
                    }
                  val updatedSession = storeOrderInfoState(
                    key,
                    previous.map(_.orderNum).getOrElse(0) + addedOrders,
                    previous.map(_.orderMoney).getOrElse(0.0) + moneyDelta,
                    orders,
                    now)
                  // Debug output of the new state before it is stored.
                  println("updatedSession" + updatedSession)
                  println(updatedSession.orderMoney)
                  state.update(updatedSession)
                  // Expire the store state if no data is received for 3600 seconds.
                  state.setTimeoutDuration("3600 seconds")
                  storeOrderInfoStateUpdate(key, updatedSession.orderNum, updatedSession.orderMoney, now, expired = false)
                }
            }


          val query = orderUpdates
            .writeStream
            .outputMode("update")
            .format("console")
            .start()
          query.awaitTermination()
        }
      }
      
      /**
        * Input event built from one comma-separated line of the stream.
        *
        * @param orderId   order identifier (first field of the line)
        * @param otype     integer order type (second field)
        * @param storeId   store identifier (third field)
        * @param money     order amount (fourth field)
        * @param orderTime order time as text (fifth field)
        * @param timestamp timestamp attached to the line when it was read
        */
      case class dataEvent(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        orderTime: String,
        timestamp: Timestamp
      )
      
      // Latest state of a single order.
      // NOTE(review): not referenced by the code shown here; confirm whether it is still needed.
      case class orderEnventInfo(
        orderId: String,
        otype: Int,
        storeId: String,
        money: Double,
        orderTime: String,
        timestamp: Timestamp
      )
      
      // Per-store state: running totals plus a table of every order seen for the
      // store (order id -> latest amount).
      // Earlier alternatives considered for the table: Map[String, orderEnventInfo], Seq[orderEnventInfo]
      case class storeOrderInfoState(
        storeId: String,
        orderNum: Int,
        orderMoney: Double,
        orderInfoStoreMap: Map[String, Double],
        timestamp: Timestamp
      )
      
      // Computed result emitted per store; `expired` is true for the final record
      // emitted when the store's state times out.
      case class storeOrderInfoStateUpdate(
        storeId: String,
        orderNum: Int,
        orderMoney: Double,
        timestamp: Timestamp,
        expired: Boolean
      )
      
      
      
      

      The final output I need is this

       

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              ruilaing ruiliang
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: