Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Impala 2.3.0, Impala 2.5.0, Impala 2.4.0, Impala 2.6.0
Description
Queries with the following characteristics may intermittently return incorrect results:
- non-grouping aggregation on a nested collection that is joined to parent table
- query has straight_join hint or has several joins on nested collections
The characteristic plan shape that is subject to this bug has a subplan containing a nested-loop join with a non-grouping aggregation on its build side (the right child of the join).
Example with characteristic plan shape:
select straight_join c_custkey, cnt1
from tpch_nested_parquet.customer c,
(select count(*) cnt1 from c.c_orders) v
where cnt1 = 1
+------------------------------------------------------------------------------------+
| Explain String |
+------------------------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=186.00MB VCores=1 |
| WARNING: The following tables are missing relevant table and/or column statistics. |
| tpch_nested_parquet.customer |
| |
| 06:EXCHANGE [UNPARTITIONED] |
| | |
| 01:SUBPLAN |
| | |
| |--05:NESTED LOOP JOIN [CROSS JOIN] <--- NLJ with AGG on build side |
| | | |
| | |--04:AGGREGATE [FINALIZE] |
| | | | output: count(*) |
| | | | having: count(*) = 1 |
| | | | |
| | | 03:UNNEST [c.c_orders] |
| | | |
| | 02:SINGULAR ROW SRC |
| | |
| 00:SCAN HDFS [tpch_nested_parquet.customer c] |
| partitions=1/1 files=4 size=577.87MB |
+------------------------------------------------------------------------------------+
I suspect that our tests did not catch this because we typically invert nested-loop joins so that the singular row source is on the build side. As a result, this bug only manifests in more complex plans or with straight_join.
The underlying problem seems to be that the memory backing aggregation tuples is not properly transferred to the output batch in the last subplan iteration.
See partitioned-aggregation-node.cc:
// Some but not all memory is transferred here:
void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
  ...
  // Keep the current chunk to amortize the memory allocation over a series
  // of Reset()/Open()/GetNext()* calls.
  row_batch->tuple_data_pool()->AcquireData(mem_pool_.get(), true);
  // This node no longer owns the memory for singleton_output_tuple_.
  singleton_output_tuple_ = NULL;
}

// When closing the agg node we free the mem pool, but there may be in-flight rows still referencing its memory.
void PartitionedAggregationNode::Close(RuntimeState* state) {
  ...
  if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
  if (ht_ctx_.get() != NULL) ht_ctx_->Close();
  if (serialize_stream_.get() != NULL) serialize_stream_->Close();
  ...
}
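The ownership problem described above can be illustrated with a small standalone model. This is only a sketch: ToyPool, Owns() and TupleSurvivesClose() are hypothetical names invented for illustration, not Impala's MemPool or aggregation code. It assumes that an AcquireData() call with keep_current set to true leaves the most recently allocated chunk with the source pool.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Toy stand-in for a chunked memory pool. Hypothetical, not Impala's MemPool.
class ToyPool {
 public:
  // Allocates 'size' bytes in a fresh chunk, which becomes the current chunk.
  char* Allocate(std::size_t size) {
    chunks_.push_back(std::make_unique<char[]>(size));
    return chunks_.back().get();
  }

  // Takes ownership of chunks from 'src'. If 'keep_current' is true, 'src'
  // holds on to its most recently allocated chunk (assumed semantics).
  void AcquireData(ToyPool* src, bool keep_current) {
    assert(src != this);
    std::size_t num_to_move = src->chunks_.size();
    if (keep_current && num_to_move > 0) --num_to_move;
    for (std::size_t i = 0; i < num_to_move; ++i) {
      chunks_.push_back(std::move(src->chunks_[i]));
    }
    src->chunks_.erase(
        src->chunks_.begin(),
        src->chunks_.begin() + static_cast<std::ptrdiff_t>(num_to_move));
  }

  // Releases every chunk still owned by this pool.
  void FreeAll() { chunks_.clear(); }

  // Returns true if 'ptr' is the start of a chunk owned by this pool.
  bool Owns(const char* ptr) const {
    for (const auto& chunk : chunks_) {
      if (chunk.get() == ptr) return true;
    }
    return false;
  }

 private:
  std::vector<std::unique_ptr<char[]>> chunks_;
};

// Models one GetSingletonOutput() call followed by Close(). Returns true if
// the output tuple is backed by memory that the batch owns, i.e. memory that
// is not released when the node frees its own pool.
bool TupleSurvivesClose(bool keep_current) {
  ToyPool node_pool;   // plays the role of mem_pool_
  ToyPool batch_pool;  // plays the role of row_batch->tuple_data_pool()
  char* tuple = node_pool.Allocate(8);  // the singleton output tuple
  batch_pool.AcquireData(&node_pool, keep_current);
  // Check ownership before freeing so no dangling pointer is ever inspected.
  const bool owned_by_batch = batch_pool.Owns(tuple);
  node_pool.FreeAll();  // what Close() does to mem_pool_
  return owned_by_batch;
}
```

In this model, TupleSurvivesClose(true) returns false: the tuple lives in the chunk that the node keeps, so freeing the node's pool releases memory that an in-flight row still points to. TupleSurvivesClose(false) returns true because every chunk, including the one holding the tuple, moves to the batch.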
The best way to reproduce is to enable the following query options:
set num_nodes=1;
set num_scanner_threads=1;
select straight_join c_custkey, cnt1
from tpch_nested_parquet.customer c,
(select count(*) cnt1 from c.c_orders) v
where cnt1 = 1
// Garbage results
+-----------+--------+
| c_custkey | cnt1 |
+-----------+--------+
| 14996 | 135730 |
| 9938 | 0 |
| 48365 | 97291 |
| 131732 | 2 |
| 43360 | 67355 |
| 42239 | 0 |
| 17480 | 0 |
| 86840 | 12271 |
| 2855 | 0 |
| 138173 | 0 |
| 52973 | 0 |
| 140732 | 0 |
| 148949 | 0 |
| 87212 | 0 |
| 25622 | 0 |
| 1910 | 0 |
| 67328 | 0 |
+-----------+--------+