Spark / SPARK-17888

Memory leak in streaming driver when using Spark SQL in Streaming


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.6.2
    • Fix Version/s: None
    • Component/s: DStreams
    • Environment: scala 2.10.4, java 1.7.0_71

    Description

      Hi,
      I have a small Spark 1.5 program that receives data from a publisher through Spark Streaming and processes the received records with Spark SQL. As time goes by I found a memory leak in the driver, so I upgraded to Spark 1.6.2, but the situation did not change.

      Here is the code:

      import scala.util.parsing.json.JSON

      val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
      // Parse each received line as JSON into an immutable Map
      val jsonf = lines.map(JSON.parseFull(_))
        .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
      val logs = jsonf.map(data => LogStashV1(data("message").toString, data("path").toString,
        data("host").toString, data("lineno").toString.toDouble, data("timestamp").toString))
      logs.foreachRDD { rdd =>
        import sqc.implicits._
        rdd.toDF().registerTempTable("logstash")
        val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
        // Materialize the aggregated alerts on the driver and print them
        sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble))
          .collect().foreach(println)
      }
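
      As an aside, the Spark 1.6 streaming programming guide recommends obtaining the SQLContext lazily inside foreachRDD via the SQLContext.getOrCreate singleton helper rather than capturing an outer sqc. A minimal sketch of that pattern (same query and case classes as above; nothing else changes):

      import org.apache.spark.sql.SQLContext

      logs.foreachRDD { rdd =>
        // Get or lazily instantiate the singleton SQLContext for this SparkContext
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        rdd.toDF().registerTempTable("logstash")
        // ... run the same SELECT ... GROUP BY ... query here ...
      }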

      jmap heap histogram of the driver (top entries):

       num     #instances         #bytes  class name
      ----------------------------------------------
         1:         34819       72711952  [B
         2:       2297557       66010656  [C
         3:       2296294       55111056  java.lang.String
         4:       1063491       42539640  org.apache.spark.scheduler.AccumulableInfo
         5:       1251001       40032032  scala.collection.immutable.HashMap$HashMap1
         6:       1394364       33464736  java.lang.Long
         7:       1102516       26460384  scala.collection.immutable.$colon$colon
         8:       1058202       25396848  org.apache.spark.sql.execution.metric.LongSQLMetricValue
         9:       1266499       20263984  scala.Some
        10:        124052       15889104  <methodKlass>
        11:        124052       15269568  <constMethodKlass>
        12:         11350       12082432  <constantPoolKlass>
        13:         11350       11692880  <instanceKlassKlass>
        14:         96682       10828384  org.apache.spark.executor.TaskMetrics
        15:        233481        9505896  [Lscala.collection.immutable.HashMap;
        16:         96682        6961104  org.apache.spark.scheduler.TaskInfo
        17:          9589        6433312  <constantPoolCacheKlass>
        18:        233000        5592000  scala.collection.immutable.HashMap$HashTrieMap
        19:         96200        5387200  org.apache.spark.executor.ShuffleReadMetrics
        20:        113381        3628192  scala.collection.mutable.ListBuffer
        21:          7252        2891792  <methodDataKlass>
        22:        117073        2809752  scala.collection.mutable.DefaultEntry
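
      The classes at the top of the histogram (AccumulableInfo, LongSQLMetricValue, TaskMetrics, TaskInfo) are the per-task and per-query bookkeeping that the driver's UI listeners retain so the web UI can show recent history, which fits the "Not A Problem" resolution: that state is bounded by the UI retention limits rather than leaked. A hedged sketch of capping that history when the context is created (the configuration keys exist in Spark 1.6; the values, app name, and batch interval are illustrative, not from this report):

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      val conf = new SparkConf()
        .setAppName("LogstashStreaming")                  // hypothetical app name
        .set("spark.ui.retainedJobs", "200")              // default 1000
        .set("spark.ui.retainedStages", "200")            // default 1000
        .set("spark.sql.ui.retainedExecutions", "50")     // default 1000
        .set("spark.streaming.ui.retainedBatches", "100") // default 1000
      val ssc = new StreamingContext(conf, Seconds(10))   // batch interval assumed

      With caps like these, the listener state plateaus instead of growing for the lifetime of the streaming application.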



          People

            Assignee: Unassigned
            Reporter: weilin.chen (FireThief)
            Votes: 0
            Watchers: 3
