Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Duplicate
Affects Version/s: 3.0.0
None
None
Description
I ran examples.graphx.SSSPExample and examined its RDD dependency graph and persist operations. GraphX uses some improper cache strategies, and the same problems occur when running ConnectedComponentsExample.
1. vertices.cache() and newEdges.cache() are unnecessary
In SSSPExample, the graph is initialized by GraphImpl.mapVertices(). This method creates a GraphImpl object with GraphImpl.apply(vertices, edges), and apply() caches the RDDs vertices and newEdges. However, SSSPExample does not use these two RDDs directly afterwards (their child RDDs have already been cached), so persisting them is unnecessary here.
However, other examples may need these two persists, so they cannot simply be removed, which may make this hard to fix.
def apply[VD: ClassTag, ED: ClassTag](
    vertices: VertexRDD[VD],
    edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
  vertices.cache()  // It is unnecessary for SSSPExample and ConnectedComponentsExample
  // Convert the vertex partitions in edges to the correct type
  val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
    .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
    .cache()  // It is unnecessary for SSSPExample and ConnectedComponentsExample
  GraphImpl.fromExistingRDDs(vertices, newEdges)
}
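The effect described in this first point can be illustrated with a small, stdlib-only Scala model. `ToyRDD` and `RedundantParentCache` are hypothetical names invented for this sketch; they are not Spark or GraphX APIs and only mimic lazy evaluation plus caching, under the simplifying assumption that a cached dataset is never evicted. When a cached child is the only consumer of a parent, the parent is computed once whether or not it is cached itself, so caching it only costs memory.

```scala
// Hypothetical toy model of a lazily evaluated, optionally cached dataset.
// It is NOT Spark's RDD; it only counts how often each node is computed.
final class ToyRDD[T](compute: () => Seq[T]) {
  var computeCount = 0                    // times this node's compute() ran
  private var cacheEnabled = false
  private var cached: Option[Seq[T]] = None

  def cache(): this.type = { cacheEnabled = true; this }

  // Materialize the data, reusing the cached copy when one exists.
  def materialize(): Seq[T] = cached.getOrElse {
    computeCount += 1
    val data = compute()
    if (cacheEnabled) cached = Some(data)
    data
  }

  // A narrow transformation: the child lazily pulls data from this node.
  def map[U](f: T => U): ToyRDD[U] = new ToyRDD[U](() => materialize().map(f))

  // An "action" that forces evaluation.
  def count(): Long = materialize().size.toLong
}

object RedundantParentCache {
  // Runs two actions on a cached child and reports how often the parent was computed.
  def parentComputations(cacheParent: Boolean): Int = {
    val parent = new ToyRDD[Int](() => 1 to 10)
    if (cacheParent) parent.cache()          // analogous to vertices.cache() in apply()
    val child = parent.map(_ * 2).cache()    // the child dataset is cached as well
    child.count()                            // computes child and, through it, parent
    child.count()                            // served from the child's cache
    parent.computeCount
  }

  def main(args: Array[String]): Unit = {
    println(s"parent cached:   ${parentComputations(cacheParent = true)}")
    println(s"parent uncached: ${parentComputations(cacheParent = false)}")
  }
}
```

In this model both calls return 1. Real Spark is less clear-cut: cached partitions of the child can be evicted, and recomputing them then goes back through the parent, which is one reason these persists cannot simply be removed.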
2. Missing persist on newEdges
SSSPExample runs Pregel, which calls ReplicatedVertexView.upgrade(). The RDD newEdges created there is used directly by multiple actions in Pregel, so newEdges should be persisted.
As with the first issue, this also occurs in ConnectedComponentsExample, and it is also hard to fix, because an added persist may be unnecessary for other examples.
// Pregel.scala
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)  // newEdges is created here
val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
  checkpointInterval, graph.vertices.sparkContext)
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
var activeMessages = messages.count()  // The first use of newEdges
...
while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages and update the vertices.
  prevG = g
  g = g.joinVertices(messages)(vprog)  // Generating g depends on newEdges
  ...
  activeMessages = messages.count()  // The second action that uses newEdges. newEdges should be unpersisted after this line.
// ReplicatedVertexView.scala
def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
  ...
    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
      (ePartIter, shippedVertsIter) => ePartIter.map {
        case (pid, edgePartition) =>
          (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
      }
    })
    edges = newEdges  // newEdges should be persisted
    hasSrcId = includeSrc
    hasDstId = includeDst
  }
}
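The missing-persist case can be sketched with a similar stdlib-only model. `LazyNode` and `MissingPersist` are hypothetical names, not Spark APIs; `newEdges`, `messages` and `nextMessages` merely stand in for the RDDs in the Pregel excerpt above. Two separate actions each pull data through `newEdges`. Without a persist, the model recomputes it once per action; with a persist that is released after the last action, it is computed once.

```scala
// Hypothetical toy model of a lazily evaluated dataset with persist/unpersist.
// It is NOT Spark's RDD API; it only counts recomputations.
final class LazyNode[T](compute: () => Seq[T]) {
  var computeCount = 0                     // times compute() actually ran
  private var persisted = false
  private var stored: Option[Seq[T]] = None

  def persist(): this.type = { persisted = true; this }
  def unpersist(): this.type = { persisted = false; stored = None; this }

  // Return the stored copy if present, otherwise recompute (and store if persisted).
  def materialize(): Seq[T] = stored.getOrElse {
    computeCount += 1
    val data = compute()
    if (persisted) stored = Some(data)
    data
  }

  def count(): Long = materialize().size.toLong   // an "action"
}

object MissingPersist {
  // Two actions reach newEdges through different downstream datasets,
  // loosely mirroring the two messages.count() calls in the Pregel excerpt.
  def edgeComputations(persistEdges: Boolean): Int = {
    val newEdges = new LazyNode[(Int, Int)](() => (1 to 100).map(i => (i, i + 1)))
    if (persistEdges) newEdges.persist()

    val messages = new LazyNode[Int](() => newEdges.materialize().map(_._2))
    messages.count()                         // first action that uses newEdges

    val nextMessages =
      new LazyNode[(Int, Int)](() => newEdges.materialize().filter(_._1 % 2 == 0))
    nextMessages.count()                     // second action that uses newEdges

    if (persistEdges) newEdges.unpersist()   // release it after its last use
    newEdges.computeCount
  }

  def main(args: Array[String]): Unit = {
    println(s"without persist: ${edgeComputations(persistEdges = false)}")
    println(s"with persist:    ${edgeComputations(persistEdges = true)}")
  }
}
```

Here `edgeComputations(persistEdges = false)` returns 2 and `edgeComputations(persistEdges = true)` returns 1. Unpersisting right after the second action follows the suggestion in the Pregel excerpt that newEdges be released after the second messages.count().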
Since I don't know much about GraphX, I am not sure how best to fix these issues.
This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() APIs.
Issue Links
- duplicates: SPARK-29872 Improper cache strategy in examples (Resolved)