Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Not A Problem
-
3.0.0
-
None
-
None
Description
Once the fuction update() is called, the RDD newData is persisted at line 82. However, only when meeting the checking point condition (at line 94), the persisted rdd newData would be used for the second time in the API checkpoint() (do checkpoint at line 97). In other conditions, newData will only be used once and it is unnecessary to persist the rdd in that case. Although the persistedQueue will be checked to avoid too many unnecessary cached data, it would be better to avoid every unnecessary persist operation.
def update(newData: T): Unit = { persist(newData) persistedQueue.enqueue(newData) // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class: // Users should call [[update()]] when a new Dataset has been created, // before the Dataset has been materialized. while (persistedQueue.size > 3) { val dataToUnpersist = persistedQueue.dequeue() unpersist(dataToUnpersist) } updateCount += 1 // Handle checkpointing (after persisting) if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0 && sc.getCheckpointDir.nonEmpty) { // Add new checkpoint before removing old checkpoints. checkpoint(newData)