  Spark / SPARK-28781

Unnecessary persist in PeriodicCheckpointer.update()


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:
      None

      Description

      Every call to update() persists newData (line 82). However, the persisted data is used a second time only when checkpointing is triggered, that is, when the condition at line 94 is satisfied and the checkpoint is taken at line 97. Data that does not satisfy the checkpoint condition does not need to be cached. The persistedQueue limits how many Datasets stay cached at once, but it would be better to avoid every unnecessary persist operation.
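      To make the cost concrete: the checkpoint condition in update() holds only on every checkpointInterval-th call, while persist runs on every call. The standalone Scala sketch below counts this. CheckpointSchedule and checkpointedUpdates are hypothetical names, not Spark APIs. The sketch assumes checkpointInterval is either -1 (disabled) or positive, and it models sc.getCheckpointDir.nonEmpty as a plain Boolean.

```scala
// Hypothetical helper: lists which calls to update() would trigger a checkpoint,
// re-using the condition from PeriodicCheckpointer.update().
object CheckpointSchedule {
  def checkpointedUpdates(
      numUpdates: Int,
      checkpointInterval: Int,
      hasCheckpointDir: Boolean): Seq[Int] = {
    // Assumption: -1 disables checkpointing; any other value must be positive.
    require(checkpointInterval == -1 || checkpointInterval > 0)
    (1 to numUpdates).filter { updateCount =>
      checkpointInterval != -1 && updateCount % checkpointInterval == 0 && hasCheckpointDir
    }
  }

  def main(args: Array[String]): Unit = {
    val numUpdates = 10
    val hits = checkpointedUpdates(numUpdates, checkpointInterval = 3, hasCheckpointDir = true)
    // Every update persists, but only these updates reuse the persisted data for a checkpoint.
    println(s"checkpointed updates: ${hits.mkString(", ")}")
    println(s"persists not followed by a checkpoint: ${numUpdates - hits.size} of $numUpdates")
  }
}
```

      With checkpointInterval = 3 and 10 updates, only updates 3, 6 and 9 reach checkpoint(newData). The other 7 persists are the ones the report considers unnecessary.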

      def update(newData: T): Unit = {
        persist(newData)  // line 82: every new Dataset is persisted
        persistedQueue.enqueue(newData)
        // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
        // Users should call [[update()]] when a new Dataset has been created,
        // before the Dataset has been materialized.
        while (persistedQueue.size > 3) {
          val dataToUnpersist = persistedQueue.dequeue()
          unpersist(dataToUnpersist)
        }
        updateCount += 1

        // Handle checkpointing (after persisting)
        if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0  // line 94
          && sc.getCheckpointDir.nonEmpty) {
          // Add new checkpoint before removing old checkpoints.
          checkpoint(newData)  // line 97: the persisted data is reused here
          // ... (rest of the method omitted)
        }
      }
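      One possible shape for the change is to move the persist inside the checkpoint branch so that only Datasets about to be checkpointed are cached. The sketch below is written against a stand-in class, not Spark itself, and is not a reviewed patch. ConditionalPersistCheckpointer, its log buffer and the Boolean hasCheckpointDir (standing in for sc.getCheckpointDir.nonEmpty) are hypothetical, and persist, unpersist and checkpoint here only record that they were called.

```scala
import scala.collection.mutable

// Hypothetical stand-in for PeriodicCheckpointer: the Spark operations are replaced
// by entries in `log` so the persist/checkpoint pattern can be inspected directly.
final class ConditionalPersistCheckpointer[T](
    checkpointInterval: Int,
    hasCheckpointDir: Boolean) {

  val log: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  private val persistedQueue = mutable.Queue.empty[T]
  private var updateCount = 0

  private def persist(data: T): Unit = log += s"persist($data)"
  private def unpersist(data: T): Unit = log += s"unpersist($data)"
  private def checkpoint(data: T): Unit = log += s"checkpoint($data)"

  def update(newData: T): Unit = {
    updateCount += 1
    // Same condition as the original update(), evaluated before any caching.
    val checkpointDue = checkpointInterval != -1 &&
      updateCount % checkpointInterval == 0 && hasCheckpointDir
    if (checkpointDue) {
      // Only Datasets that the checkpoint will read again are persisted.
      persist(newData)
      persistedQueue.enqueue(newData)
      // Keep the cache bounded, as the original persistedQueue does.
      while (persistedQueue.size > 3) {
        unpersist(persistedQueue.dequeue())
      }
      checkpoint(newData)
    }
  }

  def persistCount: Int = log.count(_.startsWith("persist("))
}

object ConditionalPersistDemo {
  def main(args: Array[String]): Unit = {
    val checkpointer =
      new ConditionalPersistCheckpointer[Int](checkpointInterval = 3, hasCheckpointDir = true)
    (1 to 10).foreach(checkpointer.update)
    println(checkpointer.log.mkString(" "))
    println(s"persist calls: ${checkpointer.persistCount}")  // 3, versus 10 in the original update()
  }
}
```

      The trade-off is that a Dataset which is not checkpointed is no longer cached at all, so any later reuse of it would be recomputed. Whether that is acceptable depends on how callers of update() reuse the data.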

            People

            • Assignee: Unassigned
            • Reporter: Dong Wang (spark_cachecheck)
            • Votes: 0
            • Watchers: 1