Description
ml.recommendation.ALS.train() creates many intermediate RDDs. At the end of the method these RDDs are unpersisted, but some of the unpersist() calls are made at the wrong time, which causes recomputation and wastes memory.
    val userIdAndFactors = userInBlocks
      .mapValues(_.srcIds)
      .join(userFactors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, factors)) =>
          ids.view.zip(factors)
        }
      // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
      // and userFactors.
      }, preservesPartitioning = true)
      .setName("userFactors")
      .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
    val itemIdAndFactors = itemInBlocks
      .mapValues(_.srcIds)
      .join(itemFactors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, factors)) =>
          ids.view.zip(factors)
        }
      }, preservesPartitioning = true)
      .setName("itemFactors")
      .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      userIdAndFactors.count()
      itemFactors.unpersist() // Premature unpersist
      itemIdAndFactors.count()
      userInBlocks.unpersist() // Lagging unpersist
      userOutBlocks.unpersist() // Lagging unpersist
      itemInBlocks.unpersist()
      itemOutBlocks.unpersist() // Lagging unpersist
      blockRatings.unpersist() // Lagging unpersist
    }
    (userIdAndFactors, itemIdAndFactors)
  }
1. itemFactors is unpersisted too early. itemIdAndFactors.count() still depends on itemFactors, so itemFactors will be recomputed when that action runs.
2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are unpersisted too late. The final action, itemIdAndFactors.count(), does not use these RDDs, so they can be unpersisted before it to free memory.
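One possible reordering of the final block that addresses both points is sketched below. It is a fragment of the body of train(), not a standalone program: it assumes the RDDs from the snippet above are in scope, and the ordering is a suggestion that has not been benchmarked here.

```scala
// Fragment of ALS.train(); every name comes from the snippet above.
if (finalRDDStorageLevel != StorageLevel.NONE) {
  // Materialize the user factors first.
  userIdAndFactors.count()
  // Per point 2, the remaining action does not use these RDDs,
  // so they can be released before it runs.
  userInBlocks.unpersist()
  userOutBlocks.unpersist()
  itemOutBlocks.unpersist()
  blockRatings.unpersist()
  // itemIdAndFactors is built from itemInBlocks and itemFactors (point 1),
  // so both stay cached until it has been materialized.
  itemIdAndFactors.count()
  itemFactors.unpersist()
  itemInBlocks.unpersist()
}
(userIdAndFactors, itemIdAndFactors)
```

The idea is simply to unpersist each RDD right after the last action that reads it, and not before.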
By the way, itemIdAndFactors is persisted here but is never unpersisted until the application ends. This may hurt performance, but I think it is hard to fix.
This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.