SPARK-29878

Improper cache strategies in GraphX



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: GraphX
    • Labels: None


      I ran examples.graphx.SSSPExample and looked through the RDD dependency graphs as well as the persist operations. GraphX uses some improper cache strategies. The same situations also occur when I run ConnectedComponentsExample.

      1. vertices.cache() and newEdges.cache() are unnecessary
      In SSSPExample, the graph is initialized by GraphImpl.mapVertices(). This method creates a GraphImpl object through GraphImpl.apply(vertices, edges), and apply() caches the RDDs vertices and newEdges. SSSPExample never uses these two RDDs directly afterwards (their child RDDs are cached instead), so the two persists are unnecessary here.
      However, other examples may need these two persists, so they cannot simply be removed. This might be hard to fix.

        def apply[VD: ClassTag, ED: ClassTag](
            vertices: VertexRDD[VD],
            edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
          vertices.cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
          // Convert the vertex partitions in edges to the correct type
          val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
            .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
            .cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
          GraphImpl.fromExistingRDDs(vertices, newEdges)
        }
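
      One way to check this observation is to list what Spark reports as persisted after the graph is initialized. The following is a minimal sketch, not part of the original report. It mirrors the setup of SSSPExample (a log-normal random graph, source vertex 42), assumes a local Spark build with GraphX on the classpath, and the names InspectGraphXCache and persistedRdds are hypothetical.

```scala
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, for illustration only.
object InspectGraphXCache {

  /** Describe every RDD the SparkContext currently tracks as persisted. */
  def persistedRdds(spark: SparkSession): Seq[String] =
    spark.sparkContext.getPersistentRDDs.toSeq.sortBy(_._1).map { case (id, rdd) =>
      s"RDD $id name=${rdd.name} level=${rdd.getStorageLevel.description}"
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("InspectGraphXCache")
      .getOrCreate()
    val sc = spark.sparkContext

    // Same setup as SSSPExample: a random graph with Double edge attributes.
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
    val sourceId: VertexId = 42

    // According to the report, this call builds the new graph via GraphImpl.apply,
    // which caches both vertices and newEdges.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    initialGraph.vertices.count() // run one action so the lineage is evaluated

    // RDDs appear here as soon as persist()/cache() has been called on them.
    persistedRdds(spark).foreach(println)
    spark.stop()
  }
}
```

      Comparing this list with the RDDs that later actions actually read gives a rough, manual version of the check described in this report.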

      2. Missing persist on newEdges
      SSSPExample invokes Pregel to do the computation, and Pregel uses ReplicatedVertexView.upgrade(). I find that the RDD newEdges is used directly by multiple actions in Pregel, so newEdges should be persisted.
      As with the issue above, this one also occurs in ConnectedComponentsExample. It is also hard to fix, because the added persist may be unnecessary for other examples.

      // Pregel.scala
          // compute the messages
          var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
          val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
            checkpointInterval, graph.vertices.sparkContext)
          messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
          var activeMessages = messages.count() // First action that uses newEdges
          while (activeMessages > 0 && i < maxIterations) {
            // Receive the messages and update the vertices.
            prevG = g
            g = g.joinVertices(messages)(vprog) // The new g depends on newEdges
            activeMessages = messages.count() // Second action that uses newEdges; newEdges should be unpersisted after this statement.
            // ...
          }

      // ReplicatedVertexView.scala
        def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
          // ...
          val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
            (ePartIter, shippedVertsIter) => ePartIter.map {
              case (pid, edgePartition) =>
                (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
            }
          })
          edges = newEdges // newEdges should be persisted
          hasSrcId = includeSrc
          hasDstId = includeDst
          // ...
        }
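
      The cost of a missing persist can be illustrated outside GraphX. The sketch below is an assumption-laden illustration rather than a fix for Pregel: it uses a plain RDD instead of newEdges, counts evaluations with a LongAccumulator, and the names PersistDemo and mapInvocations are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Hypothetical demo object, for illustration only.
object PersistDemo {

  /** Run two actions on a derived RDD and return how often its map function ran. */
  def mapInvocations(sc: SparkContext, persist: Boolean): Long = {
    val calls = sc.longAccumulator("map calls")
    val derived = sc.parallelize(1 to 100, numSlices = 4).map { x =>
      calls.add(1) // counts every evaluation of an element
      x * 2
    }
    if (persist) derived.persist()
    derived.count() // first action
    derived.count() // second action: recomputes the map unless derived is persisted
    if (persist) derived.unpersist() // release the cache once no action needs it
    calls.value
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("PersistDemo")
      .getOrCreate()
    val sc = spark.sparkContext
    println(s"without persist: ${mapInvocations(sc, persist = false)} map calls")
    println(s"with persist:    ${mapInvocations(sc, persist = true)} map calls")
    spark.stop()
  }
}
```

      In a local run without task retries, the unpersisted variant should report more map calls than the persisted one, because the second action re-evaluates the lineage. The same reasoning applies to any RDD, such as newEdges, that is read by more than one action.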

      I don't have much knowledge of GraphX, so I don't know how to fix these issues properly.

      This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.


              Assignee: Unassigned
              Reporter: IcySanwitch (spark_cachecheck)

