Affects Version/s: 2.4.0
Fix Version/s: None
Component/s: Structured Streaming
The sequence of events was as follows:
1) When I first ran the app, it ran normally.
2) Then, for some reason, I deleted the Structured Streaming checkpoint directory, but did not delete the sink's save path, which stores the output files on HDFS.
3) When I restarted the app, only executors were assigned after startup, and no tasks were assigned. In the log I found the message: "INFO streaming.FileStreamSink: Skipping already committed batch 72". I later looked at the source code and found that this log comes from https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L108
4) The situation in 3) lasted for several hours before the DAGScheduler was triggered to build the DAG, submit stages, submit tasks, and assign tasks to the executors.
Later, I read https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala carefully and realized that FileStreamSink keeps a log under savepath/_spark_metadata. If the current batchId <= log.getLatest(), the sink skips writing the batch and only prints the message: logInfo(s"Skipping already committed batch $batchId").
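The skip condition above can be sketched without any Spark dependencies. This is a minimal model, not the actual implementation: `shouldSkip` and `latestCommitted` are hypothetical names standing in for the check FileStreamSink.addBatch performs against the latest batchId recorded in _spark_metadata.

```scala
// Minimal, Spark-free sketch of the skip check in FileStreamSink.addBatch.
// `latestCommitted` stands in for the latest batchId read from the sink's
// _spark_metadata log: after the checkpoint is deleted, the surviving log
// still reports batch 72, while the fresh checkpoint restarts batch ids at 0.
object SkipSketch {
  // Skip whenever the incoming batch id is at or below the latest committed id.
  def shouldSkip(batchId: Long, latestCommitted: Option[Long]): Boolean =
    latestCommitted.exists(batchId <= _)

  def main(args: Array[String]): Unit = {
    val latest = Some(72L) // read from the untouched _spark_metadata log
    (0L to 73L).foreach { id =>
      if (shouldSkip(id, latest))
        println(s"Skipping already committed batch $id") // batches 0..72
      else
        println(s"Writing batch $id") // only batch 73 actually writes
    }
  }
}
```

Under this model, every batch id up to 72 is silently skipped, which would explain the hours-long stall observed in step 3) before new work is scheduled.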
I think that since a checkpoint is used, all progress information should be controlled by the checkpoint alone; the sink should not keep its own separate record of committed batchIds.
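Until that changes, one workaround consistent with this analysis (an assumption on my part, not an official recommendation) is to delete the sink's _spark_metadata log together with the checkpoint, so the two can never disagree. A sketch using local temporary directories as stand-ins for the HDFS paths; `deleteRecursively` is a hypothetical helper playing the role of `hdfs dfs -rm -r`:

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

object CleanupSketch {
  // Recursively delete a directory tree (stand-in for `hdfs dfs -rm -r`).
  def deleteRecursively(p: Path): Unit =
    if (Files.exists(p)) {
      // Walk deepest-first so files are removed before their parent dirs.
      Files.walk(p).sorted(Comparator.reverseOrder[Path]()).forEach(Files.delete(_))
    }

  def main(args: Array[String]): Unit = {
    // Local stand-ins for the real checkpoint and sink output paths.
    val base       = Files.createTempDirectory("demo")
    val checkpoint = Files.createDirectories(base.resolve("checkpoint"))
    val metadata   = Files.createDirectories(base.resolve("output/_spark_metadata"))
    // Delete both together so the sink log cannot outlive the checkpoint:
    deleteRecursively(checkpoint)
    deleteRecursively(metadata)
    println(Files.exists(metadata)) // false: no stale batchIds survive a restart
  }
}
```

With both directories gone, a restarted query cannot find a stale latest batchId, so no batches are skipped.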