Spark / SPARK-29844

Improper unpersist strategy in ml.recommendation.ALS.train

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.4.3
    • Fix Version/s: 3.0.0
    • Component/s: ML
    • Labels: None

    Description

      ml.recommendation.ALS.train() creates many intermediate RDDs. At the end of the method these RDDs are unpersisted, but the unpersist() calls are in the wrong order: one comes too early and forces a recomputation, and several come too late and waste memory.

          val userIdAndFactors = userInBlocks
            .mapValues(_.srcIds)
            .join(userFactors)
            .mapPartitions({ items =>
              items.flatMap { case (_, (ids, factors)) =>
                ids.view.zip(factors)
              }
            // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
            // and userFactors.
            }, preservesPartitioning = true)
            .setName("userFactors")
            .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
          val itemIdAndFactors = itemInBlocks
            .mapValues(_.srcIds)
            .join(itemFactors)
            .mapPartitions({ items =>
              items.flatMap { case (_, (ids, factors)) =>
                ids.view.zip(factors)
              }
            }, preservesPartitioning = true)
            .setName("itemFactors")
            .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
          if (finalRDDStorageLevel != StorageLevel.NONE) {
            userIdAndFactors.count()
            itemFactors.unpersist() // Premature unpersist
            itemIdAndFactors.count()
            userInBlocks.unpersist() // Lagging unpersist
            userOutBlocks.unpersist() // Lagging unpersist
            itemInBlocks.unpersist() // Correctly timed: itemIdAndFactors.count() still needs it
            itemOutBlocks.unpersist() // Lagging unpersist
            blockRatings.unpersist() // Lagging unpersist
          }
          (userIdAndFactors, itemIdAndFactors)
        }
      

      1. itemFactors is unpersisted too early. itemIdAndFactors.count() still depends on itemFactors, so itemFactors is recomputed when that action runs.
      2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are unpersisted too late. The final action, itemIdAndFactors.count(), does not use these RDDs, so they can be unpersisted before it to free memory.
      In addition, userIdAndFactors and itemIdAndFactors are persisted here but never unpersisted until the application ends. This may hurt performance, but I think it is hard to fix.
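
The reordering implied by points 1 and 2 can be sketched as a small standalone program. This is only an illustration under assumptions, not necessarily the change that was merged: the object name UnpersistOrderSketch, the toy block types, and the sample data are hypothetical stand-ins for the real ALS structures, and the original guard on finalRDDStorageLevel is dropped for brevity. Only the order of the count() and unpersist() calls is the point.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical toy model of the tail of ALS.train(); not Spark's actual code.
object UnpersistOrderSketch {
  type Ids = RDD[(Int, Array[Int])]               // (blockId, IDs in that block)
  type Factors = RDD[(Int, Array[Array[Float]])]  // (blockId, one factor vector per ID)

  // Mirrors the join in the issue: pair every ID with its factor vector.
  def idAndFactors(in: Ids, factors: Factors, name: String,
                   level: StorageLevel): RDD[(Int, Array[Float])] =
    in.join(factors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, fs)) => ids.iterator.zip(fs.iterator) }
      }, preservesPartitioning = true)
      .setName(name)
      .persist(level)

  // Returns the sizes of (userIdAndFactors, itemIdAndFactors).
  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("unpersist-order-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val cache = StorageLevel.MEMORY_AND_DISK
      // Toy stand-ins for the cached intermediates named in the issue.
      val blockRatings = sc.parallelize(Seq((0, 1, 4.0f), (1, 0, 2.0f))).persist(cache)
      val userInBlocks: Ids = sc.parallelize(Seq(0 -> Array(1, 2), 1 -> Array(3))).persist(cache)
      val itemInBlocks: Ids = sc.parallelize(Seq(0 -> Array(10), 1 -> Array(20, 30))).persist(cache)
      val userOutBlocks: Ids = sc.parallelize(Seq(0 -> Array(0), 1 -> Array(1))).persist(cache)
      val itemOutBlocks: Ids = sc.parallelize(Seq(0 -> Array(1), 1 -> Array(0))).persist(cache)
      val userFactors: Factors =
        sc.parallelize(Seq(0 -> Array(Array(0.1f), Array(0.2f)), 1 -> Array(Array(0.3f))))
      val itemFactors: Factors =
        sc.parallelize(Seq(0 -> Array(Array(1.0f)), 1 -> Array(Array(2.0f), Array(3.0f))))
          .persist(cache)

      val userIdAndFactors = idAndFactors(userInBlocks, userFactors, "userFactors", cache)
      val itemIdAndFactors = idAndFactors(itemInBlocks, itemFactors, "itemFactors", cache)

      // Materialize the user side first.
      val users = userIdAndFactors.count()
      // Nothing below reads these RDDs again, so release them before the last action.
      userInBlocks.unpersist()
      userOutBlocks.unpersist()
      itemOutBlocks.unpersist()
      blockRatings.unpersist()
      // itemFactors and itemInBlocks are still cached here, so nothing is recomputed.
      val items = itemIdAndFactors.count()
      itemFactors.unpersist()
      itemInBlocks.unpersist()
      (users, items)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With this order, each intermediate RDD is released right after the last action that reads it. The two result RDDs stay persisted, because in ALS.train() they are returned to the caller.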

      This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.

            People

              Assignee: spark_cachecheck IcySanwitch
              Reporter: spark_cachecheck IcySanwitch
              Votes: 0
              Watchers: 1