SPARK-19023

Memory leak on GraphX with an iterative algorithm and checkpoint on the graph


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.0.2
    • Fix Version/s: None
    • Component/s: GraphX

    Description

      I am facing OOM errors within a Spark Streaming application that uses GraphX.
      While trying to reproduce the issue in a simple application, I identified what appear to be two kinds of memory leaks.

      Leak 1

      It can be reproduced with this simple Scala application, which roughly simulates what my Spark Streaming application does: each loop iteration stands for one micro-batch.

      TestGraph.scala
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import org.apache.spark.graphx.Graph
      import org.apache.spark.rdd.RDD
      import org.apache.spark.graphx._
      
      
      object TestGraph {
          case class ImpactingEvent(entityInstance: String)
          case class ImpactedNode(entityInstance: String)
          case class RelationInstance(relationType : String)
          var impactingGraph : Graph[ImpactedNode, RelationInstance] = null;
          
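          def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
            conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
            val sc = new SparkContext(conf)
            sc.setLogLevel("ERROR")
            // Graph.checkpoint() throws unless a checkpoint directory is set; this path is a placeholder.
            sc.setCheckpointDir("/tmp/spark-checkpoint")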
           
            val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array( (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
            
            val edges: RDD[Edge[RelationInstance]] =  sc.parallelize(Array( Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
              
            impactingGraph = Graph(vertices, edges, null)
            
            // Each iteration stands for one micro-batch.
            for (_ <- 1 to 10) {
              impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
              
              impactingGraph.checkpoint()
              impactingGraph.edges.count()
              impactingGraph.vertices.count()
            }
            println("Hello")
            // Keep the JVM alive so that a heap dump can be taken after the loop.
            Thread.sleep(10000000)
          }
          
          private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance], event: ImpactingEvent, sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
            // Identity mapVertices: yields a new graph whose lineage still points at the previous one.
            val graph = impactingGraph.mapVertices((id, node) => node).cache()
            impactingGraph.unpersist(true)
            graph
          }
      }
      
      

      This simple application just applies a mapVertices transformation to the graph and then checkpoints the graph. It repeats this operation 10 times.
      Once the application has finished the loop, I take a heap dump.

      In this heap dump, I can see 11 "live" GraphImpl instances in memory.
      I expected only 1 (the one referenced by the global variable impactingGraph).

      The "leak" comes from the f function of a MapPartitionsRDD (which is referenced by the partitionsRDD variable of my VertexRDD).
      This f function holds an outer reference to the graph created in the previous iteration.

      I can see that the clearDependencies function of MapPartitionsRDD does not reset the f function to null.

      While looking for similar issues, I found this pull request:
      https://github.com/apache/spark/pull/3545

      In that pull request, the f variable is reset to null in the clearDependencies method of ZippedPartitionsRDD.
      I do not understand why the same is not done in MapPartitionsRDD.
      I tried patching spark-core to set f to null in clearDependencies of MapPartitionsRDD, and it fixed the leak in this simple use case.

      Shouldn't the f variable also be reset to null in MapPartitionsRDD?
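
      To make the mechanism concrete, here is a toy model in plain Scala. It is only a sketch and does not use Spark's classes: ToyGraph, ToyMapPartitionsRDD, mapOver and the patched flag are all invented for illustration. It shows that an object which keeps a closure in a field also keeps whatever the closure captured, until the field is cleared.

```scala
object ClosureLeakSketch {
  // Hypothetical stand-in for the GraphImpl of a previous iteration.
  final class ToyGraph(val iteration: Int)

  // Hypothetical stand-in for MapPartitionsRDD: the user function lives in a field.
  final class ToyMapPartitionsRDD(private var f: Int => Int) {
    def compute(x: Int): Int = f(x)

    // patched = false mimics the behaviour described above: f is kept.
    // patched = true mimics the local patch: f is dropped.
    def clearDependencies(patched: Boolean): Unit =
      if (patched) f = null

    def stillHoldsClosure: Boolean = f != null
  }

  // The closure captures `graph`, so the returned object keeps it strongly reachable.
  def mapOver(graph: ToyGraph): ToyMapPartitionsRDD =
    new ToyMapPartitionsRDD(x => x + graph.iteration)

  def main(args: Array[String]): Unit = {
    val unpatched = mapOver(new ToyGraph(1))
    unpatched.clearDependencies(patched = false)
    println(s"unpatched still holds closure: ${unpatched.stillHoldsClosure}")

    val patched = mapOver(new ToyGraph(2))
    patched.clearDependencies(patched = true)
    println(s"patched still holds closure: ${patched.stillHoldsClosure}")
  }
}
```

      In the toy, calling compute after the patched clearDependencies would fail; the assumption is that a checkpointed RDD reads its data back from the checkpoint and no longer needs f.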

      Leak 2

      Now I do the same thing, but the propagateEvent method performs a joinVertices on the graph in addition to the mapVertices.
      The resulting application is:

      TestGraph.scala
      import org.apache.spark.SparkConf
      import org.apache.spark.SparkContext
      import org.apache.spark.graphx.Graph
      import org.apache.spark.rdd.RDD
      import org.apache.spark.graphx._
      
      
      object TestGraph {
          case class ImpactingEvent(entityInstance: String)
          case class ImpactedNode(entityInstance: String)
          case class RelationInstance(relationType : String)
          var impactingGraph : Graph[ImpactedNode, RelationInstance] = null;
          
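          def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
            conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
            val sc = new SparkContext(conf)
            sc.setLogLevel("ERROR")
            // Graph.checkpoint() throws unless a checkpoint directory is set; this path is a placeholder.
            sc.setCheckpointDir("/tmp/spark-checkpoint")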
           
            val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array( (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
            
            val edges: RDD[Edge[RelationInstance]] =  sc.parallelize(Array( Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
              
            impactingGraph = Graph(vertices, edges, null)
            
            // Each iteration stands for one micro-batch.
            for (_ <- 1 to 10) {
              impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
              
              impactingGraph.checkpoint()
              impactingGraph.edges.count()
              impactingGraph.vertices.count()
            }
            println("Hello")
            Thread.sleep(10000000)
          }
          
          private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance], event: ImpactingEvent, sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
            var graph = impactingGraph.mapVertices((id, node) => node).cache()
            val verticesToJoin: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "Node1"), (2L, "Node2")))
            // Added for Leak 2: joinVertices builds further RDDs on top of the mapped graph.
            graph = graph.joinVertices(verticesToJoin)((id, src, toJoin) => src)
            impactingGraph.unpersist(true)
            graph.cache()
          }
      }
      
      

      When I run this application and take a heap dump, I still see 11 "live" GraphImpl instances in memory (where I expect only 1), even with the patch described in the previous section.

      Analyzing this dump, I can see that the "leak" comes from a reference to an array of partitions held by the "partitions_" variable of the EdgeRDD. This array of partitions contains a reference to the MapPartitionsRDD, which in turn references the graph created by the previous iteration, as described in the Leak 1 section.

      This array of partitions is referenced twice:

      • once in the "partitions_" variable of the partitionsRDD embedded within the EdgeRDD
      • once in the "partitions_" variable of the EdgeRDD itself

      This comes from the getPartitions method of the EdgeRDD:

      EdgeRDD.scala
        override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
      
      

      After checkpoint and count are called on the graph's edges, the partitionsRDD of the EdgeRDD drops its reference to this array.
      This is done by the following method:

      RDD.scala
        /**
         * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
         * created from the checkpoint file, and forget its old dependencies and partitions.
         */
        private[spark] def markCheckpointed(): Unit = {
          clearDependencies()
          partitions_ = null
          deps = null    // Forget the constructor argument for dependencies too
        }
      
      

      But this is not done for the "partitions_" variable of the EdgeRDD itself.
      Indeed, markCheckpointed() is not called on the EdgeRDD itself, only on the partitionsRDD embedded within it.

      Because of that, a reference to this array of partitions survives (and the array references a MapPartitionsRDD that references the graph of the previous iteration).
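
      The aliasing can be sketched with a toy model in plain Scala. This is only an illustration under my own assumptions, not Spark's code: ToyPartition, ToyInnerRDD, ToyEdgeRDD and cachedPartitions are invented names. Two objects cache the same array, and clearing the field on one of them leaves the other copy of the reference in place.

```scala
object SharedPartitionsSketch {
  // Hypothetical stand-in for a Partition.
  final class ToyPartition(val index: Int)

  // Hypothetical stand-in for the partitionsRDD embedded in the EdgeRDD.
  final class ToyInnerRDD(initial: Array[ToyPartition]) {
    private var partitions_ : Array[ToyPartition] = initial

    def partitions: Array[ToyPartition] = partitions_

    // Mimics the part of markCheckpointed() that forgets the partitions.
    def markCheckpointed(): Unit = {
      partitions_ = null
    }
  }

  // Hypothetical stand-in for the EdgeRDD: it caches the array returned by the inner RDD.
  final class ToyEdgeRDD(val partitionsRDD: ToyInnerRDD) {
    private var partitions_ : Array[ToyPartition] = null

    def partitions: Array[ToyPartition] = {
      if (partitions_ == null) partitions_ = partitionsRDD.partitions
      partitions_
    }

    // Exposes the cached field without refreshing it, for inspection only.
    def cachedPartitions: Array[ToyPartition] = partitions_
  }

  def main(args: Array[String]): Unit = {
    val original = Array(new ToyPartition(0), new ToyPartition(1))
    val inner = new ToyInnerRDD(original)
    val outer = new ToyEdgeRDD(inner)

    outer.partitions         // the outer object now caches the same array
    inner.markCheckpointed() // only the inner reference is cleared

    println(s"inner cleared: ${inner.partitions == null}")
    println(s"outer still holds the original array: ${outer.cachedPartitions eq original}")
  }
}
```

      In the toy, the inner object returns null after markCheckpointed(); the point is only that the outer field is never touched by that call.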

      I can fix this leak by calling checkpoint and count on the edges right after the mapVertices and before the joinVertices, provided the MapPartitionsRDD patch described in the previous section is also applied.
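
      A minimal sketch of that workaround follows. It assumes plain String vertex and edge attributes instead of the case classes above, that sc.setCheckpointDir(...) has already been called, and that the MapPartitionsRDD patch is in place; the object and method names are mine.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

object CheckpointEdgesEarly {
  // Assumes the caller has already set a checkpoint directory on `sc`.
  def propagate(
      previous: Graph[String, String],
      sc: SparkContext): Graph[String, String] = {
    val mapped = previous.mapVertices((_, node) => node).cache()

    // Workaround: checkpoint and materialise the edges before
    // joinVertices builds further RDDs on top of them.
    mapped.edges.checkpoint()
    mapped.edges.count()

    val verticesToJoin: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "Node1"), (2L, "Node2")))
    val joined = mapped.joinVertices(verticesToJoin)((_, src, _) => src).cache()

    previous.unpersist(blocking = true)
    joined
  }
}
```

      The main loop stays as it is: it still calls checkpoint() on the returned graph and counts its edges and vertices.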

      But this workaround does not seem clean to me.
      In my mind:

      • either the "partitions_" variable of the EdgeRDD should be reset to null after a checkpoint is called on the Graph,
      • or the "partitions_" variable of the EdgeRDD should not reference the same array of partitions as the "partitions_" variable of the partitionsRDD (I don't know whether "partitions_" is really useful on the EdgeRDD).

      What do you think?

          People

            Assignee: Unassigned
            Reporter: Julien MASSIOT (jmassiot)
            Votes: 3
            Watchers: 4
