Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Impala 2.7.0, Impala 2.8.0, Impala 2.9.0
Description
The plans below are for two equivalent queries that compute count(distinct ...). The first plan has fewer aggregations and exchanges, so it will be faster if the grouping column (int_col) is not skewed. However, the second plan performs much better than the first if the grouping column is skewed, because it shuffles on both int_col and bigint_col rather than on int_col alone. For example, we saw a case where, instead of int_col, the grouping expression was two-valued, along the lines of select distinct case when flag = 1 then 'yes' else 'no'
[localhost:21000] > explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col;
Query: explain select int_col, count(distinct bigint_col) from functional.alltypes group by int_col
+-----------------------------------------------------------+
| Explain String |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
| |
| PLAN-ROOT SINK |
| | |
| 05:EXCHANGE [UNPARTITIONED] |
| | |
| 02:AGGREGATE [FINALIZE] |
| | output: count(bigint_col) |
| | group by: int_col |
| | |
| 04:AGGREGATE |
| | group by: int_col, bigint_col |
| | |
| 03:EXCHANGE [HASH(int_col)] |
| | |
| 01:AGGREGATE [STREAMING] |
| | group by: int_col, bigint_col |
| | |
| 00:SCAN HDFS [functional.alltypes] |
| partitions=24/24 files=24 size=478.45KB |
+-----------------------------------------------------------+
[localhost:21000] > explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col;
Query: explain select int_col, count(*) from (select distinct int_col, bigint_col from functional.alltypes) a group by int_col
+-----------------------------------------------------------+
| Explain String |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=180.00MB VCores=2 |
| |
| PLAN-ROOT SINK |
| | |
| 07:EXCHANGE [UNPARTITIONED] |
| | |
| 06:AGGREGATE [FINALIZE] |
| | output: count:merge(*) |
| | group by: int_col |
| | |
| 05:EXCHANGE [HASH(int_col)] |
| | |
| 02:AGGREGATE [STREAMING] |
| | output: count(*) |
| | group by: int_col |
| | |
| 04:AGGREGATE [FINALIZE] |
| | group by: int_col, bigint_col |
| | |
| 03:EXCHANGE [HASH(int_col,bigint_col)] |
| | |
| 01:AGGREGATE [STREAMING] |
| | group by: int_col, bigint_col |
| | |
| 00:SCAN HDFS [functional.alltypes] |
| partitions=24/24 files=24 size=478.45KB |
+-----------------------------------------------------------+
Fetched 26 row(s) in 0.01s
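As a quick sanity check of the equivalence claimed above, the two query shapes can be run against a small in-memory table. This is only a sketch: it uses SQLite through Python's sqlite3 module rather than Impala, and the table contents are made up for illustration. It also assumes bigint_col contains no NULLs, since count(distinct bigint_col) ignores NULLs while count(*) over the distinct pairs would count them.

```python
import sqlite3

# Hypothetical stand-in for functional.alltypes with only the two columns used.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE alltypes (int_col INTEGER, bigint_col INTEGER)")
# 100 rows, 3 distinct int_col values, 7 distinct bigint_col values, no NULLs.
rows = [(i % 3, (i % 7) * 10) for i in range(100)]
conn.executemany("INSERT INTO alltypes VALUES (?, ?)", rows)

# Shape of the first query: count(distinct ...) with a group by.
direct_sql = (
    "SELECT int_col, COUNT(DISTINCT bigint_col) "
    "FROM alltypes GROUP BY int_col ORDER BY int_col"
)
# Shape of the second query: distinct in a subquery, then count(*).
rewritten_sql = (
    "SELECT int_col, COUNT(*) "
    "FROM (SELECT DISTINCT int_col, bigint_col FROM alltypes) a "
    "GROUP BY int_col ORDER BY int_col"
)

direct = conn.execute(direct_sql).fetchall()
rewritten = conn.execute(rewritten_sql).fetchall()
print(direct == rewritten)
```

The ORDER BY clauses are added only so the two result lists can be compared directly.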
We may be better off generating the second plan by default (or with some query option) since it is more robust to skew in the grouping column.
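To make the skew argument concrete, here is a rough simulation of how rows would be spread across nodes by the two exchange strategies. It is a sketch under stated assumptions, not Impala's partitioning code: NUM_NODES, rows_per_node, and the synthetic data are all hypothetical, and Python's built-in hash stands in for the real hash function.

```python
NUM_NODES = 4  # hypothetical cluster size


def rows_per_node(rows, key):
    """Count the rows each node receives when hash-partitioning on key(row)."""
    load = [0] * NUM_NODES
    for row in rows:
        # Python's % always yields a non-negative result for a positive modulus.
        load[hash(key(row)) % NUM_NODES] += 1
    return load


# Skewed input: the grouping column takes only two values (like the yes/no
# case), while bigint_col is different for every row.
rows = [(i % 2, i) for i in range(10_000)]  # (int_col, bigint_col)

# Like 03:EXCHANGE [HASH(int_col)] in the first plan.
by_group = rows_per_node(rows, key=lambda r: r[0])
# Like 03:EXCHANGE [HASH(int_col,bigint_col)] in the second plan.
by_group_and_distinct = rows_per_node(rows, key=lambda r: r)

print("HASH(int_col):           ", by_group)
print("HASH(int_col,bigint_col):", by_group_and_distinct)
```

With only two grouping values, partitioning on int_col alone can keep at most two nodes busy regardless of cluster size, whereas partitioning on the pair spreads the distinct-elimination work across all nodes before the much smaller second aggregation is shuffled on int_col.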
Issue Links
- causes IMPALA-6822 Provide a query option to not shuffle on distinct exprs (Resolved)