SPARK-29438

Failed to get state store in stream-stream join


    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 2.4.4
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

      Description

      Currently, Spark uses the `TaskPartitionId` to determine the StateStore path:

      TaskPartitionId   \ 
      StateStoreVersion  --> StoreProviderId -> StateStore
      StateStoreName    /  
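
      For illustration, here is a minimal sketch of that mapping. The case classes below are simplified stand-ins for Spark's internal `StateStoreId`/`StateStoreProviderId`, and the exact directory layout is approximate:

      import java.util.UUID

      // Simplified stand-ins for the internal classes in
      // org.apache.spark.sql.execution.streaming.state (names and layout are approximate).
      case class StateStoreId(checkpointRoot: String, operatorId: Long,
                              partitionId: Int, storeName: String = "default")
      case class StateStoreProviderId(storeId: StateStoreId, queryRunId: UUID)

      // Each batch writes a delta file under a directory keyed by the partition id,
      // so a task whose partition id changes between batches resolves a different
      // (possibly empty) store directory.
      def deltaFilePath(id: StateStoreId, version: Long): String =
        s"${id.checkpointRoot}/${id.operatorId}/${id.partitionId}/${id.storeName}/$version.delta"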
      

      In a Spark stage, a task's partition id is determined by how the tasks of that stage are laid out, and, as noted above, the StateStore file path depends on the task partition id. So if the partition ids of the stream-stream join tasks change relative to the last batch, the join reads the wrong StateStore data or fails because the expected StateStore data does not exist. This does happen in some corner cases. Following is sample pseudocode:

      // streamDf1, streamDf2, streamDf3 are streaming DataFrames; batchDf4 is a batch DataFrame
      val df3 = streamDf1.join(streamDf2)   // stream-stream join (stateful)
      val df5 = streamDf3.join(batchDf4)    // stream-batch join
      val df = df3.union(df5)
      df.writeStream...start()
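
      For context, a self-contained query with the same shape might look like the sketch below. The rate sources, column names, and checkpoint path are purely illustrative; this shows the query shape, not a guaranteed reproduction of the failure.

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("stream-stream-join-shape").getOrCreate()
      import spark.implicits._

      // Three streaming inputs and one batch input (rate source used only for illustration).
      val streamDf1 = spark.readStream.format("rate").load().select($"value".as("key"), $"timestamp".as("ts1"))
      val streamDf2 = spark.readStream.format("rate").load().select($"value".as("key"), $"timestamp".as("ts2"))
      val streamDf3 = spark.readStream.format("rate").load().select($"value".as("key"), $"timestamp".as("ts3"))
      val batchDf4  = spark.range(0, 1000).select($"id".as("key"))

      val df3 = streamDf1.join(streamDf2, "key")   // planned as StreamingSymmetricHashJoin
      val df5 = streamDf3.join(batchDf4, "key")    // planned as SortMergeJoin or BroadcastHashJoin

      df3.select("key").union(df5.select("key"))
        .writeStream
        .format("console")
        .option("checkpointLocation", "/tmp/stream-stream-join-checkpoint")  // illustrative path
        .start()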
      

      A simplified DAG looks like this:

      DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
       (streamDf3)            |               (streamDf1)        (streamDf2)
           |                  |                   |                 |
        Exchange(200)      Exchange(200)       Exchange(200)     Exchange(200)
           |                  |                   |                 | 
         Sort                Sort                 |                 |
           \                  /                   \                 /
            \                /                     \               /
              SortMergeJoin                    StreamingSymmetricHashJoin
                           \                 /
                             \             /
                               \         /
                                  Union
      

      The stream-stream join task partition ids will range from 200 to 399, since those tasks are in the same stage as the `SortMergeJoin`. But when there is no new incoming data in `streamDf3` in some batch, it produces an empty LocalRelation, and the SortMergeJoin is replaced with a BroadcastHashJoin. In that case, the stream-stream join task partition ids will range from 1 to 200 instead. As a result, the tasks resolve the wrong StateStore path from the TaskPartitionId and fail with an error while reading the state store delta file.

      LocalTableScan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
           |                  |                   |                 |
      BroadcastExchange       |              Exchange(200)     Exchange(200)
           |                  |                   |                 | 
           |                  |                   |                 |
            \                /                     \               /
             \             /                        \             /
            BroadcastHashJoin                 StreamingSymmetricHashJoin
                           \                 /
                             \             /
                               \         /
                                  Union
      

      In my job, I disabled the automatic broadcast join feature (set spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make the StateStore path deterministic rather than dependent on the TaskPartitionId.
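
      For reference, the workaround amounts to disabling automatic broadcast joins when building the session, e.g.:

      import org.apache.spark.sql.SparkSession

      // Keep the stream-batch join as a SortMergeJoin even when one side is small,
      // so the physical plan (and the partition ids of the stream-stream join stage)
      // stays stable across batches.
      val spark = SparkSession.builder()
        .appName("stream-stream-join-workaround")
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")
        .getOrCreate()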

            People

            • Assignee: Unassigned
            • Reporter: Genmao Yu (uncleGen)
            • Votes: 0
            • Watchers: 2
