SPARK-38204

All state operators are at a risk of inconsistency between state partitioning and operator partitioning



    Description

Except for stream-stream join, all stateful operators use ClusteredDistribution as the requirement on their child's distribution.

ClusteredDistribution is a very relaxed requirement: any output partitioning can satisfy the distribution as long as it ensures that all tuples having the same grouping keys are placed in the same partition.

To illustrate with an example, suppose we run a streaming aggregation like the code below:

df
  .withWatermark("timestamp", "30 minutes")
  .groupBy($"group1", $"group2", window($"timestamp", "10 minutes"))
  .agg(count("*"))

In this code, a streaming aggregation operator appears in the physical plan, and it requires ClusteredDistribution("group1", "group2", "window") from its child.

The problem is that various output partitionings can satisfy this distribution:

• RangePartitioning
  • Satisfies the distribution with the exact grouping keys or any subset of them, in any key order (combination), with any sort order (asc/desc)
• HashPartitioning
  • Satisfies the distribution with the exact grouping keys or any subset of them, in any key order (combination)
• (upcoming Spark 3.3.0+) DataSourcePartitioning
  • An output partitioning provided by a data source will also be able to satisfy ClusteredDistribution, which makes things worse (assuming data sources can provide differing output partitionings relatively easily)

For example, even if we only consider HashPartitioning, all of HashPartitioning("group1"), HashPartitioning("group2"), HashPartitioning("group1", "group2"), HashPartitioning("group2", "group1"), HashPartitioning("group1", "group2", "window"), etc. satisfy the distribution, as the sketch below illustrates.
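A minimal sketch of this, using Spark's internal Catalyst classes (the attribute names, data types, and partition count 200 are illustrative; the real window column is a struct, which doesn't matter for this check):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.StringType

// Illustrative grouping key attributes (data types don't affect satisfies()).
val group1 = AttributeReference("group1", StringType)()
val group2 = AttributeReference("group2", StringType)()
val window = AttributeReference("window", StringType)()

val required = ClusteredDistribution(Seq(group1, group2, window))

// All of these report true, even though they place the same key in different partitions:
Seq(
  HashPartitioning(Seq(group1), 200),
  HashPartitioning(Seq(group2), 200),
  HashPartitioning(Seq(group2, group1), 200),
  HashPartitioning(Seq(group1, group2, window), 200)
).foreach(p => println(s"$p satisfies required distribution: ${p.satisfies(required)}"))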

The requirement on state partitioning is much stricter, since we must not change the partitioning once the state has been partitioned and built. It must ensure that all tuples having the same grouping keys are placed in the same partition (same partition ID) across the lifetime of the query.

This mismatch between the relaxed ClusteredDistribution requirement and the strict state partitioning requirement silently leads to correctness issues.

For example, let's assume we have a streaming query like the one below:

df
  .withWatermark("timestamp", "30 minutes")
  .repartition($"group2")
  .groupBy($"group1", $"group2", window($"timestamp", "10 minutes"))
  .agg(count("*"))

      repartition("group2") satisfies ClusteredDistribution("group1", "group2", "window"), so Spark won't introduce additional shuffle there, and state partitioning would be HashPartitioning("group2").

We run this query for a while, stop it, and then change the manual partitioning as below:

df
  .withWatermark("timestamp", "30 minutes")
  .repartition($"group1")
  .groupBy($"group1", $"group2", window($"timestamp", "10 minutes"))
  .agg(count("*"))

      repartition("group1") also satisfies ClusteredDistribution("group1", "group2", "window"), so Spark won't introduce additional shuffle there. That said, child output partitioning of streaming aggregation operator would be HashPartitioning("group1"), whereas state partitioning is HashPartitioning("group2").

      https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query

In the SS guide doc we enumerate the unsupported modifications to a query during the lifetime of a streaming query, but there is no mention of this one.

Making this worse, Spark doesn't store any information about the state partitioning (hence there is no way to validate it), so Spark simply allows the change and silently produces incorrect results while the streaming query appears to run without any problem. The only way to notice the issue is through the results of the query.

We have no idea whether end users already suffer from this in their queries or not. The only way to find out is to list all state rows, apply the hash function to the expected grouping keys, and confirm that every row hashes to the exact partition ID it is stored in. If the state turns out to be broken, we will need a tool to "re"partition the state correctly, or in the worst case, have to ask users to throw away the checkpoint and reprocess.
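A rough sketch of such a check, assuming a stateDf obtained from some state-reading tool (hypothetical here) that exposes the grouping key columns plus a partition_id column with the partition each row is physically stored in, and a numShufflePartitions matching the number of state partitions of the query:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

def findMisplacedStateRows(stateDf: DataFrame, numShufflePartitions: Int): DataFrame = {
  // Recompute the partition ID each row should be in, hashing the full grouping keys
  // the same way HashPartitioning over ("group1", "group2", "window") would.
  val expectedPartition =
    pmod(hash(col("group1"), col("group2"), col("window")), lit(numShufflePartitions))
  // Any row whose stored partition (hypothetical partition_id column) differs from the
  // expected one indicates broken state.
  stateDf.filter(col("partition_id") =!= expectedPartition)
}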

This issue has existed since the introduction of the stateful operators (Spark 2.2+): HashClusteredDistribution (the strict requirement) was introduced in Spark 2.3, but the stateful operators were never changed to use that distribution. Fortunately, stream-stream join has used HashClusteredDistribution since Spark 2.3, so it seems to be safe.


People

Assignee: Jungtaek Lim (kabhwan)
Reporter: Jungtaek Lim (kabhwan)
