IMPALA-4794

Impala's count(distinct ...) plans are not robust to data skew

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: Impala 2.7.0, Impala 2.8.0, Impala 2.9.0
    • Fix Version/s: Impala 2.10.0
    • Component/s: Frontend

    Description

      The two queries below are equivalent; both compute count(distinct ...). The first plan has fewer aggregations and exchanges, so it is faster when the grouping column (int_col) is not skewed. However, the second plan performs much better than the first when the grouping column is skewed. For example, we saw a case where, instead of int_col, the grouping expression was two-valued, along the lines of select distinct case when flag = 1 then 'yes' else 'no'

      [localhost:21000] > explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col;
      Query: explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col
      +-----------------------------------------------------------+
      | Explain String                                            |
      +-----------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
      |                                                           |
      | PLAN-ROOT SINK                                            |
      | |                                                         |
      | 05:EXCHANGE [UNPARTITIONED]                               |
      | |                                                         |
      | 02:AGGREGATE [FINALIZE]                                   |
      | |  output: count(bigint_col)                              |
      | |  group by: int_col                                      |
      | |                                                         |
      | 04:AGGREGATE                                              |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 03:EXCHANGE [HASH(int_col)]                               |
      | |                                                         |
      | 01:AGGREGATE [STREAMING]                                  |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 00:SCAN HDFS [functional.alltypes]                        |
      |    partitions=24/24 files=24 size=478.45KB                |
      +-----------------------------------------------------------+
      
      [localhost:21000] > explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col;
      Query: explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col
      +-----------------------------------------------------------+
      | Explain String                                            |
      +-----------------------------------------------------------+
      | Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
      |                                                           |
      | PLAN-ROOT SINK                                            |
      | |                                                         |
      | 07:EXCHANGE [UNPARTITIONED]                               |
      | |                                                         |
      | 06:AGGREGATE [FINALIZE]                                   |
      | |  output: count:merge(*)                                 |
      | |  group by: int_col                                      |
      | |                                                         |
      | 05:EXCHANGE [HASH(int_col)]                               |
      | |                                                         |
      | 02:AGGREGATE [STREAMING]                                  |
      | |  output: count(*)                                       |
      | |  group by: int_col                                      |
      | |                                                         |
      | 04:AGGREGATE [FINALIZE]                                   |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 03:EXCHANGE [HASH(int_col,bigint_col)]                    |
      | |                                                         |
      | 01:AGGREGATE [STREAMING]                                  |
      | |  group by: int_col, bigint_col                          |
      | |                                                         |
      | 00:SCAN HDFS [functional.alltypes]                        |
      |    partitions=24/24 files=24 size=478.45KB                |
      +-----------------------------------------------------------+
      Fetched 26 row(s) in 0.01s
      

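      We may be better off generating the second plan by default (or behind a query option), since it is more robust: its first exchange is partitioned on (int_col, bigint_col) rather than on int_col alone, so a skewed grouping value does not send all of its rows to a single node.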
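
The effect of the exchange partitioning key can be illustrated with a small simulation. This is only a sketch under stated assumptions, not Impala's implementation: the node count, the 95%/5% split between the two group values, and the helper names (`node_for`, `distinct_pairs`, `max_node_share`) are all hypothetical, and MD5 stands in for whatever hash function the exchange really uses. Each generated pair represents one distinct (group, value) row arriving at the first exchange.

```python
import hashlib
from collections import Counter

NUM_NODES = 4  # hypothetical cluster size


def node_for(key):
    """Map a partitioning key to a node using a stable hash (MD5 as a stand-in)."""
    digest = hashlib.md5(repr(key).encode()).digest()
    return int.from_bytes(digest[:8], "big") % NUM_NODES


def distinct_pairs(n=10_000):
    """Yield n distinct (group, value) pairs; 95% of them fall in group 'yes'."""
    for value in range(n):
        group = "no" if value % 20 == 0 else "yes"
        yield group, value


def max_node_share(partition_key):
    """Return the fraction of all pairs received by the busiest node."""
    load = Counter(node_for(partition_key(pair)) for pair in distinct_pairs())
    return max(load.values()) / sum(load.values())


# Partition on the group only, analogous to HASH(int_col) in the first plan.
share_group_only = max_node_share(lambda pair: pair[0])
# Partition on group and value, analogous to HASH(int_col,bigint_col) in the second plan.
share_group_and_value = max_node_share(lambda pair: pair)

print(f"partition on group only:      busiest node gets {share_group_only:.0%}")
print(f"partition on group and value: busiest node gets {share_group_and_value:.0%}")
```

With the group-only key, every 'yes' pair hashes to the same node, so one node handles at least 95% of the pairs regardless of cluster size. With the combined key the pairs spread roughly evenly, and in the second plan the later exchange on int_col only has to move the per-group counts produced by the streaming pre-aggregation.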

            People

              Assignee: Tianyi Wang (tianyiwang)
              Reporter: Tim Armstrong (tarmstrong)