Description
In mllib.clustering.KMeans.run(), the RDD norms is persisted. But norms has only a single child, the RDD zippedData, which is not persisted. So all the actions that rely on norms also rely on zippedData. Since zippedData is used multiple times in runAlgorithm(), it is zippedData that should be persisted, not norms.
private[spark] def run(
    data: RDD[Vector],
    instr: Option[Instrumentation]): KMeansModel = {
  if (data.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data is not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }
  // Compute squared norms and cache them.
  val norms = data.map(Vectors.norm(_, 2.0))
  norms.persist()  // Unnecessary persist: norms is only used to generate zippedData.
  val zippedData = data.zip(norms).map { case (v, norm) =>
    new VectorWithNorm(v, norm)
  }  // needs to persist
  val model = runAlgorithm(zippedData, instr)
  norms.unpersist()  // Change to zippedData.unpersist()
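A minimal sketch of the proposed change, assuming the rest of run() stays as-is (this reflects the reporter's suggestion, not a committed patch):

```scala
// norms is consumed exactly once to build zippedData, so it does not
// need to be persisted on its own.
val norms = data.map(Vectors.norm(_, 2.0))
val zippedData = data.zip(norms).map { case (v, norm) =>
  new VectorWithNorm(v, norm)
}
// zippedData is reused across the iterations inside runAlgorithm(),
// so it is the RDD worth caching.
zippedData.persist()
val model = runAlgorithm(zippedData, instr)
zippedData.unpersist()
```

With this change, each Lloyd iteration in runAlgorithm() reads the cached VectorWithNorm partitions instead of recomputing the zip (and the norms map) from the input data on every pass.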
This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.