Spark / SPARK-26629

Error with multiple file streams in a query + restart on a batch that has no data for one file stream


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0, 2.4.1
    • Fix Version/s: 2.4.1, 3.0.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      When a streaming query has multiple file streams, and there is a batch in which one of the file streams has no data, then restarting the query from that batch throws the following error.

      java.lang.IllegalStateException: batch 1 doesn't exist
      	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
      	at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
      	at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
      	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
      	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
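
      For reference, here is a minimal reproduction sketch. It is illustrative only: the paths, schema, and timings are assumptions. The key ingredients are two file sources unioned in one query, a micro-batch in which only one source sees new files, and a restart from the same checkpoint.

      // Hypothetical reproduction sketch; paths and timings are illustrative only.
      import org.apache.spark.sql.SparkSession

      object Spark26629Repro {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("SPARK-26629-repro").getOrCreate()

          // Two independent file stream sources in the same query.
          val stream1 = spark.readStream.format("text").load("/tmp/src1")
          val stream2 = spark.readStream.format("text").load("/tmp/src2")

          def startQuery() = stream1.union(stream2).writeStream
            .format("parquet")
            .option("path", "/tmp/out")
            .option("checkpointLocation", "/tmp/chk")
            .start()

          // 1. While this query runs, add files only to /tmp/src1, so that a batch is
          //    planned in which the source over /tmp/src2 has no new data.
          val query = startQuery()
          query.awaitTermination(30000)
          query.stop()

          // 2. Restart from the same checkpoint: populateStartOffsets() calls
          //    getBatch(X, X) on the no-data source and hits the IllegalStateException.
          val restarted = startQuery()
          restarted.awaitTermination()
        }
      }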
      

      *Reason*
      The existing HDFSMetadataLog.verifyBatchIds throws an error whenever the batchIds list is empty. In the context of FileStreamSource.getBatch (where verify is called) and FileStreamSourceLog (a subclass of HDFSMetadataLog), this is usually okay because, in a streaming query with one file stream, the batchIds can never be empty:

      • A batch is planned only when the FileStreamSourceLog has seen a new offset (that is, there are new data files).
      • So FileStreamSource.getBatch(X, Y) is only ever called with Y > X. Internally this calls HDFSMetadataLog.verifyBatchIds with start = X + 1 and end = Y, that is, with Y - X batch ids.
      • For example, FileStreamSource.getBatch(4, 5) calls verify(batchIds = Seq(5), start = 5, end = 5).

      However, the invariant Y > X does not hold when there are two file stream sources, because a batch may be planned even when only one of the file streams has new data. The file stream with no data in that batch can then be asked for FileStreamSource.getBatch(X, X) -> verify(batchIds = Seq.empty, start = X+1, end = X) -> failure.
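
      The failing check can be pictured with a small, self-contained sketch. This is a simplification of the behaviour described above, not the actual Spark source: it only captures the fact that an empty batchIds list is rejected outright.

      // Simplified stand-in for the check described above; not the actual Spark source.
      object VerifySketch {
        def verifyBatchIds(batchIds: Seq[Long], startId: Long, endId: Long): Unit = {
          // The real HDFSMetadataLog.verifyBatchIds also checks that the ids are exactly
          // startId..endId; the part relevant here is that an empty list always fails.
          if (batchIds.isEmpty) {
            throw new IllegalStateException(s"batch $startId doesn't exist")
          }
        }

        def main(args: Array[String]): Unit = {
          // One file source, new data in the batch: getBatch(4, 5) -> verify over [5, 5].
          verifyBatchIds(Seq(5L), startId = 5L, endId = 5L)   // passes

          // Restart at a batch where this source had no data: getBatch(4, 4) -> verify over [5, 4].
          verifyBatchIds(Seq.empty, startId = 5L, endId = 4L) // throws "batch 5 doesn't exist"
        }
      }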

      Note that FileStreamSource.getBatch(X, X) is called only when restarting a query at a batch where a file source had no data. During normal batch planning, MicroBatchExecution avoids calling FileStreamSource.getBatch(X, X) when offset X has not changed. However, when restarting a stream at such a batch, MicroBatchExecution.populateStartOffsets() calls FileStreamSource.getBatch(X, X) (a DataSource V1 hack to initialize the source with the last known offsets), thus hitting this issue.
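
      The two code paths can be paraphrased as follows (illustrative functions only, not the MicroBatchExecution source): normal planning skips a source whose offset has not changed, while a restart re-initializes every source with the offsets of the last batch, even a source whose offset did not change in that batch.

      // Paraphrase of the two code paths; names and signatures are illustrative only.
      object RestartSketch {
        // Normal planning: getBatch is called only when this source has new data.
        def planBatch(committed: Long, available: Long, getBatch: (Long, Long) => Unit): Unit =
          if (available > committed) getBatch(committed, available)

        // Restart: the source is re-initialized with the offsets of the last batch,
        // which for a source with no data in that batch means getBatch(X, X).
        def populateStartOffsetsSketch(lastStart: Long, lastEnd: Long, getBatch: (Long, Long) => Unit): Unit =
          getBatch(lastStart, lastEnd)
      }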

      *Solution*
      The minimal solution (one that can be backported) is to skip the verification when FileStreamSource.getBatch(X, X) is called, that is, when the start and end offsets are equal.
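
      One way to picture the guard (a sketch of the idea only; names here are illustrative and this is not the actual patch): run the batch-id verification only when the requested range [X + 1, Y] is non-empty, so that getBatch(X, X) returns no files instead of throwing.

      // Sketch of the backportable guard; names are illustrative, not the actual patch.
      object GuardSketch {
        // Simplified stand-in for HDFSMetadataLog.verifyBatchIds (see the sketch above).
        def verifyBatchIds(batchIds: Seq[Long], startId: Long, endId: Long): Unit =
          if (batchIds.isEmpty) throw new IllegalStateException(s"batch $startId doesn't exist")

        // Hypothetical helper standing in for the lookup behind FileStreamSource.getBatch:
        // `log` maps a batch id to the files discovered in that batch.
        def getBatchFiles(log: Map[Long, Seq[String]], startOffset: Long, endOffset: Long): Seq[String] = {
          val startId = startOffset + 1
          val endId = endOffset
          val ids = (startId to endId).filter(log.contains)
          // The fix: verify only when a non-empty range was actually requested,
          // so that getBatch(X, X) (startId > endId) no longer throws.
          if (startId <= endId) verifyBatchIds(ids, startId, endId)
          ids.flatMap(log(_))
        }

        def main(args: Array[String]): Unit = {
          val log = Map(1L -> Seq("file-a"))   // this source saw data only in batch 1
          println(getBatchFiles(log, 0, 1))    // normal planning: returns file-a
          println(getBatchFiles(log, 1, 1))    // restart at a no-data batch: empty, no exception
        }
      }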


            People

              Assignee: tdas Tathagata Das
              Reporter: tdas Tathagata Das
              Votes: 0
              Watchers: 1
