impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbap...@apache.org
Subject [4/4] incubator-impala git commit: IMPALA-4452: Always call AggFnEvaluator::Open() before AggFnEvaluator::Init()
Date Tue, 15 Nov 2016 02:00:29 GMT
IMPALA-4452: Always call AggFnEvaluator::Open() before AggFnEvaluator::Init()

As part of the fix for IMPALA-2379, the expression contexts of
aggregation function evaluators are expected to be opened before
their initFn() are called so constant arguments can be accessed
in initFn(). However, the legacy aggregation node wasn't updated
to follow this order for singleton result tuple (i.e. no group-by).

This patch fixes the problem by deferring the creation of the
singleton tuple to a point in AggregationNode::Open() after the
expression contexts of all aggregate function evaluators have
been opened. PartitionedAggregationNode() was already updated
to follow this order.

This patch also fixes a minor bug in which uninitialized entries
of agg_fn_ctxs_[] may be accessed in AggregationNode::Close()
if AggregationNode::Prepare() fails.

Change-Id: I2f261dee47821c517d8dbe1babf4112462d85807
Reviewed-on: http://gerrit.cloudera.org:8080/5049
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cac02d6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cac02d6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cac02d6b

Branch: refs/heads/master
Commit: cac02d6b767601ce308050f57520b65285566183
Parents: 10a4c5a
Author: Michael Ho <kwho@cloudera.com>
Authored: Thu Nov 10 00:15:06 2016 -0800
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Mon Nov 14 22:38:09 2016 +0000

----------------------------------------------------------------------
 be/src/exec/aggregation-node.cc | 35 +++++++++++++++++++----------------
 1 file changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cac02d6b/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 0607aa3..a5c7490 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -141,30 +141,23 @@ Status AggregationNode::Prepare(RuntimeState* state) {
       Expr::Prepare(build_expr_ctxs_, state, build_row_desc, expr_mem_tracker()));
   AddExprCtxsToFree(build_expr_ctxs_);
 
-  agg_fn_ctxs_.resize(aggregate_evaluators_.size());
   int j = probe_expr_ctxs_.size();
   for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
     SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
     SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
+    FunctionContext* agg_fn_ctx;
     RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(),
-        intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctxs_[i]));
-    state->obj_pool()->Add(agg_fn_ctxs_[i]);
+        intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx));
+    agg_fn_ctxs_.push_back(agg_fn_ctx);
+    state->obj_pool()->Add(agg_fn_ctx);
   }
 
+  DCHECK_EQ(agg_fn_ctxs_.size(), aggregate_evaluators_.size());
   // TODO: how many buckets?
   hash_tbl_.reset(new OldHashTable(state, build_expr_ctxs_, probe_expr_ctxs_,
       vector<ExprContext*>(), 1, true,  vector<bool>(build_expr_ctxs_.size(), true), id(),
       mem_tracker(), vector<RuntimeFilter*>(), true));
 
-  if (probe_expr_ctxs_.empty()) {
-    // create single intermediate tuple now; we need to output something
-    // even if our input is empty
-    singleton_intermediate_tuple_ = ConstructIntermediateTuple();
-    // Check for failures during AggFnEvaluator::Init().
-    RETURN_IF_ERROR(state->GetQueryStatus());
-    hash_tbl_->Insert(singleton_intermediate_tuple_);
-    output_iterator_ = hash_tbl_->Begin();
-  }
   if (!state->codegen_enabled()) {
     runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");
   }
@@ -202,6 +195,15 @@ Status AggregationNode::Open(RuntimeState* state) {
     RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i]));
   }
 
+  if (probe_expr_ctxs_.empty()) {
+    // Create single intermediate tuple. This must happen after
+    // opening the aggregate evaluators.
+    singleton_intermediate_tuple_ = ConstructIntermediateTuple();
+    // Check for failures during AggFnEvaluator::Init().
+    RETURN_IF_ERROR(state->GetQueryStatus());
+    hash_tbl_->Insert(singleton_intermediate_tuple_);
+  }
+
   RETURN_IF_ERROR(children_[0]->Open(state));
 
   RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker());
@@ -319,10 +321,11 @@ void AggregationNode::Close(RuntimeState* state) {
   if (hash_tbl_.get() != NULL) hash_tbl_->Close();
 
   agg_expr_ctxs_.clear();
-  DCHECK(agg_fn_ctxs_.empty() || aggregate_evaluators_.size() == agg_fn_ctxs_.size());
-  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
-    aggregate_evaluators_[i]->Close(state);
-    if (!agg_fn_ctxs_.empty()) agg_fn_ctxs_[i]->impl()->Close();
+  for (AggFnEvaluator* aggregate_evaluator : aggregate_evaluators_) {
+    aggregate_evaluator->Close(state);
+  }
+  for (FunctionContext* agg_fn_ctx : agg_fn_ctxs_) {
+    agg_fn_ctx->impl()->Close();
   }
   if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
 


Mime
View raw message