impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [2/6] incubator-impala git commit: IMPALA-5788: Fix agg node crash when grouping by nondeterministic exprs
Date Wed, 23 Aug 2017 06:40:12 GMT
IMPALA-5788: Fix agg node crash when grouping by nondeterministic exprs

Fixed a bug where Impala crashed while executing an aggregation query
that groups by nondeterministic expressions. The crash occurred when
Impala rebuilt a spilled partition that could fit in memory: because the
grouping expressions are nondeterministic, some rows were re-hashed to a
partition other than the spilled one.

Testing:
Added a query test to verify successful execution.

Change-Id: Ibdb09239577b3f0a19d710b0d148e882b0b73e23
Reviewed-on: http://gerrit.cloudera.org:8080/7714
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b6c02972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b6c02972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b6c02972

Branch: refs/heads/master
Commit: b6c02972d6bb8dc4c62ef806c6145acae95842ad
Parents: c871e00
Author: Bikramjeet Vig <bikramjeet.vig@cloudera.com>
Authored: Wed Aug 16 17:45:06 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Aug 23 03:59:02 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-aggregation-node.cc     | 23 +++++++++++++++++++-
 be/src/exec/partitioned-aggregation-node.h      | 11 ++++++----
 .../queries/QueryTest/spilling.test             | 13 +++++++++++
 3 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index a0fed41..b1d54a6 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1152,6 +1152,17 @@ Status PartitionedAggregationNode::CreateHashPartitions(
     }
     hash_tbls_[i] = partition->hash_tbl.get();
   }
+  // In this case we did not have to repartition, so ensure that while building the hash
+  // table all rows will be inserted into the partition at 'single_partition_idx' in case
+  // a non deterministic grouping expression causes a row to hash to a different
+  // partition index.
+  if (single_partition_idx != -1) {
+    Partition* partition = hash_partitions_[single_partition_idx];
+    for (int i = 0; i < PARTITION_FANOUT; ++i) {
+      hash_partitions_[i] = partition;
+      hash_tbls_[i] = partition->hash_tbl.get();
+    }
+  }
 
   COUNTER_ADD(partitions_created_, num_partitions_created);
   if (!is_streaming_preagg_) {
@@ -1390,7 +1401,13 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
   }
   DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to "
                                << "reclaim memory: " << buffer_pool_client_.DebugString();
-  hash_tbls_[partition_idx] = NULL;
+  // Remove references to the destroyed hash table from 'hash_tbls_'.
+  // Additionally, we might be dealing with a rebuilt spilled partition, where all
+  // partitions point to a single in-memory partition. This also ensures that 'hash_tbls_'
+  // remains consistent in that case.
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr;
+  }
   return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
 }
 
@@ -1402,6 +1419,10 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
   for (int i = 0; i < hash_partitions_.size(); ++i) {
     Partition* partition = hash_partitions_[i];
     if (partition == nullptr) continue;
+    // We might be dealing with a rebuilt spilled partition, where all partitions are
+    // pointing to a single in-memory partition, so make sure we only proceed for the
+    // right partition.
+    if(i != partition->idx) continue;
     int64_t aggregated_rows = 0;
     if (partition->aggregated_row_stream != nullptr) {
       aggregated_rows = partition->aggregated_row_stream->num_rows();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 210400e..fa8674c 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -336,10 +336,14 @@ class PartitionedAggregationNode : public ExecNode {
   /// Object pool that holds the Partition objects in hash_partitions_.
   boost::scoped_ptr<ObjectPool> partition_pool_;
 
-  /// Current partitions we are partitioning into.
+  /// Current partitions we are partitioning into. IMPALA-5788: For the case where we
+  /// rebuild a spilled partition that fits in memory, all pointers in this vector will
+  /// point to a single in-memory partition.
   std::vector<Partition*> hash_partitions_;
 
-  /// Cache for hash tables in 'hash_partitions_'.
+  /// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we
+  /// rebuild a spilled partition that fits in memory, all pointers in this array will
+  /// point to the hash table that is a part of a single in-memory partition.
   HashTable* hash_tbls_[PARTITION_FANOUT];
 
   /// All partitions that have been spilled and need further processing.
@@ -623,8 +627,7 @@ class PartitionedAggregationNode : public ExecNode {
 
   /// Initializes hash_partitions_. 'level' is the level for the partitions to create.
   /// If 'single_partition_idx' is provided, it must be a number in range
-  /// [0, PARTITION_FANOUT), and only that partition is created - the others are
-  /// initialized to NULL.
+  /// [0, PARTITION_FANOUT), and only that partition is created - all others point to it.
   /// Also sets ht_ctx_'s level to 'level'.
   Status CreateHashPartitions(
       int level, int single_partition_idx = -1) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6c02972/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index b6f4f12..3868e4f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -343,3 +343,16 @@ bigint,bigint,bigint,int,decimal,decimal,decimal,decimal,string,string,string,st
 1382,156162,6163,5,31.00,37762.96,0.07,0.03,'R','F','1993-10-26','1993-10-15','1993-11-09','TAKE BACK RETURN','FOB','hely regular dependencies. f'
 1509,186349,3904,6,31.00,44495.54,0.04,0.03,'A','F','1993-07-14','1993-08-21','1993-08-06','COLLECT COD','SHIP','ic deposits cajole carefully. quickly bold '
 ====
+---- QUERY
+# Test spilling aggregation when grouping by nondeterministic expression
+set buffer_pool_limit=5m;
+set num_nodes=1;
+select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
+from tpch_parquet.lineitem
+group by 1, 2, 3, 4, 5, random()
+limit 5
+---- RUNTIME_PROFILE
+row_regex: .*Query State: FINISHED.*
+row_regex: .*Query Status: OK.*
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====


Mime
View raw message