Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-46706

percentile_approx regression since Spark 2.4

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.1.3
    • None
    • Shuffle, SQL
    • None

    Description

      Originally reported in SPARK-40499, I'm opening a new issue as that original one was closed. It's not entirely clear if this regression exists on HEAD, but I haven't seen any changes that I would expect to address this. Here's what I reported in the other ticket.

      Chiming in, I'm currently hitting a similar issue while doing upgrades from Spark 2.4 to Spark 3.1.

      In our case, we have an application with two queries that take percentile_approx of a fairly large dataset and reduce to only a few rows (reading 37 billion rows and aggregating to ~100 and ~7000 rows, with different groupBy keys respectively). This roughly what the query looks like:

      select
          col1,
          percentile_approx(col2, 0.95) as q_95,
          percentile_approx(col2, 0.99) as q_99,
          percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
      from data
      group by 1
      

      Also, I do recognize this could be written more optimally as percentile_approx(col2, array(0.95, 0.99)) to save on computation, this is just the snippet as it was when I stumbled on it

      We've observed that the shuffle write stage is roughly the same time (and size) but the shuffle read is drastically slower. Note, in this application the shuffle read is also collecting for a broadcast. The difference for the shuffle read is from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was after reducing the percentile_approx precision from the default (10000) to 100 on Spark 3.1. When left at 10000, the stage was exceeding 15 hours and hitting some of our application time limits.

      Spark 2.4

      • Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
      • Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

      Spark 3.1

      • Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 20.3 GiB shuffle write size
      • Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

      Prior to digging further into this, I thought it may have been some AQE issue. That was incorrect and in fact the post-aggregation task/partition count was much worse without AQE, at about 150000.

      I dug into any relevant changes between Spark 2.4 and 3.1, as well as changes since 3.1, to see if there's anything significant. I just looked at PRs against QuantileSummaries.scala and ApproximatePercentile.scala, and ignored some docs, formatting, refactoring, or very simple behavioural changes. These are the notable changes with some commentary.

      For changes between 2.4 and 3.1:

      1. https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543 which re-implements the QuantileSummaries merge algorithm rather deeply. This seems potentially relevant, especially since the shuffle reading which is likely just executing merging is struggling. That being said, I would have maybe expected this to also impact the partial_percentile_approx calculations too.
      2. https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3 appears to change the behaviour in evaluation for case when / conditional inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. Our actual job is a little more complex and the SQL is over a subquery, but the subquery is not manipulating the column using for percentile_approx. I misunderstood this change initially. It looks like an optimization to avoid reconstructing the data type for some parent plan nodes. Our SQL code does consume percentiles in some CaseWhen clauses, so I'll need to dig more into this.

      I didn't go far while looking into other code. I did not see relevant changes in AggregatingAccumulator.

      For changes between 3.1 onwards to ApproximatePercentile and QuantileSummaries:

      1. (in 3.2) https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0 which appears to optimize the ApproxPercentiles.getPercentiles function. It seems to only be executed on the final result collection and optimizes for cases like percentile_approx(..., array(0.5, 0.9, 0.95), ...) - multiple value percentile calculations.
      2. (in 3.2) https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3 which appears to change some plan interfaces. It reads as though only impacted query compilation performance, rather than execution performance.

      Because I'm not seeing any changes that may possible address this performance issue, I'm led to believe this is still an existing problem in Spark.

      I have some follow-up I'll do to dig into this further, namely:

      • Test the job with percentile to see if it performs better, although I'm a bit dubious we will get this to run with 37 billion input rows
      • Test the job on Spark 3.3, which we have production-ready in our environment
      • Observe some of the thread dumps / profiling for the stage with poor performance
      • Isolate, or otherwise rewrite, the ApproximatePercentile going into a CaseWhen in our specific SQL to narrow down if that change is related

      Here's an expanded version of our query, with irrelevant columns removed. I figured I'd include this after realizing the relevant of the CaseWhen parent node.

      
      with source_data as (
        select col1, col2 -- ... additional columns
        from <table>
      ), percentiles as (
        select
            col1,
            approx_percentile(col2, 0.95) as q_95,
            approx_percentile(col2, 0.99) as q_99, 
            approx_percentile(col2, 0.99) - approx_percentile(col2, 0.95) as q_diff
        from source_data
        group by 1
      ), calculations as (
        select
          col1,
          case 
            when q_diff > 5 then q_95 + 5
            else q_99
          end as calculated_val
        from percentiles
      )
      
      select
        /*+ BROADCAST (c) */
        source_data.*, c.calculated_val
      from source_data
      left join calculations as c using(col1)
      -- ... additional joins / broadcasts
      

      Later on, I found another app of ours with similarly unbearable regressions. The app previously performed in <2hr run-time prior to upgrading, and is now >24hr on Spark3. It's a lot simpler, so this is more-or-less the SQL it's doing (with a number of other internal group-by fields removed for brievity).

      with t as (
        select
          method,
          response_code,
          if(response_code >= 200 and response_code < 300, duration_ms, null) as duration_ms_2xx,
        from logs
      )
      select
        method,
        count(*) as cnt,
        sum(case when response_code >= 200 and response_code < 300 then 1 else 0 end) as cnt_2xx,
        sum(case when response_code >= 300 and response_code < 400 then 1 else 0 end) as cnt_3xx,
        sum(case when response_code >= 400 and response_code < 500 then 1 else 0 end) as cnt_4xx,
        sum(case when response_code >= 500 then 1 else 0 end) as cnt_5xx,
        approx_percentile(duration_ms_2xx, 0.5) as latency_p50,
        approx_percentile(duration_ms_2xx, 0.9) as latency_p90,
        approx_percentile(duration_ms_2xx, 0.99) as latency_p99,
        approx_percentile(duration_ms_2xx, 0.999) as latency_p999,
      from t
      group by 1
      

      This one has a conditional input into approx_percentile, so it is possible the change related to that is still a factor?

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              legojoey17 Joey Pereira
              Votes:
              1 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated: