impala-commits mailing list archives

From k...@apache.org
Subject [13/14] incubator-impala git commit: IMPALA-4192: Disentangle Expr and ExprContext
Date Sun, 18 Jun 2017 18:36:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 083ab1a..2aabad1 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -19,7 +19,10 @@
 
 #include <gutil/strings/substitute.h>
 
+#include "exprs/agg-fn.h"
 #include "exprs/agg-fn-evaluator.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-tracker.h"
@@ -44,25 +47,21 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
         descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)),
     result_tuple_desc_(
         descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)),
-    buffered_tuple_desc_(NULL),
-    partition_by_eq_expr_ctx_(NULL),
-    order_by_eq_expr_ctx_(NULL),
     rows_start_offset_(0),
     rows_end_offset_(0),
     has_first_val_null_offset_(false),
     first_val_null_offset_(0),
-    client_(NULL),
-    child_tuple_cmp_row_(NULL),
+    client_(nullptr),
+    child_tuple_cmp_row_(nullptr),
     last_result_idx_(-1),
     prev_pool_last_result_idx_(-1),
     prev_pool_last_window_idx_(-1),
-    curr_tuple_(NULL),
-    dummy_result_tuple_(NULL),
+    curr_tuple_(nullptr),
+    dummy_result_tuple_(nullptr),
     curr_partition_idx_(-1),
-    prev_input_row_(NULL),
-    input_stream_(NULL),
+    input_stream_(nullptr),
     input_eos_(false),
-    evaluation_timer_(NULL) {
+    evaluation_timer_(nullptr) {
   if (tnode.analytic_node.__isset.buffered_tuple_id) {
     buffered_tuple_desc_ = descs.GetTupleDescriptor(
         tnode.analytic_node.buffered_tuple_id);
@@ -105,36 +104,48 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
 
 AnalyticEvalNode::~AnalyticEvalNode() {
   // Check that we didn't leak any memory.
-  DCHECK(input_stream_ == NULL || input_stream_->is_closed());
+  DCHECK(input_stream_ == nullptr || input_stream_->is_closed());
 }
 
 Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  DCHECK_EQ(conjunct_ctxs_.size(), 0);
+  DCHECK_EQ(conjunct_evals_.size(), 0);
   const TAnalyticNode& analytic_node = tnode.analytic_node;
   bool has_lead_fn = false;
+
   for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) {
-    AggFnEvaluator* evaluator;
-    RETURN_IF_ERROR(AggFnEvaluator::Create(
-          pool_, analytic_node.analytic_functions[i], true, &evaluator));
-    evaluators_.push_back(evaluator);
+    AggFn* analytic_fn;
+    RETURN_IF_ERROR(AggFn::Create(analytic_node.analytic_functions[i],
+        child(0)->row_desc(), *(intermediate_tuple_desc_->slots()[i]),
+        *(result_tuple_desc_->slots()[i]), state, &analytic_fn));
+    analytic_fns_.push_back(analytic_fn);
+    DCHECK(!analytic_fn->is_merge());
     const TFunction& fn = analytic_node.analytic_functions[i].nodes[0].fn;
-    is_lead_fn_.push_back("lead" == fn.name.function_name);
-    has_lead_fn = has_lead_fn || is_lead_fn_.back();
+    const bool is_lead_fn = fn.name.function_name == "lead";
+    is_lead_fn_.push_back(is_lead_fn);
+    has_lead_fn |= is_lead_fn;
   }
   DCHECK(!has_lead_fn || !window_.__isset.window_start);
   DCHECK(fn_scope_ != PARTITION || analytic_node.order_by_exprs.empty());
   DCHECK(window_.__isset.window_end || !window_.__isset.window_start)
       << "UNBOUNDED FOLLOWING is only supported with UNBOUNDED PRECEDING.";
-  if (analytic_node.__isset.partition_by_eq) {
-    DCHECK(analytic_node.__isset.buffered_tuple_id);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.partition_by_eq,
-          &partition_by_eq_expr_ctx_));
-  }
-  if (analytic_node.__isset.order_by_eq) {
+
+  if (analytic_node.__isset.partition_by_eq || analytic_node.__isset.order_by_eq) {
     DCHECK(analytic_node.__isset.buffered_tuple_id);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_eq,
-          &order_by_eq_expr_ctx_));
+    DCHECK(buffered_tuple_desc_ != nullptr);
+    vector<TTupleId> tuple_ids;
+    tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id());
+    tuple_ids.push_back(buffered_tuple_desc_->id());
+    RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
+
+    if (analytic_node.__isset.partition_by_eq) {
+      RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.partition_by_eq, cmp_row_desc,
+          state, &partition_by_eq_expr_));
+    }
+    if (analytic_node.__isset.order_by_eq) {
+      RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.order_by_eq, cmp_row_desc,
+          state, &order_by_eq_expr_));
+    }
   }
   return Status::OK();
 }
@@ -146,35 +157,23 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   mem_pool_.reset(new MemPool(mem_tracker()));
-  fn_pool_.reset(new MemPool(mem_tracker()));
+  fn_pool_.reset(new MemPool(expr_mem_tracker()));
   evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime");
 
-  DCHECK_EQ(result_tuple_desc_->slots().size(), evaluators_.size());
-  for (int i = 0; i < evaluators_.size(); ++i) {
-    impala_udf::FunctionContext* ctx;
-    RETURN_IF_ERROR(evaluators_[i]->Prepare(state, child(0)->row_desc(),
-        intermediate_tuple_desc_->slots()[i], result_tuple_desc_->slots()[i],
-        fn_pool_.get(), &ctx));
-    fn_ctxs_.push_back(ctx);
-    state->obj_pool()->Add(ctx);
+  DCHECK_EQ(result_tuple_desc_->slots().size(), analytic_fns_.size());
+  RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, fn_pool_.get(),
+      &analytic_fn_evals_));
+
+  if (partition_by_eq_expr_ != nullptr) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*partition_by_eq_expr_, state, pool_,
+        fn_pool_.get(), &partition_by_eq_expr_eval_));
+    AddEvaluatorToFree(partition_by_eq_expr_eval_);
   }
 
-  if (partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL) {
-    DCHECK(buffered_tuple_desc_ != NULL);
-    vector<TTupleId> tuple_ids;
-    tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id());
-    tuple_ids.push_back(buffered_tuple_desc_->id());
-    RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
-    if (partition_by_eq_expr_ctx_ != NULL) {
-      RETURN_IF_ERROR(
-          partition_by_eq_expr_ctx_->Prepare(state, cmp_row_desc, expr_mem_tracker()));
-      AddExprCtxToFree(partition_by_eq_expr_ctx_);
-    }
-    if (order_by_eq_expr_ctx_ != NULL) {
-      RETURN_IF_ERROR(
-          order_by_eq_expr_ctx_->Prepare(state, cmp_row_desc, expr_mem_tracker()));
-      AddExprCtxToFree(order_by_eq_expr_ctx_);
-    }
+  if (order_by_eq_expr_ != nullptr) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*order_by_eq_expr_, state, pool_,
+        fn_pool_.get(), &order_by_eq_expr_eval_));
+    AddEvaluatorToFree(order_by_eq_expr_eval_);
   }
 
   // Must be kept in sync with AnalyticEvalNode.computeResourceProfile() in fe.
@@ -191,8 +190,8 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
-  DCHECK(client_ != NULL);
-  DCHECK(input_stream_ == NULL);
+  DCHECK(client_ != nullptr);
+  DCHECK(input_stream_ == nullptr);
   input_stream_.reset(
       new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr(), client_,
           false /* use_initial_small_buffers */, true /* read_write */));
@@ -208,30 +207,29 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
     return state->block_mgr()->MemLimitTooLowError(client_, id());
   }
 
-  DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());
-  for (int i = 0; i < evaluators_.size(); ++i) {
-    RETURN_IF_ERROR(evaluators_[i]->Open(state, fn_ctxs_[i]));
-    DCHECK(!evaluators_[i]->is_merge());
-
+  for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
+    RETURN_IF_ERROR(analytic_fn_evals_[i]->Open(state));
+    FunctionContext* agg_fn_ctx = analytic_fn_evals_[i]->agg_fn_ctx();
     if (!has_first_val_null_offset_ &&
-        "first_value_rewrite" == evaluators_[i]->fn_name() &&
-        fn_ctxs_[i]->GetNumArgs() == 2) {
+        "first_value_rewrite" == analytic_fns_[i]->fn_name() &&
+        agg_fn_ctx->GetNumArgs() == 2) {
       DCHECK(!has_first_val_null_offset_);
       first_val_null_offset_ =
-        reinterpret_cast<BigIntVal*>(fn_ctxs_[i]->GetConstantArg(1))->val;
+          reinterpret_cast<BigIntVal*>(agg_fn_ctx->GetConstantArg(1))->val;
       VLOG_FILE << id() << " FIRST_VAL rewrite null offset: " << first_val_null_offset_;
       has_first_val_null_offset_ = true;
     }
   }
 
-  if (partition_by_eq_expr_ctx_ != NULL) {
-    RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Open(state));
+  if (partition_by_eq_expr_eval_ != nullptr) {
+    RETURN_IF_ERROR(partition_by_eq_expr_eval_->Open(state));
   }
-  if (order_by_eq_expr_ctx_ != NULL) {
-    RETURN_IF_ERROR(order_by_eq_expr_ctx_->Open(state));
+
+  if (order_by_eq_expr_eval_ != nullptr) {
+    RETURN_IF_ERROR(order_by_eq_expr_eval_->Open(state));
   }
 
-  if (buffered_tuple_desc_ != NULL) {
+  if (buffered_tuple_desc_ != nullptr) {
     // The backing mem_pool_ is freed in Reset(), so we need to allocate
     // a new row every time we Open().
     child_tuple_cmp_row_ = reinterpret_cast<TupleRow*>(
@@ -240,7 +238,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
 
   // An intermediate tuple is only allocated once and is reused.
   curr_tuple_ = Tuple::Create(intermediate_tuple_desc_->byte_size(), mem_pool_.get());
-  AggFnEvaluator::Init(evaluators_, fn_ctxs_, curr_tuple_);
+  AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_);
   // Allocate dummy_result_tuple_ even if AggFnEvaluator::Init() failed, since it is
   // needed in Close().
   // TODO: move this to Prepare()
@@ -351,7 +349,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
   if (fn_scope_ != ROWS || !window_.__isset.window_start ||
       stream_idx - rows_start_offset_ >= curr_partition_idx_) {
     VLOG_ROW << id() << " Update idx=" << stream_idx;
-    AggFnEvaluator::Add(evaluators_, fn_ctxs_, row, curr_tuple_);
+    AggFnEvaluator::Add(analytic_fn_evals_, row, curr_tuple_);
     if (window_.__isset.window_start) {
       VLOG_ROW << id() << " Adding tuple to window at idx=" << stream_idx;
       Tuple* tuple = row->GetTuple(0)->DeepCopy(
@@ -384,20 +382,20 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
 
 Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
-  DCHECK(curr_tuple_ != NULL);
+  DCHECK(curr_tuple_ != nullptr);
   MemPool* cur_tuple_pool = curr_tuple_pool_.get();
   Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool);
 
-  AggFnEvaluator::GetValue(evaluators_, fn_ctxs_, curr_tuple_, result_tuple);
+  AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple);
   // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'.
   for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) {
     if (!slot_desc->type().IsVarLenStringType()) continue;
     StringValue* sv = reinterpret_cast<StringValue*>(
         result_tuple->GetSlot(slot_desc->tuple_offset()));
-    if (sv == NULL || sv->len == 0) continue;
+    if (sv == nullptr || sv->len == 0) continue;
     char* new_ptr = reinterpret_cast<char*>(cur_tuple_pool->TryAllocate(sv->len));
-    if (UNLIKELY(new_ptr == NULL)) {
-      return cur_tuple_pool->mem_tracker()->MemLimitExceeded(NULL,
+    if (UNLIKELY(new_ptr == nullptr)) {
+      return cur_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr,
           "Failed to allocate memory for analytic function's result.", sv->len);
     }
     memcpy(new_ptr, sv->ptr, sv->len);
@@ -419,7 +417,7 @@ inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
   VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition
            << " idx=" << stream_idx;
   if (fn_scope_ != ROWS && (next_partition || (fn_scope_ == RANGE &&
-      window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_ctx_)))) {
+      window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_eval_)))) {
     RETURN_IF_ERROR(AddResultTuple(stream_idx - 1));
   }
   return Status::OK();
@@ -450,7 +448,7 @@ inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
   DCHECK_EQ(remove_idx + max<int64_t>(rows_start_offset_, 0),
       window_tuples_.front().first) << DebugStateString(true);
   TupleRow* remove_row = reinterpret_cast<TupleRow*>(&window_tuples_.front().second);
-  AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
+  AggFnEvaluator::Remove(analytic_fn_evals_, remove_row, curr_tuple_);
   window_tuples_.pop_front();
 }
 
@@ -469,11 +467,11 @@ inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
   // lead() is re-written to a ROWS window with an end bound FOLLOWING. Any remaining
   // results need the default value (set by Init()). If this is the case, the start bound
   // is UNBOUNDED PRECEDING (DCHECK in Init()).
-  for (int i = 0; i < evaluators_.size(); ++i) {
+  for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
     if (is_lead_fn_[i]) {
       // Needs to call Finalize() to release resources.
-      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
-      evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+      analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_);
+      analytic_fn_evals_[i]->Init(curr_tuple_);
     }
   }
 
@@ -492,7 +490,7 @@ inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
       VLOG_ROW << id() << " Remove window_row_idx=" << window_tuples_.front().first
                << " for result row at idx=" << next_result_idx;
       TupleRow* remove_row = reinterpret_cast<TupleRow*>(&window_tuples_.front().second);
-      AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
+      AggFnEvaluator::Remove(analytic_fn_evals_, remove_row, curr_tuple_);
       window_tuples_.pop_front();
     }
     RETURN_IF_ERROR(AddResultTuple(last_result_idx_ + 1));
@@ -522,7 +520,7 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
   // >= curr_partition_idx_ is the last result tuple of the previous partition.  Adding
   // the last result tuple to result_tuples_ with a stream index curr_partition_idx_ - 1
   // ensures that all rows in the previous partition have corresponding analytic results.
-  Tuple* prev_partition_last_result_tuple = NULL;
+  Tuple* prev_partition_last_result_tuple = nullptr;
   while (!result_tuples_.empty() && result_tuples_.back().first >= curr_partition_idx_) {
     DCHECK(fn_scope_ == ROWS && window_.__isset.window_end &&
         window_.window_end.type == TAnalyticWindowBoundaryType::PRECEDING);
@@ -531,7 +529,7 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
     prev_partition_last_result_tuple = result_tuples_.back().second;
     result_tuples_.pop_back();
   }
-  if (prev_partition_last_result_tuple != NULL) {
+  if (prev_partition_last_result_tuple != nullptr) {
     if (result_tuples_.empty() ||
         result_tuples_.back().first < curr_partition_idx_ - 1) {
       // prev_partition_last_result_tuple was the last result tuple in the partition, add
@@ -558,14 +556,14 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
   VLOG_ROW << id() << " Reset curr_tuple";
   // Call finalize to release resources; result is not needed but the dst tuple must be
   // a tuple described by result_tuple_desc_.
-  AggFnEvaluator::Finalize(evaluators_, fn_ctxs_, curr_tuple_, dummy_result_tuple_);
+  AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_);
   // Re-initialize curr_tuple_.
   curr_tuple_->Init(intermediate_tuple_desc_->byte_size());
-  AggFnEvaluator::Init(evaluators_, fn_ctxs_, curr_tuple_);
+  AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_);
   // Check for errors in AggFnEvaluator::Init().
   RETURN_IF_ERROR(state->GetQueryStatus());
 
-  // Add a result tuple containing values set by Init() (e.g. NULL for sum(), 0 for
+  // Add a result tuple containing values set by Init() (e.g. NULL for sum(), 0 for
   // count()) for output rows that have no input rows in the window. We need to add this
   // result tuple before any input rows are consumed and the evaluators are updated.
   if (fn_scope_ == ROWS && window_.__isset.window_end &&
@@ -594,9 +592,9 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
   return Status::OK();
 }
 
-inline bool AnalyticEvalNode::PrevRowCompare(ExprContext* pred_ctx) {
-  DCHECK(pred_ctx != NULL);
-  BooleanVal result = pred_ctx->GetBooleanVal(child_tuple_cmp_row_);
+inline bool AnalyticEvalNode::PrevRowCompare(ScalarExprEvaluator* pred_eval) {
+  DCHECK(pred_eval != nullptr);
+  BooleanVal result = pred_eval->GetBooleanVal(child_tuple_cmp_row_);
   DCHECK(!result.is_null);
   return result.val;
 }
@@ -650,11 +648,12 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
 
   for (; batch_idx < curr_child_batch_->num_rows(); ++batch_idx, ++stream_idx) {
     TupleRow* row = curr_child_batch_->GetRow(batch_idx);
-    if (partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL) {
+    if (partition_by_eq_expr_eval_ != nullptr ||
+        order_by_eq_expr_eval_ != nullptr) {
       // Only set the tuples in child_tuple_cmp_row_ if there are partition exprs or
       // order by exprs that require comparing the current and previous rows. If there
-      // aren't partition or order by exprs (i.e. empty OVER() clause), there was no sort
-      // and there could be nullable tuples (whereas the sort node does not produce
+      // aren't partition or order by exprs (i.e. empty OVER() clause), there was no
+      // sort and there could be nullable tuples (whereas the sort node does not produce
       // them), see IMPALA-1562.
       child_tuple_cmp_row_->SetTuple(0, prev_input_row_->GetTuple(0));
       child_tuple_cmp_row_->SetTuple(1, row->GetTuple(0));
@@ -663,22 +662,23 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
 
     // Every row is compared against the previous row to determine if (a) the row
     // starts a new partition or (b) the row does not share the same values for the
-    // ordering exprs. When either of these occurs, the evaluators_ are finalized and
-    // the result tuple is added to result_tuples_ so that it may be added to output
-    // rows in GetNextOutputBatch(). When a new partition is found (a), a new, empty
-    // result tuple is created and initialized over the evaluators_. If the row has
-    // different values for the ordering exprs (b), then a new tuple is created but
-    // copied from curr_tuple_ because the original is used for one or more previous
-    // row(s) but the incremental state still applies to the current row.
+    // ordering exprs. When either of these occurs, the analytic_fn_evals_ are
+    // finalized and the result tuple is added to result_tuples_ so that it may be
+    // added to output rows in GetNextOutputBatch(). When a new partition is found
+    // (a), a new, empty result tuple is created and initialized over the
+    // analytic_fn_evals_. If the row has different values for the ordering
+    // exprs (b), then a new tuple is created but copied from curr_tuple_ because the
+    // original is used for one or more previous row(s) but the incremental state still
+    // applies to the current row.
     bool next_partition = false;
-    if (partition_by_eq_expr_ctx_ != NULL) {
-      // partition_by_eq_expr_ctx_ checks equality over the predicate exprs
-      next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_);
+    if (partition_by_eq_expr_eval_ != nullptr) {
+      // partition_by_eq_expr_eval_ checks equality over the predicate exprs
+      next_partition = !PrevRowCompare(partition_by_eq_expr_eval_);
     }
     RETURN_IF_ERROR(TryAddResultTupleForPrevRow(next_partition, stream_idx, row));
     if (next_partition) RETURN_IF_ERROR(InitNextPartition(state, stream_idx));
 
-    // The evaluators_ are updated with the current row.
+    // The analytic_fn_evals_ are updated with the current row.
     RETURN_IF_ERROR(AddRow(stream_idx, row));
 
     RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx, row));
@@ -709,8 +709,8 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
   return Status::OK();
 }
 
-Status AnalyticEvalNode::GetNextOutputBatch(RuntimeState* state, RowBatch* output_batch,
-    bool* eos) {
+Status AnalyticEvalNode::GetNextOutputBatch(
+    RuntimeState* state, RowBatch* output_batch, bool* eos) {
   SCOPED_TIMER(evaluation_timer_);
   VLOG_FILE << id() << " GetNextOutputBatch: " << DebugStateString()
             << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
@@ -793,7 +793,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
 
   bool output_eos = false;
   RETURN_IF_ERROR(GetNextOutputBatch(state, row_batch, &output_eos));
-  if (curr_child_batch_.get() == NULL && output_eos) {
+  if (curr_child_batch_.get() == nullptr && output_eos) {
     // Transfer the ownership of all row-backing resources on eos for simplicity.
     // TODO: This transfer is simple and correct, but not necessarily efficient. We
     // should optimize the use/transfer of memory to better amortize allocations
@@ -837,20 +837,20 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
   DCHECK_EQ(prev_tuple_pool_->total_allocated_bytes(), 0);
   // Call Finalize() to clear evaluator allocations, but do not Close() them,
   // so we can keep evaluating them.
-  if (curr_tuple_ != NULL) {
-    for (int i = 0; i < evaluators_.size(); ++i) {
-      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
+  if (curr_tuple_ != nullptr) {
+    for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
+      analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_);
     }
   }
   mem_pool_->Clear();
   // The following members will be re-created in Open().
   // input_stream_ should have been closed by last GetNext() call.
-  DCHECK(input_stream_ == NULL || input_stream_->is_closed());
+  DCHECK(input_stream_ == nullptr || input_stream_->is_closed());
   input_stream_.reset();
-  curr_tuple_ = NULL;
-  child_tuple_cmp_row_ = NULL;
-  dummy_result_tuple_ = NULL;
-  prev_input_row_ = NULL;
+  curr_tuple_ = nullptr;
+  child_tuple_cmp_row_ = nullptr;
+  dummy_result_tuple_ = nullptr;
+  prev_input_row_ = nullptr;
   prev_child_batch_.reset();
   curr_child_batch_.reset();
   return ExecNode::Reset(state);
@@ -858,37 +858,43 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
 
 void AnalyticEvalNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (client_ != NULL) state->block_mgr()->ClearReservations(client_);
+  if (client_ != nullptr) state->block_mgr()->ClearReservations(client_);
   // We may need to clean up input_stream_ if an error occurred at some point.
-  if (input_stream_ != NULL) {
-    input_stream_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  if (input_stream_ != nullptr) {
+    input_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
 
-  // Close all evaluators and fn ctxs. If an error occurred in Init or Prepare there may
-  // be fewer ctxs than evaluators. We also need to Finalize if curr_tuple_ was created
-  // in Open.
-  DCHECK_LE(fn_ctxs_.size(), evaluators_.size());
-  DCHECK(curr_tuple_ == NULL || fn_ctxs_.size() == evaluators_.size());
-  for (int i = 0; i < evaluators_.size(); ++i) {
+  // Close all evaluators. If an error occurred in Init() or Prepare() there may be
+  // fewer evaluators than analytic fns. We also need to Finalize if curr_tuple_ was
+  // created in Open().
+  DCHECK_LE(analytic_fn_evals_.size(), analytic_fns_.size());
+  DCHECK(curr_tuple_ == nullptr ||
+      analytic_fn_evals_.size() == analytic_fns_.size());
+  for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
     // Need to make sure finalize is called in case there is any state to clean up.
-    if (curr_tuple_ != NULL) {
-      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
+    if (curr_tuple_ != nullptr) {
+      analytic_fn_evals_[i]->Finalize(curr_tuple_, dummy_result_tuple_);
     }
-    evaluators_[i]->Close(state);
-  }
-  for (int i = 0; i < fn_ctxs_.size(); ++i) {
-    fn_ctxs_[i]->impl()->FreeLocalAllocations();
-    fn_ctxs_[i]->impl()->Close();
+    analytic_fn_evals_[i]->Close(state);
   }
+  AggFn::Close(analytic_fns_);
 
-  if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state);
-  if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state);
-  if (prev_child_batch_.get() != NULL) prev_child_batch_.reset();
-  if (curr_child_batch_.get() != NULL) curr_child_batch_.reset();
-  if (curr_tuple_pool_.get() != NULL) curr_tuple_pool_->FreeAll();
-  if (prev_tuple_pool_.get() != NULL) prev_tuple_pool_->FreeAll();
-  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
-  if (fn_pool_.get() != NULL) fn_pool_->FreeAll();
+  if (partition_by_eq_expr_ != nullptr) {
+    if (partition_by_eq_expr_eval_ != nullptr) {
+      partition_by_eq_expr_eval_->Close(state);
+    }
+    partition_by_eq_expr_->Close();
+  }
+  if (order_by_eq_expr_ != nullptr) {
+    if (order_by_eq_expr_eval_ != nullptr) order_by_eq_expr_eval_->Close(state);
+    order_by_eq_expr_->Close();
+  }
+  if (prev_child_batch_.get() != nullptr) prev_child_batch_.reset();
+  if (curr_child_batch_.get() != nullptr) curr_child_batch_.reset();
+  if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll();
+  if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll();
+  if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
+  if (fn_pool_.get() != nullptr) fn_pool_->FreeAll();
   ExecNode::Close(state);
 }
 
@@ -896,22 +902,19 @@ void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) con
   *out << string(indentation_level * 2, ' ');
   *out << "AnalyticEvalNode("
        << " window=" << DebugWindowString();
-  if (partition_by_eq_expr_ctx_ != NULL) {
-    *out << " partition_exprs=" << partition_by_eq_expr_ctx_->root()->DebugString();
+  if (partition_by_eq_expr_ != nullptr) {
+    *out << " partition_exprs=" << partition_by_eq_expr_->DebugString();
   }
-  if (order_by_eq_expr_ctx_ != NULL) {
-    *out << " order_by_exprs=" << order_by_eq_expr_ctx_->root()->DebugString();
+  if (order_by_eq_expr_ != nullptr) {
+    *out << " order_by_exprs=" << order_by_eq_expr_->DebugString();
   }
-  *out << AggFnEvaluator::DebugString(evaluators_);
+  *out << AggFn::DebugString(analytic_fns_);
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
 
 Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) {
-  for (int i = 0; i < evaluators_.size(); ++i) {
-    ExprContext::FreeLocalAllocations(evaluators_[i]->input_expr_ctxs());
-  }
-  ExprContext::FreeLocalAllocations(fn_ctxs_);
+  AggFnEvaluator::FreeLocalAllocations(analytic_fn_evals_);
   return ExecNode::QueryMaintenance(state);
 }
 

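The analytic-eval-node.cc hunks above show the expr lifecycle this commit introduces: a thread-safe ScalarExpr tree is created from the Thrift expr in Init(), a ScalarExprEvaluator holding the per-instance state is created in Prepare(), the evaluator is opened in Open(), and the two are closed separately in Close(). A condensed sketch of that flow, using the partition-by predicate as the example (member names are taken from the diff; this illustrates the call sequence and is not a compilable excerpt):

  // Init(): build the shared expr tree; no per-instance state is allocated yet.
  RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.partition_by_eq, cmp_row_desc,
      state, &partition_by_eq_expr_));

  // Prepare(): create the evaluator that owns the mutable per-instance state.
  RETURN_IF_ERROR(ScalarExprEvaluator::Create(*partition_by_eq_expr_, state, pool_,
      fn_pool_.get(), &partition_by_eq_expr_eval_));
  AddEvaluatorToFree(partition_by_eq_expr_eval_);

  // Open(): open the evaluator; all evaluation now goes through the evaluator.
  RETURN_IF_ERROR(partition_by_eq_expr_eval_->Open(state));
  BooleanVal result = partition_by_eq_expr_eval_->GetBooleanVal(child_tuple_cmp_row_);

  // Close(): tear down the evaluator first, then the expr it evaluates.
  partition_by_eq_expr_eval_->Close(state);
  partition_by_eq_expr_->Close();
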
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 0203175..89c5cf3 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -19,15 +19,16 @@
 #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
 
 #include "exec/exec-node.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/buffered-tuple-stream.h"
 #include "runtime/tuple.h"
 
 namespace impala {
 
+class AggFn;
 class AggFnEvaluator;
+class ScalarExpr;
+class ScalarExprEvaluator;
 
 /// Evaluates analytic functions with a single pass over input rows. It is assumed
 /// that the input has already been sorted on all of the partition exprs and then the
@@ -71,7 +72,7 @@ class AnalyticEvalNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  /// Frees local allocations from evaluators_
+  /// Frees local allocations from analytic_fn_evals_
   virtual Status QueryMaintenance(RuntimeState* state);
 
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
@@ -162,7 +163,7 @@ class AnalyticEvalNode : public ExecNode {
   Status InitNextPartition(RuntimeState* state, int64_t stream_idx);
 
   /// Produces a result tuple with analytic function results by calling GetValue() or
-  /// Finalize() for 'curr_tuple_' on the 'evaluators_'. The result tuple is stored in
+  /// Finalize() for 'curr_tuple_' on the 'analytic_fn_evals_'. The result tuple is stored in
   /// 'result_tuples_' with the index into 'input_stream_' specified by 'stream_idx'.
   /// Returns an error when memory limit is exceeded.
   Status AddResultTuple(int64_t stream_idx);
@@ -175,9 +176,10 @@ class AnalyticEvalNode : public ExecNode {
   /// This is necessary to produce the default value (set by Init()).
   void ResetLeadFnSlots();
 
-  /// Evaluates the predicate pred_ctx over child_tuple_cmp_row_, which is a TupleRow*
-  /// containing the previous row and the current row set during ProcessChildBatch().
-  bool PrevRowCompare(ExprContext* pred_ctx);
+  /// Evaluates the predicate pred_eval over child_tuple_cmp_row_, which is
+  /// a TupleRow* containing the previous row and the current row set during
+  /// ProcessChildBatch().
+  bool PrevRowCompare(ScalarExprEvaluator* pred_eval);
 
   /// Debug string containing current state. If 'detailed', per-row state is included.
   std::string DebugStateString(bool detailed) const;
@@ -193,23 +195,25 @@ class AnalyticEvalNode : public ExecNode {
   const TAnalyticWindow window_;
 
   /// Tuple descriptor for storing intermediate values of analytic fn evaluation.
-  const TupleDescriptor* intermediate_tuple_desc_;
+  const TupleDescriptor* intermediate_tuple_desc_ = nullptr;
 
   /// Tuple descriptor for storing results of analytic fn evaluation.
-  const TupleDescriptor* result_tuple_desc_;
+  const TupleDescriptor* result_tuple_desc_ = nullptr;
 
   /// Tuple descriptor of the buffered tuple (identical to the input child tuple, which is
   /// assumed to come from a single SortNode). NULL if both partition_exprs and
   /// order_by_exprs are empty.
-  TupleDescriptor* buffered_tuple_desc_;
+  TupleDescriptor* buffered_tuple_desc_ = nullptr;
 
-  /// Expr context for a predicate that checks if child tuple '<' buffered tuple for
-  /// partitioning exprs.
-  ExprContext* partition_by_eq_expr_ctx_;
+  /// A predicate that checks if child tuple '<' buffered tuple for partitioning exprs
+  /// and its evaluator.
+  ScalarExpr* partition_by_eq_expr_ = nullptr;
+  ScalarExprEvaluator* partition_by_eq_expr_eval_ = nullptr;
 
-  /// Expr context for a predicate that checks if child tuple '<' buffered tuple for
-  /// order by exprs.
-  ExprContext* order_by_eq_expr_ctx_;
+  /// A predicate that checks if child tuple '<' buffered tuple for order by exprs and
+  /// its evaluator.
+  ScalarExpr* order_by_eq_expr_ = nullptr;
+  ScalarExprEvaluator* order_by_eq_expr_eval_ = nullptr;
 
   /// The scope over which analytic functions are evaluated.
   /// TODO: Consider adding additional state to capture whether different kinds of window
@@ -222,8 +226,10 @@ class AnalyticEvalNode : public ExecNode {
   int64_t rows_start_offset_;
   int64_t rows_end_offset_;
 
-  /// Analytic function evaluators.
-  std::vector<AggFnEvaluator*> evaluators_;
+  /// Analytic functions and their evaluators. 'analytic_fns_' live in the query-state's
+  /// objpool while the evaluators live in the exec node's objpool.
+  std::vector<AggFn*> analytic_fns_;
+  std::vector<AggFnEvaluator*> analytic_fn_evals_;
 
   /// Indicates if each evaluator is the lead() fn. Used by ResetLeadFnSlots() to
   /// determine which slots need to be reset.
@@ -234,10 +240,6 @@ class AnalyticEvalNode : public ExecNode {
   bool has_first_val_null_offset_;
   long first_val_null_offset_;
 
-  /// FunctionContext for each analytic function. String data returned by the analytic
-  /// functions is allocated via these contexts.
-  std::vector<impala_udf::FunctionContext*> fn_ctxs_;
-
   /// Mem pool backing allocations from the evaluators' function contexts. This pool
   /// must not be Reset() because the memory is managed by the FreePools of the
   /// function contexts which do their own bookkeeping using a pointer-based structure
   /// stored in the memory blocks themselves.
@@ -253,15 +255,15 @@ class AnalyticEvalNode : public ExecNode {
   boost::scoped_ptr<MemPool> prev_tuple_pool_;
 
   /// Block manager client used by input_stream_. Not owned.
-  BufferedBlockMgr::Client* client_;
+  BufferedBlockMgr::Client* client_ = nullptr;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
   /// TupleRow* composed of the first child tuple and the buffered tuple, used by
-  /// partition_by_eq_expr_ctx_ and order_by_eq_expr_ctx_. Set in Open() if
+  /// partition_by_eq_expr_eval_ and order_by_eq_expr_eval_. Set in Open() if
   /// buffered_tuple_desc_ is not NULL, allocated from mem_pool_.
-  TupleRow* child_tuple_cmp_row_;
+  TupleRow* child_tuple_cmp_row_ = nullptr;
 
   /// Queue of tuples which are ready to be set in output rows, with the index into
   /// the input_stream_ stream of the last TupleRow that gets the Tuple, i.e. this is a
@@ -300,23 +302,23 @@ class AnalyticEvalNode : public ExecNode {
   int64_t prev_pool_last_window_idx_;
 
   /// The tuple described by intermediate_tuple_desc_ storing intermediate state for the
-  /// evaluators_. When enough input rows have been consumed to produce the analytic
-  /// function results, a result tuple (described by result_tuple_desc_) is created and
-  /// the agg fn results are written to that tuple by calling Finalize()/GetValue()
-  /// on the evaluators with curr_tuple_ as the source tuple.
-  Tuple* curr_tuple_;
+  /// analytic_fn_evals_. When enough input rows have been consumed to produce the
+  /// analytic function results, a result tuple (described by result_tuple_desc_) is
+  /// created and the agg fn results are written to that tuple by calling Finalize()/
+  /// GetValue() on the evaluators with curr_tuple_ as the source tuple.
+  Tuple* curr_tuple_ = nullptr;
 
   /// A tuple described by result_tuple_desc_ used when calling Finalize() on the
-  /// evaluators_ to release resources between partitions; the value is never used.
+  /// analytic_fn_evals_ to release resources between partitions; the value is never used.
   /// TODO: Remove when agg fns implement a separate Close() method to release resources.
-  Tuple* dummy_result_tuple_;
+  Tuple* dummy_result_tuple_ = nullptr;
 
   /// Index of the row in input_stream_ at which the current partition started.
   int64_t curr_partition_idx_;
 
   /// Previous input row used to compare partition boundaries and to determine when the
   /// order-by expressions change.
-  TupleRow* prev_input_row_;
+  TupleRow* prev_input_row_ = nullptr;
 
   /// Current and previous input row batches from the child. RowBatches are allocated
   /// once and reused. Previous input row batch owns prev_input_row_ between calls to

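The header changes make the same split visible in the members: the static AggFn objects ('analytic_fns_', created in Init() and living in the query state's object pool) are now separate from the per-instance AggFnEvaluators ('analytic_fn_evals_'), and the old fn_ctxs_ vector is gone because each evaluator carries its own FunctionContext. A minimal sketch of how the node drives the evaluators through the static batch helpers, condensed from the .cc hunks above (names as in the diff, surrounding code elided):

  // Prepare(): one evaluator per AggFn, with allocations backed by fn_pool_.
  RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, fn_pool_.get(),
      &analytic_fn_evals_));

  // Per-partition processing; the FunctionContext is implicit in each evaluator.
  AggFnEvaluator::Init(analytic_fn_evals_, curr_tuple_);      // reset intermediate state
  AggFnEvaluator::Add(analytic_fn_evals_, row, curr_tuple_);  // consume an input row
  AggFnEvaluator::Remove(analytic_fn_evals_, remove_row, curr_tuple_);  // slide window
  AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple);
  AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_);

  // Close(): evaluators first, then the shared AggFns.
  for (AggFnEvaluator* eval : analytic_fn_evals_) eval->Close(state);
  AggFn::Close(analytic_fns_);
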
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 6e73f77..486c5cc 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -20,7 +20,7 @@
 #include <sstream>
 
 #include "exec/data-sink.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -55,7 +55,7 @@ Status BlockingJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   DCHECK((join_op_ != TJoinOp::LEFT_SEMI_JOIN && join_op_ != TJoinOp::LEFT_ANTI_JOIN &&
       join_op_ != TJoinOp::RIGHT_SEMI_JOIN && join_op_ != TJoinOp::RIGHT_ANTI_JOIN &&
-      join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || conjunct_ctxs_.size() == 0);
+      join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) || conjuncts_.size() == 0);
   runtime_profile_->AddLocalTimeCounter(
       bind<int64_t>(&BlockingJoinNode::LocalTimeCounterFn,
       runtime_profile_->total_time_counter(),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index c184857..a14d979 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -30,8 +30,6 @@
 
 namespace impala {
 
-class MemPool;
-class MemTracker;
 class RowBatch;
 class TupleRow;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index dad6247..437343e 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -27,7 +27,7 @@
 #include "exec/kudu-table-sink.h"
 #include "exec/kudu-util.h"
 #include "exec/plan-root-sink.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
@@ -48,64 +48,64 @@ DataSink::~DataSink() {
   DCHECK(closed_);
 }
 
-Status DataSink::Create(ObjectPool* pool,
-    const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-    const RowDescriptor& row_desc, DataSink** sink) {
+Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, const RowDescriptor& row_desc,
+    RuntimeState* state, DataSink** sink) {
   const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink;
-  const vector<TExpr>& output_exprs = fragment_ctx.fragment.output_exprs;
+  const vector<TExpr>& thrift_output_exprs = fragment_ctx.fragment.output_exprs;
+  ObjectPool* pool = state->obj_pool();
   switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK:
-      if (!thrift_sink.__isset.stream_sink) {
-        return Status("Missing data stream sink.");
-      }
+      if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
 
       // TODO: figure out good buffer size based on size of output row
-      *sink = pool->Add(
-          new DataStreamSender(pool,
-            fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
-            fragment_ctx.destinations, 16 * 1024));
+      *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, row_desc,
+          thrift_sink.stream_sink, fragment_ctx.destinations, 16 * 1024));
       break;
-
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
       switch (thrift_sink.table_sink.type) {
         case TTableSinkType::HDFS:
-          *sink = pool->Add(new HdfsTableSink(row_desc, output_exprs, thrift_sink));
+          *sink = pool->Add(new HdfsTableSink(row_desc, thrift_sink));
           break;
         case TTableSinkType::HBASE:
-          *sink = pool->Add(new HBaseTableSink(row_desc, output_exprs, thrift_sink));
+          *sink = pool->Add(new HBaseTableSink(row_desc, thrift_sink));
           break;
         case TTableSinkType::KUDU:
           RETURN_IF_ERROR(CheckKuduAvailability());
-          *sink = pool->Add(new KuduTableSink(row_desc, output_exprs, thrift_sink));
+          *sink = pool->Add(new KuduTableSink(row_desc, thrift_sink));
           break;
         default:
           stringstream error_msg;
-          const char* str = "Unknown table sink";
           map<int, const char*>::const_iterator i =
               _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
-          if (i != _TTableSinkType_VALUES_TO_NAMES.end()) str = i->second;
+          const char* str = i != _TTableSinkType_VALUES_TO_NAMES.end() ?
+              i->second : "Unknown table sink";
           error_msg << str << " not implemented.";
           return Status(error_msg.str());
       }
-
       break;
     case TDataSinkType::PLAN_ROOT_SINK:
-      *sink = pool->Add(new PlanRootSink(row_desc, output_exprs, thrift_sink));
+      *sink = pool->Add(new PlanRootSink(row_desc));
       break;
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =
           _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
-      const char* str = "Unknown data sink type ";
-      if (i != _TDataSinkType_VALUES_TO_NAMES.end()) str = i->second;
+      const char* str = i != _TDataSinkType_VALUES_TO_NAMES.end() ?
+          i->second : "Unknown data sink type";
       error_msg << str << " not implemented.";
       return Status(error_msg.str());
   }
+  RETURN_IF_ERROR((*sink)->Init(thrift_output_exprs, thrift_sink, state));
   return Status::OK();
 }
 
+Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  return ScalarExpr::Create(thrift_output_exprs, row_desc_, state, &output_exprs_);
+}
+
 void DataSink::MergeDmlStats(const TInsertStats& src_stats,
     TInsertStats* dst_stats) {
   dst_stats->bytes_written += src_stats.bytes_written;
@@ -114,7 +114,6 @@ void DataSink::MergeDmlStats(const TInsertStats& src_stats,
     if (!dst_stats->kudu_stats.__isset.num_row_errors) {
       dst_stats->kudu_stats.__set_num_row_errors(0);
     }
-
     dst_stats->kudu_stats.__set_num_row_errors(
         dst_stats->kudu_stats.num_row_errors + src_stats.kudu_stats.num_row_errors);
   }
@@ -177,11 +176,22 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
   expr_mem_tracker_.reset(
       new MemTracker(-1, Substitute("$0 Exprs", name), mem_tracker_.get(), false));
+  expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, state->obj_pool(),
+      expr_mem_pool(), &output_expr_evals_));
   return Status::OK();
 }
 
+Status DataSink::Open(RuntimeState* state) {
+  DCHECK_EQ(output_exprs_.size(), output_expr_evals_.size());
+  return ScalarExprEvaluator::Open(output_expr_evals_, state);
+}
+
 void DataSink::Close(RuntimeState* state) {
   if (closed_) return;
+  ScalarExprEvaluator::Close(output_expr_evals_, state);
+  ScalarExpr::Close(output_exprs_);
+  if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
   if (expr_mem_tracker_ != NULL) {
     expr_mem_tracker_->UnregisterFromParent();
     expr_mem_tracker_.reset();

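Taken together, the data-sink.cc hunks give sinks the same two-phase expr handling as exec nodes: Create() now takes the RuntimeState and ends by calling the new virtual Init(), which builds ScalarExprs from the Thrift output exprs; Prepare() creates the evaluators on the sink's own expr MemPool; Open() opens them; and Close() releases evaluators, exprs, and the pool. A sketch of the resulting driver-side sequence (setup of fragment_ctx and friends elided; this condenses the calls shown above rather than quoting any one caller):

  DataSink* sink;
  RETURN_IF_ERROR(DataSink::Create(fragment_ctx, fragment_instance_ctx, row_desc,
      state, &sink));                                         // also runs sink->Init(...)
  RETURN_IF_ERROR(sink->Prepare(state, parent_mem_tracker));  // creates output_expr_evals_
  RETURN_IF_ERROR(sink->Open(state));                         // opens output_expr_evals_
  // ... Send() row batches ...
  sink->Close(state);  // closes evaluators and exprs, frees expr_mem_pool_
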
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index f20c40b..cfd06bb 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -29,10 +29,14 @@
 
 namespace impala {
 
+class MemPool;
 class ObjectPool;
 class RowBatch;
 class RuntimeProfile;
+class RuntimeState;
 class RowDescriptor;
+class ScalarExpr;
+class ScalarExprEvaluator;
 class TDataSink;
 class TPlanExecRequest;
 class TPlanExecParams;
@@ -57,12 +61,13 @@ class DataSink {
   virtual std::string GetName() = 0;
 
   /// Setup. Call before Send(), Open(), or Close() during the prepare phase of the query
-  /// fragment. Creates a MemTracker (in obj_pool) for the sink that is a child of
-  /// 'parent_mem_tracker'. Subclasses must call DataSink::Prepare().
+  /// fragment. Creates a MemTracker for the sink that is a child of 'parent_mem_tracker'.
+  /// Also creates a MemTracker and MemPool for the output (and partitioning) exprs and
+  /// initializes their evaluators. Subclasses must call DataSink::Prepare().
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
-  /// Call before Send() to open the sink.
-  virtual Status Open(RuntimeState* state) = 0;
+  /// Call before Send() to open the sink and initialize output expression evaluators.
+  virtual Status Open(RuntimeState* state);
 
   /// Send a row batch into this sink. Send() may modify 'batch' by acquiring its state.
   virtual Status Send(RuntimeState* state, RowBatch* batch) = 0;
@@ -79,10 +84,9 @@ class DataSink {
 
   /// Creates a new data sink, allocated in the runtime state's object pool and
   /// returned through *sink, from thrift_sink.
-  static Status Create(ObjectPool* pool,
-    const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-    const RowDescriptor& row_desc, DataSink** sink);
+  static Status Create(const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
+      const RowDescriptor& row_desc, RuntimeState* state, DataSink** sink);
 
   /// Merges one update to the DML stats for a partition. dst_stats will have the
   /// combined stats of src_stats and dst_stats after this method returns.
@@ -95,6 +99,10 @@ class DataSink {
 
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
+  MemPool* expr_mem_pool() const { return expr_mem_pool_.get(); }
+  const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
+    return output_expr_evals_;
+  }
 
  protected:
   /// Set to true after Close() has been called. Subclasses should check and set this in
@@ -112,7 +120,18 @@ class DataSink {
 
   /// A child of 'mem_tracker_' that tracks expr allocations. Initialized in Prepare().
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
-};
 
+  /// MemPool for backing data structures in expressions and their evaluators.
+  boost::scoped_ptr<MemPool> expr_mem_pool_;
+
+  /// Output expressions to convert row batches into output values.
+  /// Not used by some subclasses.
+  std::vector<ScalarExpr*> output_exprs_;
+  std::vector<ScalarExprEvaluator*> output_expr_evals_;
+
+  /// Initialize the expressions in the data sink and return error status on failure.
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state);
+};
 } // namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index c0a9287..01e0dbe 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -22,12 +22,13 @@
 
 #include "exec/parquet-common.h"
 #include "exec/read-write-util.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
 #include "runtime/tuple-row.h"
 #include "util/jni-util.h"
 #include "util/periodic-counter-updater.h"
@@ -147,7 +148,7 @@ Status DataSourceScanNode::GetNextInputBatch() {
   input_batch_.reset(new TGetNextResult());
   next_row_idx_ = 0;
   // Reset all the indexes into the column value arrays to 0
-  memset(&cols_next_val_idx_[0], 0, sizeof(int) * cols_next_val_idx_.size());
+  memset(cols_next_val_idx_.data(), 0, sizeof(int) * cols_next_val_idx_.size());
   TGetNextParams params;
   params.__set_scan_handle(scan_handle_);
   RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
@@ -320,8 +321,9 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
   RETURN_IF_ERROR(
       row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_buffer));
   Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
-  ExprContext** ctxs = &conjunct_ctxs_[0];
-  int num_ctxs = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* evals = conjunct_evals_.data();
+  int num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
 
   while (true) {
     {
@@ -333,7 +335,7 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
         TupleRow* tuple_row = row_batch->GetRow(row_idx);
         tuple_row->SetTuple(tuple_idx_, tuple);
 
-        if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) {
+        if (ExecNode::EvalConjuncts(evals, num_conjuncts, tuple_row)) {
           row_batch->CommitLastRow();
           tuple = reinterpret_cast<Tuple*>(
               reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());

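The scan node's inner loop is representative of how conjunct evaluation changes everywhere: callers pass a raw array of ScalarExprEvaluators to ExecNode::EvalConjuncts() instead of ExprContexts. Judging from the removed loop body in the exec-node.cc hunk further down, the per-conjunct semantics are unchanged: a row passes only if every conjunct evaluates to non-NULL true. A sketch of the equivalent check (EvalPredicate()'s body is not shown in this patch, so the expansion below is inferred from the old code):

  ScalarExprEvaluator* const* evals = conjunct_evals_.data();
  int num_conjuncts = conjuncts_.size();
  for (int i = 0; i < num_conjuncts; ++i) {
    BooleanVal v = evals[i]->GetBooleanVal(row);
    if (v.is_null || !v.val) return false;  // a NULL conjunct fails the row
  }
  return true;
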
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 6b5e52b..4d32f80 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -59,7 +59,8 @@ Status ExchangeNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   if (!is_merging_) return Status::OK();
 
-  RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.exchange_node.sort_info, pool_));
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.exchange_node.sort_info.ordering_exprs,
+      row_descriptor_, state, &ordering_exprs_));
   is_asc_order_ = tnode.exchange_node.sort_info.is_asc_order;
   nulls_first_ = tnode.exchange_node.sort_info.nulls_first;
   return Status::OK();
@@ -81,11 +82,8 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
       input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
       FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
   if (is_merging_) {
-    RETURN_IF_ERROR(sort_exec_exprs_.Prepare(
-        state, row_descriptor_, row_descriptor_, expr_mem_tracker()));
-    AddExprCtxsToFree(sort_exec_exprs_);
     less_than_.reset(
-        new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
+        new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
     AddCodegenDisabledMessage(state);
   }
   return Status::OK();
@@ -106,9 +104,9 @@ Status ExchangeNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
   if (is_merging_) {
-    RETURN_IF_ERROR(sort_exec_exprs_.Open(state));
     // CreateMerger() will populate its merging heap with batches from the stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().
+    RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
     RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get()));
   } else {
     RETURN_IF_ERROR(FillInputRowBatch(state));
@@ -123,12 +121,18 @@ Status ExchangeNode::Reset(RuntimeState* state) {
 
 void ExchangeNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (is_merging_) sort_exec_exprs_.Close(state);
-  if (stream_recvr_ != NULL) stream_recvr_->Close();
+  if (less_than_.get() != nullptr) less_than_->Close(state);
+  if (stream_recvr_ != nullptr) stream_recvr_->Close();
   stream_recvr_.reset();
+  ScalarExpr::Close(ordering_exprs_);
   ExecNode::Close(state);
 }
 
+Status ExchangeNode::QueryMaintenance(RuntimeState* state) {
+  if (less_than_.get() != nullptr) less_than_->FreeLocalAllocations();
+  return ExecNode::QueryMaintenance(state);
+}
+
 Status ExchangeNode::FillInputRowBatch(RuntimeState* state) {
   DCHECK(!is_merging_);
   Status ret_status;
@@ -204,7 +208,7 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
 
-  while ((num_rows_skipped_ < offset_)) {
+  while (num_rows_skipped_ < offset_) {
     num_rows_skipped_ += output_batch->num_rows();
     // Throw away rows in the output batch until the offset is skipped.
     int rows_to_keep = num_rows_skipped_ - offset_;

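For merging exchanges, the SortExecExprs bundle is gone: the node keeps only the ScalarExpr ordering exprs, and the TupleRowComparator now manages its own evaluators, which is why it gains explicit Open()/Close()/FreeLocalAllocations() calls in the hunks above. Condensed, the comparator's lifecycle in this node looks like the following (names as in the diff; not a compilable excerpt):

  // Prepare(): construct the comparator directly from the ScalarExprs.
  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));

  // Open(): the comparator creates and opens its evaluators from the expr pool.
  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));

  // QueryMaintenance(): periodically free comparator-local allocations.
  if (less_than_.get() != nullptr) less_than_->FreeLocalAllocations();

  // Close(): close the comparator, then the ordering exprs it references.
  if (less_than_.get() != nullptr) less_than_->Close(state);
  ScalarExpr::Close(ordering_exprs_);
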
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 6feaff3..4781213 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -21,12 +21,12 @@
 
 #include <boost/scoped_ptr.hpp>
 #include "exec/exec-node.h"
-#include "exec/sort-exec-exprs.h"
 
 namespace impala {
 
-class RowBatch;
 class DataStreamRecvr;
+class RowBatch;
+class ScalarExpr;
 class TupleRowComparator;
 
 /// Receiver node for data streams. The data stream receiver is created in Prepare()
@@ -58,6 +58,7 @@ class ExchangeNode : public ExecNode {
   void set_num_senders(int num_senders) { num_senders_ = num_senders; }
 
  protected:
+  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
@@ -100,7 +101,7 @@ class ExchangeNode : public ExecNode {
   boost::scoped_ptr<TupleRowComparator> less_than_;
 
   /// Sort expressions and parameters passed to the merging receiver.
-  SortExecExprs sort_exec_exprs_;
+  std::vector<ScalarExpr*> ordering_exprs_;
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 4c06ece..1505f71 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -26,7 +26,8 @@
 #include "codegen/llvm-codegen.h"
 #include "common/object-pool.h"
 #include "common/status.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "exec/aggregation-node.h"
 #include "exec/analytic-eval-node.h"
 #include "exec/data-source-scan-node.h"
@@ -73,8 +74,8 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) {
   return p->metadata();
 }
 
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) :
-    BlockingQueue<RowBatch*>(max_batches) {
+ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
+  : BlockingQueue<RowBatch*>(max_batches) {
 }
 
 ExecNode::RowBatchQueue::~RowBatchQueue() {
@@ -140,26 +141,26 @@ ExecNode::~ExecNode() {
 
 Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(
-      Expr::CreateExprTrees(pool_, tnode.conjuncts, &conjunct_ctxs_));
+      ScalarExpr::Create(tnode.conjuncts, row_descriptor_, state, &conjuncts_));
   return Status::OK();
 }
 
 Status ExecNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state));
   DCHECK(runtime_profile_.get() != NULL);
-  rows_returned_counter_ =
-      ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
   mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(),
       state->instance_mem_tracker()));
   expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
-
+  expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
   rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
       ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_returned_counter_,
-        runtime_profile()->total_time_counter()));
-
-  RETURN_IF_ERROR(Expr::Prepare(conjunct_ctxs_, state, row_desc(), expr_mem_tracker()));
-  AddExprCtxsToFree(conjunct_ctxs_);
+          runtime_profile()->total_time_counter()));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, expr_mem_pool(),
+      &conjunct_evals_));
+  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+  AddEvaluatorsToFree(conjunct_evals_);
   for (int i = 0; i < children_.size(); ++i) {
     RETURN_IF_ERROR(children_[i]->Prepare(state));
   }
@@ -176,7 +177,8 @@ void ExecNode::Codegen(RuntimeState* state) {
 
 Status ExecNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::OPEN, state));
-  return Expr::Open(conjunct_ctxs_, state);
+  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
+  return ScalarExprEvaluator::Open(conjunct_evals_, state);
 }
 
 Status ExecNode::Reset(RuntimeState* state) {
@@ -197,7 +199,10 @@ void ExecNode::Close(RuntimeState* state) {
   for (int i = 0; i < children_.size(); ++i) {
     children_[i]->Close(state);
   }
-  Expr::Close(conjunct_ctxs_, state);
+
+  ScalarExprEvaluator::Close(conjunct_evals_, state);
+  ScalarExpr::Close(conjuncts_);
+  if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
 
   if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) {
     LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." << endl
@@ -393,7 +398,7 @@ string ExecNode::DebugString() const {
 }
 
 void ExecNode::DebugString(int indentation_level, stringstream* out) const {
-  *out << " conjuncts=" << Expr::DebugString(conjunct_ctxs_);
+  *out << " conjuncts=" << ScalarExpr::DebugString(conjuncts_);
   for (int i = 0; i < children_.size(); ++i) {
     *out << "\n";
     children_[i]->DebugString(indentation_level + 1, out);
@@ -443,27 +448,25 @@ Status ExecNode::ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state
   return Status::OK();
 }
 
-bool ExecNode::EvalConjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row) {
-  for (int i = 0; i < num_ctxs; ++i) {
-    BooleanVal v = ctxs[i]->GetBooleanVal(row);
-    if (v.is_null || !v.val) return false;
+bool ExecNode::EvalConjuncts(
+    ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row) {
+  for (int i = 0; i < num_conjuncts; ++i) {
+    if (!EvalPredicate(evals[i], row)) return false;
   }
   return true;
 }
 
 Status ExecNode::QueryMaintenance(RuntimeState* state) {
-  FreeLocalAllocations();
+  ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_);
   return state->CheckQueryState();
 }
 
-void ExecNode::AddExprCtxsToFree(const vector<ExprContext*>& ctxs) {
-  for (int i = 0; i < ctxs.size(); ++i) AddExprCtxToFree(ctxs[i]);
+void ExecNode::AddEvaluatorToFree(ScalarExprEvaluator* eval) {
+  evals_to_free_.push_back(eval);
 }
 
-void ExecNode::AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs) {
-  AddExprCtxsToFree(sort_exec_exprs.sort_tuple_slot_expr_ctxs());
-  AddExprCtxsToFree(sort_exec_exprs.lhs_ordering_expr_ctxs());
-  AddExprCtxsToFree(sort_exec_exprs.rhs_ordering_expr_ctxs());
+void ExecNode::AddEvaluatorsToFree(const vector<ScalarExprEvaluator*>& evals) {
+  for (ScalarExprEvaluator* eval : evals) AddEvaluatorToFree(eval);
 }
 
 void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) {
@@ -478,15 +481,19 @@ bool ExecNode::IsNodeCodegenDisabled() const {
   return disable_codegen_;
 }
 
-// Codegen for EvalConjuncts.  The generated signature is
-// For a node with two conjunct predicates
-// define i1 @EvalConjuncts(%"class.impala::ExprContext"** %ctxs, i32 %num_ctxs,
-//                          %"class.impala::TupleRow"* %row) #20 {
+// Codegen for EvalConjuncts.  The generated signature is the same as EvalConjuncts().
+//
+// For a node with two conjunct predicates:
+//
+// define i1 @EvalConjuncts(%"class.impala::ScalarExprEvaluator"** %evals, i32 %num_evals,
+//                          %"class.impala::TupleRow"* %row) #34 {
 // entry:
-//   %ctx_ptr = getelementptr %"class.impala::ExprContext"** %ctxs, i32 0
-//   %ctx = load %"class.impala::ExprContext"** %ctx_ptr
-//   %result = call i16 @Eq_StringVal_StringValWrapper3(
-//       %"class.impala::ExprContext"* %ctx, %"class.impala::TupleRow"* %row)
+//   %eval_ptr = getelementptr inbounds %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %evals, i32 0
+//   %eval = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %eval_ptr
+//   %result = call i16 @"impala::Operators::Eq_BigIntVal_BigIntValWrapper"(
+//       %"class.impala::ScalarExprEvaluator"* %eval, %"class.impala::TupleRow"* %row)
 //   %is_null = trunc i16 %result to i1
 //   %0 = ashr i16 %result, 8
 //   %1 = trunc i16 %0 to i8
@@ -496,30 +503,32 @@ bool ExecNode::IsNodeCodegenDisabled() const {
 //   br i1 %return_false, label %false, label %continue
 //
 // continue:                                         ; preds = %entry
-//   %ctx_ptr2 = getelementptr %"class.impala::ExprContext"** %ctxs, i32 1
-//   %ctx3 = load %"class.impala::ExprContext"** %ctx_ptr2
-//   %result4 = call i16 @Gt_BigIntVal_BigIntValWrapper5(
-//       %"class.impala::ExprContext"* %ctx3, %"class.impala::TupleRow"* %row)
-//   %is_null5 = trunc i16 %result4 to i1
-//   %2 = ashr i16 %result4, 8
-//   %3 = trunc i16 %2 to i8
-//   %val6 = trunc i8 %3 to i1
-//   %is_false7 = xor i1 %val6, true
-//   %return_false8 = or i1 %is_null5, %is_false7
-//   br i1 %return_false8, label %false, label %continue1
+//  %eval_ptr2 = getelementptr inbounds %"class.impala::ScalarExprEvaluator"*,
+//      %"class.impala::ScalarExprEvaluator"** %evals, i32 1
+//  %eval3 = load %"class.impala::ScalarExprEvaluator"*,
+//      %"class.impala::ScalarExprEvaluator"** %eval_ptr2
+//  %result4 = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"(
+//      %"class.impala::ScalarExprEvaluator"* %eval3, %"class.impala::TupleRow"* %row)
+//  %is_null5 = trunc i16 %result4 to i1
+//  %2 = ashr i16 %result4, 8
+//  %3 = trunc i16 %2 to i8
+//  %val6 = trunc i8 %3 to i1
+//  %is_false7 = xor i1 %val6, true
+//  %return_false8 = or i1 %is_null5, %is_false7
+//  br i1 %return_false8, label %false, label %continue1
 //
 // continue1:                                        ; preds = %continue
-//   ret i1 true
+//  ret i1 true
 //
 // false:                                            ; preds = %continue, %entry
-//   ret i1 false
+//  ret i1 false
 // }
+//
 Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
-    const vector<ExprContext*>& conjunct_ctxs, Function** fn, const char* name) {
-  Function* conjunct_fns[conjunct_ctxs.size()];
-  for (int i = 0; i < conjunct_ctxs.size(); ++i) {
-    RETURN_IF_ERROR(
-        conjunct_ctxs[i]->root()->GetCodegendComputeFn(codegen, &conjunct_fns[i]));
+    const vector<ScalarExpr*>& conjuncts, Function** fn, const char* name) {
+  Function* conjunct_fns[conjuncts.size()];
+  for (int i = 0; i < conjuncts.size(); ++i) {
+    RETURN_IF_ERROR(conjuncts[i]->GetCodegendComputeFn(codegen, &conjunct_fns[i]));
     if (i >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
       // Avoid bloating EvalConjuncts by inlining everything into it.
       codegen->SetNoInline(conjunct_fns[i]);
@@ -527,43 +536,36 @@ Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
   }
 
   // Construct function signature to match
-  // bool EvalConjuncts(Expr** exprs, int num_exprs, TupleRow* row)
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  Type* expr_ctx_type = codegen->GetType(ExprContext::LLVM_CLASS_NAME);
-
-  DCHECK(tuple_row_type != NULL);
-  DCHECK(expr_ctx_type != NULL);
-
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-  PointerType* expr_ctx_ptr_type = PointerType::get(expr_ctx_type, 0);
+  // bool EvalConjuncts(ScalarExprEvaluator**, int, TupleRow*)
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  Type* eval_type = codegen->GetType(ScalarExprEvaluator::LLVM_CLASS_NAME);
 
   LlvmCodeGen::FnPrototype prototype(codegen, name, codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("ctxs", PointerType::get(expr_ctx_ptr_type, 0)));
+      LlvmCodeGen::NamedVariable("evals", codegen->GetPtrPtrType(eval_type)));
   prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("num_ctxs", codegen->GetType(TYPE_INT)));
+      LlvmCodeGen::NamedVariable("num_evals", codegen->GetType(TYPE_INT)));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
   LlvmBuilder builder(codegen->context());
   Value* args[3];
   *fn = prototype.GeneratePrototype(&builder, args);
-  Value* ctxs_arg = args[0];
+  Value* evals_arg = args[0];
   Value* tuple_row_arg = args[2];
 
-  if (conjunct_ctxs.size() > 0) {
+  if (conjuncts.size() > 0) {
     LLVMContext& context = codegen->context();
     BasicBlock* false_block = BasicBlock::Create(context, "false", *fn);
 
-    for (int i = 0; i < conjunct_ctxs.size(); ++i) {
+    for (int i = 0; i < conjuncts.size(); ++i) {
       BasicBlock* true_block = BasicBlock::Create(context, "continue", *fn, false_block);
-
-      Value* ctx_arg_ptr = builder.CreateConstGEP1_32(ctxs_arg, i, "ctx_ptr");
-      Value* ctx_arg = builder.CreateLoad(ctx_arg_ptr, "ctx");
-      Value* expr_args[] = { ctx_arg, tuple_row_arg };
+      Value* eval_arg_ptr = builder.CreateInBoundsGEP(nullptr, evals_arg,
+          codegen->GetIntConstant(TYPE_INT, i), "eval_ptr");
+      Value* eval_arg = builder.CreateLoad(eval_arg_ptr, "eval");
 
       // Call conjunct_fns[i]
-      CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
-          codegen, &builder, conjunct_ctxs[i]->root()->type(), conjunct_fns[i], expr_args,
+      CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
+          conjuncts[i]->type(), conjunct_fns[i], {eval_arg, tuple_row_arg},
           "result");
 
       // Return false if result.is_null || !result
@@ -584,7 +586,7 @@ Status ExecNode::CodegenEvalConjuncts(LlvmCodeGen* codegen,
   }
 
   // Avoid inlining EvalConjuncts into caller if it is large.
-  if (conjunct_ctxs.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
+  if (conjuncts.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(*fn);
   }
 

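Taken together, the hunks above split the old ExprContext into a shared, thread-safe ScalarExpr (created in Init()) and a per-node ScalarExprEvaluator (created in Prepare()). Condensed, phase by phase, using only the calls visible in this file (error paths omitted, so this is a sketch rather than a compilable unit):

  // Init(): create shared, thread-safe expr trees from the thrift plan.
  RETURN_IF_ERROR(
      ScalarExpr::Create(tnode.conjuncts, row_descriptor_, state, &conjuncts_));

  // Prepare(): create one evaluator per expr, backed by expr_mem_pool(),
  // and register the evaluators for periodic freeing.
  RETURN_IF_ERROR(ScalarExprEvaluator::Create(
      conjuncts_, state, pool_, expr_mem_pool(), &conjunct_evals_));
  AddEvaluatorsToFree(conjunct_evals_);

  // Open(): open the evaluators before evaluating any rows.
  RETURN_IF_ERROR(ScalarExprEvaluator::Open(conjunct_evals_, state));

  // Close(): close evaluators first, then the exprs they evaluate.
  ScalarExprEvaluator::Close(conjunct_evals_, state);
  ScalarExpr::Close(conjuncts_);
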
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index c769be3..ceb3c49 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -23,7 +23,7 @@
 #include <vector>
 
 #include "common/status.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/PlanNodes_types.h"
 #include "runtime/descriptors.h" // for RowDescriptor
 #include "util/blocking-queue.h"
@@ -31,24 +31,25 @@
 
 namespace impala {
 
-class Expr;
-class ExprContext;
+class DataSink;
+class MemPool;
+class MemTracker;
 class ObjectPool;
-class Counters;
-class SortExecExprs;
 class RowBatch;
 class RuntimeState;
+class ScalarExpr;
+class SubplanNode;
 class TPlan;
 class TupleRow;
-class DataSink;
-class MemTracker;
-class SubplanNode;
 class TDebugOptions;
 
 /// Superclass of all executor nodes.
 /// All subclasses need to make sure to check RuntimeState::is_cancelled()
 /// periodically in order to ensure timely termination after the cancellation
 /// flag gets set.
+/// TODO: Move static state of ExecNode into PlanNode, of which there is one instance
+/// per fragment. ExecNode would then contain only runtime state, and there can be up
+/// to MT_DOP instances of it per fragment.
 class ExecNode {
  public:
   /// Init conjuncts.
@@ -144,15 +145,18 @@ class ExecNode {
   /// Collect all scan node types.
   void CollectScanNodes(std::vector<ExecNode*>* nodes);
 
-  /// Evaluate ExprContexts over row.  Returns true if all exprs return true.
-  /// TODO: This doesn't use the vector<Expr*> signature because I haven't figured
-  /// out how to deal with declaring a templated std:vector type in IR
-  static bool EvalConjuncts(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
+  /// Evaluates the predicate in 'eval' over 'row' and returns the result.
+  static bool EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row);
+
+  /// Evaluate the conjuncts in 'evals' over 'row'.
+  /// Returns true if all exprs return true.
+  static bool EvalConjuncts(
+      ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row);
 
   /// Codegen EvalConjuncts(). Returns a non-OK status if the function couldn't be
   /// codegen'd. The codegen'd version uses inlined, codegen'd GetBooleanVal() functions.
   static Status CodegenEvalConjuncts(LlvmCodeGen* codegen,
-      const std::vector<ExprContext*>& conjunct_ctxs, llvm::Function** fn,
+      const std::vector<ScalarExpr*>& conjuncts, llvm::Function** fn,
       const char* name = "EvalConjuncts") WARN_UNUSED_RESULT;
 
   /// Returns a string representation in DFS order of the plan rooted at this.
@@ -166,7 +170,12 @@ class ExecNode {
   ///   out: Stream to accumulate debug string.
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
-  const std::vector<ExprContext*>& conjunct_ctxs() const { return conjunct_ctxs_; }
+  const std::vector<ScalarExpr*>& conjuncts() const { return conjuncts_; }
+
+  const std::vector<ScalarExprEvaluator*>& conjunct_evals() const {
+    return conjunct_evals_;
+  }
+
   int id() const { return id_; }
   TPlanNodeType::type type() const { return type_; }
   const RowDescriptor& row_desc() const { return row_descriptor_; }
@@ -184,6 +193,7 @@ class ExecNode {
   RuntimeProfile* runtime_profile() { return runtime_profile_.get(); }
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
+  MemPool* expr_mem_pool() { return expr_mem_pool_.get(); }
 
   /// Return true if codegen was disabled by the planner for this ExecNode. Does not
   /// check to see if codegen was enabled for the enclosing fragment.
@@ -245,10 +255,14 @@ class ExecNode {
 
   /// Unique within a single plan tree.
   int id_;
-
   TPlanNodeType::type type_;
   ObjectPool* pool_;
-  std::vector<ExprContext*> conjunct_ctxs_;
+
+  /// Conjuncts and their evaluators in this node. 'conjuncts_' live in the
+  /// query-state's object pool while the evaluators live in this exec node's
+  /// object pool.
+  std::vector<ScalarExpr*> conjuncts_;
+  std::vector<ScalarExprEvaluator*> conjunct_evals_;
 
   std::vector<ExecNode*> children_;
   RowDescriptor row_descriptor_;
@@ -268,9 +282,13 @@ class ExecNode {
   /// Account for peak memory used by this node
   boost::scoped_ptr<MemTracker> mem_tracker_;
 
-  /// MemTracker that should be used for ExprContexts.
+  /// MemTracker used by 'expr_mem_pool_'.
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
 
+  /// MemPool for allocating data structures used by expression evaluators in this node.
+  /// Created in Prepare().
+  boost::scoped_ptr<MemPool> expr_mem_pool_;
+
   bool is_closed() const { return is_closed_; }
 
   /// Pointer to the containing SubplanNode or NULL if not inside a subplan.
@@ -302,7 +320,7 @@ class ExecNode {
   Status ExecDebugAction(
       TExecNodePhase::type phase, RuntimeState* state) WARN_UNUSED_RESULT;
 
-  /// Frees any local allocations made by expr_ctxs_to_free_ and returns the result of
+  /// Frees any local allocations made by evals_to_free_ and returns the result of
   /// state->CheckQueryState(). Nodes should call this periodically, e.g. once per input
   /// row batch. This should not be called outside the main execution thread.
   //
@@ -311,26 +329,29 @@ class ExecNode {
   /// ExecNode::QueryMaintenance().
   virtual Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
 
-  /// Add an ExprContext to have its local allocations freed by QueryMaintenance().
+  /// Add an expr evaluator to have its local allocations freed by QueryMaintenance().
   /// Exprs that are evaluated in the main execution thread should be added. Exprs
   /// evaluated in a separate thread are generally not safe to add, since a local
   /// allocation may be freed while it's being used. Rather than using this mechanism,
-  /// threads should call FreeLocalAllocations() on local ExprContexts periodically.
-  void AddExprCtxToFree(ExprContext* ctx) { expr_ctxs_to_free_.push_back(ctx); }
-  void AddExprCtxsToFree(const std::vector<ExprContext*>& ctxs);
-  void AddExprCtxsToFree(const SortExecExprs& sort_exec_exprs);
-
-  /// Free any local allocations made by expr_ctxs_to_free_.
-  void FreeLocalAllocations() { ExprContext::FreeLocalAllocations(expr_ctxs_to_free_); }
+  /// threads should call FreeLocalAllocations() on local evaluators periodically.
+  void AddEvaluatorToFree(ScalarExprEvaluator* eval);
+  void AddEvaluatorsToFree(const std::vector<ScalarExprEvaluator*>& evals);
 
  private:
   /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not protected
   /// by a lock, it assumes all calls to Close() are made by the same thread.
   bool is_closed_;
 
-  /// Expr contexts whose local allocations are safe to free in the main execution thread.
-  std::vector<ExprContext*> expr_ctxs_to_free_;
+  /// Expr evaluators whose local allocations are safe to free in the main execution
+  /// thread.
+  std::vector<ScalarExprEvaluator*> evals_to_free_;
 };
 
+inline bool ExecNode::EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row) {
+  BooleanVal v = eval->GetBooleanVal(row);
+  if (v.is_null || !v.val) return false;
+  return true;
+}
+
 }
 #endif

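For subclasses, the ExprContext* array simply becomes a ScalarExprEvaluator* array. A hypothetical GetNext() fragment showing the intended use of the two helpers (the batch loop itself is illustrative, not part of this patch):

  ScalarExprEvaluator* const* evals = conjunct_evals_.data();
  const int num_conjuncts = conjuncts_.size();
  for (int i = 0; i < batch->num_rows(); ++i) {
    TupleRow* row = batch->GetRow(i);
    // EvalConjuncts() short-circuits on the first conjunct that
    // evaluates to false or NULL (see EvalPredicate() above).
    if (!EvalConjuncts(evals, num_conjuncts, row)) continue;
    // ... copy 'row' to the output batch ...
  }
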
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 5c33ed4..0eee704 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -67,22 +67,23 @@ void FilterStats::RegisterCounterGroup(const string& key) {
   counters[key] = counter;
 }
 
-Status FilterContext::CloneFrom(const FilterContext& from, RuntimeState* state) {
+Status FilterContext::CloneFrom(const FilterContext& from, ObjectPool* pool,
+    RuntimeState* state, MemPool* mem_pool) {
   filter = from.filter;
   stats = from.stats;
-  return from.expr_ctx->Clone(state, &expr_ctx);
+  return from.expr_eval->Clone(pool, state, mem_pool, &expr_eval);
 }
 
 bool FilterContext::Eval(TupleRow* row) const noexcept {
-  void* e = expr_ctx->GetValue(row);
-  return filter->Eval(e, expr_ctx->root()->type());
+  void* val = expr_eval->GetValue(row);
+  return filter->Eval(val, expr_eval->root().type());
 }
 
 void FilterContext::Insert(TupleRow* row) const noexcept {
   if (local_bloom_filter == NULL) return;
-  void* e = expr_ctx->GetValue(row);
+  void* val = expr_eval->GetValue(row);
   uint32_t filter_hash = RawValue::GetHashValue(
-      e, expr_ctx->root()->type(), RuntimeFilterBank::DefaultHashSeed());
+      val, expr_eval->root().type(), RuntimeFilterBank::DefaultHashSeed());
   local_bloom_filter->Insert(filter_hash);
 }
 
@@ -97,11 +98,11 @@ void FilterContext::Insert(TupleRow* row) const noexcept {
 //                              %"class.impala::TupleRow"* %row) #34 {
 // entry:
 //   %0 = alloca i16
-//   %expr_ctx_ptr = getelementptr inbounds %"struct.impala::FilterContext",
+//   %expr_eval_ptr = getelementptr inbounds %"struct.impala::FilterContext",
 //       %"struct.impala::FilterContext"* %this, i32 0, i32 0
-//   %expr_ctx_arg = load %"class.impala::ExprContext"*,
-//       %"class.impala::ExprContext"** %expr_ctx_ptr
-//   %result = call i32 @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx_arg,
+//   %expr_eval_arg = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %expr_eval_ptr
+//   %result = call i32 @GetSlotRef(
+//       %"class.impala::ScalarExprEvaluator"* %expr_eval_arg,
 //       %"class.impala::TupleRow"* %row)
 //   %is_null1 = trunc i32 %result to i1
 //   br i1 %is_null1, label %is_null, label %not_null
@@ -127,11 +128,12 @@ void FilterContext::Insert(TupleRow* row) const noexcept {
 //       %"struct.impala::ColumnType"* @expr_type_arg)
 //   ret i1 %passed_filter
 // }
-Status FilterContext::CodegenEval(LlvmCodeGen* codegen, Function** fn) const {
+Status FilterContext::CodegenEval(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
+    Function** fn) {
   LLVMContext& context = codegen->context();
   LlvmBuilder builder(context);
 
-  *fn = NULL;
+  *fn = nullptr;
   PointerType* this_type = codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
   PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
   LlvmCodeGen::FnPrototype prototype(codegen, "FilterContextEval",
@@ -149,24 +151,25 @@ Status FilterContext::CodegenEval(LlvmCodeGen* codegen, Function** fn) const {
   BasicBlock* eval_filter_block =
       BasicBlock::Create(context, "eval_filter", eval_filter_fn);
 
-  Expr* expr = expr_ctx->root();
   Function* compute_fn;
-  RETURN_IF_ERROR(expr->GetCodegendComputeFn(codegen, &compute_fn));
-  DCHECK(compute_fn != NULL);
+  RETURN_IF_ERROR(filter_expr->GetCodegendComputeFn(codegen, &compute_fn));
+  DCHECK(compute_fn != nullptr);
 
   // The function for checking against the bloom filter for match.
   Function* runtime_filter_fn =
       codegen->GetFunction(IRFunction::RUNTIME_FILTER_EVAL, false);
-  DCHECK(runtime_filter_fn != NULL);
+  DCHECK(runtime_filter_fn != nullptr);
 
-  // Load 'expr_ctx' from 'this_arg' FilterContext object.
-  Value* expr_ctx_ptr = builder.CreateStructGEP(NULL, this_arg, 0, "expr_ctx_ptr");
-  Value* expr_ctx_arg = builder.CreateLoad(expr_ctx_ptr, "expr_ctx_arg");
+  // Load 'expr_eval' from 'this_arg' FilterContext object.
+  Value* expr_eval_ptr =
+      builder.CreateStructGEP(nullptr, this_arg, 0, "expr_eval_ptr");
+  Value* expr_eval_arg =
+      builder.CreateLoad(expr_eval_ptr, "expr_eval_arg");
 
   // Evaluate the row against the filter's expression.
-  Value* compute_fn_args[] = {expr_ctx_arg, row_arg};
+  Value* compute_fn_args[] = {expr_eval_arg, row_arg};
   CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
-      expr->type(), compute_fn, compute_fn_args, "result");
+      filter_expr->type(), compute_fn, compute_fn_args, "result");
 
   // Check if the result is NULL
   Value* is_null = result.GetIsNull();
@@ -192,11 +195,11 @@ Status FilterContext::CodegenEval(LlvmCodeGen* codegen, Function** fn) const {
   // Create a global constant of the filter expression's ColumnType. It needs to be a
   // constant for constant propagation and dead code elimination in 'runtime_filter_fn'.
   Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
-  Constant* expr_type_arg = codegen->ConstantToGVPtr(col_type, expr->type().ToIR(codegen),
-      "expr_type_arg");
+  Constant* expr_type_arg = codegen->ConstantToGVPtr(col_type,
+      filter_expr->type().ToIR(codegen), "expr_type_arg");
 
   // Load 'filter' from 'this_arg' FilterContext object.
-  Value* filter_ptr = builder.CreateStructGEP(NULL, this_arg, 1, "filter_ptr");
+  Value* filter_ptr = builder.CreateStructGEP(nullptr, this_arg, 1, "filter_ptr");
   Value* filter_arg = builder.CreateLoad(filter_ptr, "filter_arg");
 
   Value* run_filter_args[] = {filter_arg, val_ptr_phi, expr_type_arg};

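CloneFrom() now takes the ObjectPool and MemPool explicitly, so each scanner thread can back its clone's evaluator with thread-local memory. A hedged usage sketch (the scanner-side names below are hypothetical):

  // Hypothetical per-thread setup in a scanner: clone each filter
  // context so the thread gets its own evaluator backed by its own pools.
  vector<FilterContext> cloned_ctxs(filter_ctxs.size());
  for (int i = 0; i < filter_ctxs.size(); ++i) {
    RETURN_IF_ERROR(cloned_ctxs[i].CloneFrom(
        filter_ctxs[i], scanner_obj_pool, state, scanner_expr_mem_pool));
  }
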
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index 37d139d..fa95b91 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -20,9 +20,7 @@
 #define IMPALA_EXEC_FILTER_CONTEXT_H
 
 #include <boost/unordered_map.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "util/runtime-profile.h"
 
 namespace impala {
@@ -30,6 +28,7 @@ namespace impala {
 class BloomFilter;
 class LlvmCodeGen;
 class RuntimeFilter;
+class ScalarExpr;
 class TupleRow;
 
 /// Container struct for per-filter statistics, with statistics for each granularity of
@@ -80,45 +79,43 @@ class FilterStats {
 /// FilterContext contains all metadata for a single runtime filter, and allows the filter
 /// to be applied in the context of a single thread.
 struct FilterContext {
-  /// Expression which produces a value to test against the runtime filter.
-  /// This field is referenced in generated code so if the order of it changes
-  /// inside this struct, please update CodegenEval().
-  ExprContext* expr_ctx;
+  /// Evaluator for the expression which produces the value to test against the
+  /// runtime filter. This field is referenced in generated code so if the order of
+  /// it changes inside this struct, please update CodegenEval().
+  ScalarExprEvaluator* expr_eval = nullptr;
 
   /// Cache of filter from runtime filter bank.
   /// The field is referenced in generated code so if the order of it changes
   /// inside this struct, please update CodegenEval().
-  const RuntimeFilter* filter;
+  const RuntimeFilter* filter = nullptr;
 
   /// Statistics for this filter, owned by object pool.
-  FilterStats* stats;
+  FilterStats* stats = nullptr;
 
   /// Working copy of local bloom filter
-  BloomFilter* local_bloom_filter;
+  BloomFilter* local_bloom_filter = nullptr;
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
 
   /// Clones this FilterContext for use in a multi-threaded context (i.e. by scanner
   /// threads).
-  Status CloneFrom(const FilterContext& from, RuntimeState* state);
+  Status CloneFrom(const FilterContext& from, ObjectPool* pool, RuntimeState* state,
+      MemPool* mem_pool);
 
-  /// Evaluates 'row' on the expression in 'expr_ctx' with the resulting value being
-  /// checked against runtime filter 'filter' for matches. Returns true if 'row' finds
+  /// Evaluates 'row' with 'expr_eval', checking the resulting value
+  /// against runtime filter 'filter' for matches. Returns true if 'row' finds
   /// a match in 'filter'. Returns false otherwise.
   bool Eval(TupleRow* row) const noexcept;
 
-  /// Evaluates 'row' on the expression in 'expr_ctx' and hashes the resulting value.
+  /// Evaluates 'row' with 'expr_eval' and hashes the resulting value.
   /// The hash value is then used for setting some bits in 'local_bloom_filter'.
   void Insert(TupleRow* row) const noexcept;
 
-  /// Codegen Eval() by codegen'ing the expression evaluations and replacing the type
+  /// Codegen Eval() by codegen'ing the expression 'filter_expr' and replacing the type
   /// argument to RuntimeFilter::Eval() with a constant. On success, 'fn' is set to
   /// the generated function. On failure, an error status is returned.
-  Status CodegenEval(LlvmCodeGen* codegen, llvm::Function** fn) const;
-
-  FilterContext()
-      : expr_ctx(NULL), filter(NULL), local_bloom_filter(NULL) { }
+  static Status CodegenEval(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
+      llvm::Function** fn) WARN_UNUSED_RESULT;
 };
 
 }

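Runtime use of the struct is unchanged in spirit: Eval() tests a row's value against the published filter and Insert() feeds the local bloom filter. A minimal illustrative fragment (names and loop are hypothetical, not from this patch):

  // Probe side: drop rows whose value cannot match any runtime filter.
  bool keep = true;
  for (const FilterContext& ctx : filter_ctxs) {
    if (!ctx.Eval(row)) { keep = false; break; }
  }

  // Build side: feed each row's value into the local bloom filter.
  for (const FilterContext& ctx : filter_ctxs) ctx.Insert(row);
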
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node-ir.cc b/be/src/exec/hash-join-node-ir.cc
index 7ef7ca4..25aa556 100644
--- a/be/src/exec/hash-join-node-ir.cc
+++ b/be/src/exec/hash-join-node-ir.cc
@@ -18,6 +18,7 @@
 #include "codegen/impala-ir.h"
 #include "exec/hash-join-node.h"
 #include "exec/old-hash-table.inline.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/row-batch.h"
 
 #include "common/names.h"
@@ -34,8 +35,8 @@ using namespace impala;
 // TODO: explicitly set the calling convention?
 // TODO: investigate using fastcc for all codegen internal functions?
 bool IR_NO_INLINE EvalOtherJoinConjuncts2(
-    ExprContext* const* ctxs, int num_ctxs, TupleRow* row) {
-  return ExecNode::EvalConjuncts(ctxs, num_ctxs, row);
+    ScalarExprEvaluator* const* evals, int num_evals, TupleRow* row) {
+  return ExecNode::EvalConjuncts(evals, num_evals, row);
 }
 
 // CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
@@ -52,11 +53,14 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch,
   int rows_returned = 0;
   int probe_rows = probe_batch->num_rows();
 
-  ExprContext* const* other_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
-  const int num_other_conjunct_ctxs = other_join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* other_conjunct_evals =
+      other_join_conjunct_evals_.data();
+  const int num_other_conjuncts = other_join_conjuncts_.size();
+  DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size());
 
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  const int num_conjunct_ctxs = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  const int num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
 
   while (true) {
     // Create output row for each matching build row
@@ -67,10 +71,10 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch,
       if (join_op_ == TJoinOp::LEFT_SEMI_JOIN) {
         // Evaluate the non-equi-join conjuncts against a temp row assembled from all
         // build and probe tuples.
-        if (num_other_conjunct_ctxs > 0) {
+        if (num_other_conjuncts > 0) {
           CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
-          if (!EvalOtherJoinConjuncts2(other_conjunct_ctxs, num_other_conjunct_ctxs,
-                semi_join_staging_row_)) {
+          if (!EvalOtherJoinConjuncts2(other_conjunct_evals, num_other_conjuncts,
+                  semi_join_staging_row_)) {
             continue;
           }
         }
@@ -78,13 +82,13 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch,
       } else {
         CreateOutputRow(out_row, current_probe_row_, matched_build_row);
         if (!EvalOtherJoinConjuncts2(
-              other_conjunct_ctxs, num_other_conjunct_ctxs, out_row)) {
+                other_conjunct_evals, num_other_conjuncts, out_row)) {
           continue;
         }
       }
       matched_probe_ = true;
 
-      if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) {
+      if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
         ++rows_returned;
         // Filled up out batch or hit limit
         if (UNLIKELY(rows_returned == max_added_rows)) goto end;
@@ -104,7 +108,7 @@ int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch,
     if (!matched_probe_ && match_all_probe_) {
       CreateOutputRow(out_row, current_probe_row_, NULL);
       matched_probe_ = true;
-      if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) {
+      if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
         ++rows_returned;
         if (UNLIKELY(rows_returned == max_added_rows)) goto end;
         // Advance to next out row

