Spark / SPARK-40925

Fix late record filtering to support chaining of stateful operators


Details

    Description

      This is a follow-up ticket to https://issues.apache.org/jira/browse/SPARK-40821.

      Here we propose fixing late-record filtering in stateful operators to allow chaining of stateful operators that do not produce delayed records (unlike, e.g., a time-interval join or potentially flatMapGroupsWithState): for example, a time-equality streaming join followed by aggregations, or chained window aggregations.

       

      Two issues need to be addressed:

      1. Stateful operators filter late input records based on the current watermark. When chaining window aggregations, for example, the records produced by the first window aggregation are behind the current watermark by design (the watermark closes all past windows and emits the corresponding aggregates), so by definition they appear late relative to the current watermark in the second stateful operator. The proposed fix is to use the previous batch's watermark for late-record filtering and the current batch's watermark for state eviction; effectively, each stateful operator should be initialized with two watermark values instead of one.
      2. The second issue with chaining window aggregations is that the records produced by the first aggregation do not have an explicit event time column and thus cannot be fed directly into a subsequent stateful operator that needs one. This is partially addressed by https://github.com/apache/spark/pull/38288, which lets the user explicitly introduce a new event time column by extracting the event time from the window column, but this is somewhat cumbersome. We propose changing the window function to handle the window column transparently, e.g.:

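      // Assumes a SparkSession `spark` and a streaming DataFrame `input` with a timestamp column "eventTime".
      import org.apache.spark.sql.functions.{count, sum, window}
      import spark.implicits._ // enables the $"col" syntax and the Long encoders

      input
        .withWatermark("eventTime", "1 seconds")
        // First stage: 5-second tumbling windows.
        .groupBy(window($"eventTime", "5 seconds").as("window"))
        .agg(count("*").as("count"))
        // Proposed: window() accepts the window column of the first stage directly.
        .groupBy(window($"window", "10 seconds"))
        .agg(count("*").as("count"), sum("count").as("sum"))
        .select($"count".as[Long], $"sum".as[Long])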
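The two-watermark proposal in issue 1 can be illustrated with a small pure-Scala model. This is a sketch under simplifying assumptions, not Spark's implementation: all names (ChainedWatermarkSketch, tumble, emitted, notLate) are hypothetical, times are epoch milliseconds, the first operator's window state is built directly from all events (ignoring which micro-batch each arrived in), an emitted window is given the event time end - 1, and a record counts as late when its event time is strictly below the filtering threshold (Spark's exact boundary condition may differ).

```scala
// Hypothetical, simplified model of two chained stateful operators in one
// micro-batch. Times are epoch milliseconds; windows are tumbling [start, end).
object ChainedWatermarkSketch {

  final case class WindowAgg(start: Long, end: Long, count: Long) {
    // Assumed event time of an emitted window: end - 1 ms, the last instant
    // covered by the half-open window.
    def eventTime: Long = end - 1
  }

  // First operator's state: counts of event times per tumbling window of `sizeMs`.
  def tumble(eventTimes: Seq[Long], sizeMs: Long): Seq[WindowAgg] =
    eventTimes
      .groupBy(t => t - Math.floorMod(t, sizeMs)) // window start
      .toSeq
      .sortBy(_._1)
      .map { case (start, ts) => WindowAgg(start, start + sizeMs, ts.size.toLong) }

  // Append-mode output of the first operator in this batch: windows closed by
  // the current watermark that were not already closed by the previous one.
  def emitted(windows: Seq[WindowAgg], prevWm: Long, currWm: Long): Seq[WindowAgg] =
    windows.filter(w => w.end > prevWm && w.end <= currWm)

  // Late-record filter of the second operator: drop inputs whose event time is
  // strictly below `threshold` (simplified boundary condition).
  def notLate(inputs: Seq[WindowAgg], threshold: Long): Seq[WindowAgg] =
    inputs.filter(_.eventTime >= threshold)
}
```

For example, with events at 1, 2, 6, 7 and 12 seconds that all arrived in a batch whose watermark was still 0, the next batch advances the watermark to 11 s (12 s max event time minus the 1-second delay). The first operator then emits the [0 s, 5 s) and [5 s, 10 s) windows. Filtering them against the current watermark (11 s) drops both, while filtering against the previous watermark (0) keeps both; state eviction can still use the current watermark.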
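Issue 2 hinges on deriving an event time from the window column. The following pure-Scala model sketches what re-windowing 5-second aggregates into 10-second windows has to compute, mirroring count("*") and sum("count") in the example above. It is not Spark code: the names (RewindowSketch, eventTimeOf, windowOf, rewindow) are hypothetical, times are epoch milliseconds, and it assumes that a window's event time is its end minus one time unit, the convention used by Spark's window_time function for half-open [start, end) windows.

```scala
// Hypothetical pure-Scala model of re-windowing the output of a 5-second
// window aggregation into 10-second windows. Times are epoch milliseconds.
object RewindowSketch {
  // Half-open window [start, end).
  final case class Window(start: Long, end: Long)

  // Assumed convention: the event time of an aggregated window is its end
  // minus one time unit, i.e. the last instant the window covers.
  def eventTimeOf(w: Window): Long = w.end - 1

  // Tumbling window of size `sizeMs` that contains event time `t`.
  def windowOf(t: Long, sizeMs: Long): Window = {
    val start = t - Math.floorMod(t, sizeMs)
    Window(start, start + sizeMs)
  }

  // Second stage: per output window, (number of first-stage rows, sum of their
  // counts), analogous to count("*") and sum("count").
  def rewindow(firstStage: Seq[(Window, Long)], sizeMs: Long): Map[Window, (Long, Long)] =
    firstStage
      .groupBy { case (w, _) => windowOf(eventTimeOf(w), sizeMs) }
      .map { case (w, rows) => (w, (rows.size.toLong, rows.map(_._2).sum)) }
}
```

With first-stage windows [0 s, 5 s), [5 s, 10 s) and [10 s, 15 s) holding counts 2, 3 and 1, the result is (count 2, sum 5) for [0 s, 10 s) and (count 1, sum 1) for [10 s, 20 s). Using end - 1 rather than end matters: the [5 s, 10 s) window gets event time 9.999 s and lands in [0 s, 10 s), whereas its raw end of 10 s would wrongly place it in [10 s, 20 s).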

       

       


          People

            Alex Balikov (alex-balikov)
            Alex Balikov (alex-balikov)
            Jungtaek Lim
            Votes: 0
            Watchers: 4
