impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [07/14] incubator-impala git commit: IMPALA-4192: Disentangle Expr and ExprContext
Date Sun, 18 Jun 2017 18:36:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index b133be6..f7c1f88 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -33,9 +33,10 @@
 
 namespace impala {
 
-class ExprContext;
 class RowDescriptor;
 class RuntimeState;
+class ScalarExpr;
+class ScalarExprEvaluator;
 
 /// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned
 /// into PARTITION_FANOUT partitions, with partitions spilled if the full build side
@@ -73,7 +74,8 @@ class PhjBuilder : public DataSink {
   PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor& probe_row_desc,
       const RowDescriptor& build_row_desc, RuntimeState* state);
 
-  Status Init(RuntimeState* state, const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+  Status InitExprsAndFilters(RuntimeState* state,
+      const std::vector<TEqJoinCondition>& eq_join_conjuncts,
       const std::vector<TRuntimeFilterDesc>& filters);
 
   /// Implementations of DataSink interface methods.
@@ -236,6 +238,12 @@ class PhjBuilder : public DataSink {
     std::unique_ptr<BufferedTupleStream> build_rows_;
   };
 
+ protected:
+  /// Init() function inherited from DataSink. Overridden to be a no-op for now.
+  /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink.
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state) override;
+
  private:
   /// Computes the minimum number of buffers required to execute the spilling partitioned
   /// hash algorithm successfully for any input size (assuming enough disk space is
@@ -253,6 +261,9 @@ class PhjBuilder : public DataSink {
     return num_reserved_buffers;
   }
 
+  /// Free local allocations made from expr evaluators during hash table construction.
+  void FreeLocalAllocations() const;
+
   /// Create and initialize a set of hash partitions for partitioning level 'level'.
   /// The previous hash partitions must have been cleared with ClearHashPartitions().
   /// After calling this, batches are added to the new partitions by calling Send().
@@ -357,18 +368,18 @@ class PhjBuilder : public DataSink {
   /// If true, the build side has at least one row.
   bool non_empty_build_;
 
-  /// Expr contexts to free after partitioning or inserting each batch.
-  std::vector<ExprContext*> expr_ctxs_to_free_;
-
-  /// Expression contexts over input rows for hash table build.
-  std::vector<ExprContext*> build_expr_ctxs_;
+  /// Expressions over input rows for hash table build.
+  std::vector<ScalarExpr*> build_exprs_;
 
   /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
   /// NOT DISTINCT FROM, rather than equality.
   std::vector<bool> is_not_distinct_from_;
 
-  /// List of filters to build.
-  std::vector<FilterContext> filters_;
+  /// Expressions for evaluating input rows for insertion into runtime filters.
+  std::vector<ScalarExpr*> filter_exprs_;
+
+  /// List of filters to build. One-to-one correspondence with exprs in 'filter_exprs_'.
+  std::vector<FilterContext> filter_ctxs_;
 
   /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
   /// The level is set to the same level as 'hash_partitions_'.
@@ -423,8 +434,8 @@ class PhjBuilder : public DataSink {
 
   /// Partition used for null-aware joins. This partition is always processed at the end
   /// after all build and probe rows are processed. In this partition's 'build_rows_', we
-  /// store all the rows for which 'build_expr_ctxs_' evaluated over the row returns NULL
-  /// (i.e. it has a NULL on the eq join slot).
+  /// store all the rows for which 'build_expr_evals_' evaluated over the row returns
+  /// NULL (i.e. it has a NULL on the eq join slot).
   /// NULL if the join is not null aware or we are done processing this partition.
   Partition* null_aware_partition_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index bbed06d..42c9f4f 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -33,14 +33,14 @@ namespace impala {
 // TODO: explicitly set the calling convention?
 // TODO: investigate using fastcc for all codegen internal functions?
 bool IR_NO_INLINE EvalOtherJoinConjuncts(
-    ExprContext* const* ctxs, int num_ctxs, TupleRow* row) {
-  return ExecNode::EvalConjuncts(ctxs, num_ctxs, row);
+    ScalarExprEvaluator* const* evals, int num_evals, TupleRow* row) {
+  return ExecNode::EvalConjuncts(evals, num_evals, row);
 }
 
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin(
-    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-    ExprContext* const* conjunct_ctxs, int num_conjuncts,
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
+    ScalarExprEvaluator* const* other_join_conjunct_evals,
+    int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+    int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(current_probe_row_ != NULL);
   TupleRow* out_row = out_batch_iterator->Get();
   for (; !hash_tbl_iterator_.AtEnd(); hash_tbl_iterator_.NextDuplicate()) {
@@ -50,11 +50,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin(
     // Create an output row with all probe/build tuples and evaluate the
     // non-equi-join conjuncts.
     CreateOutputRow(out_row, current_probe_row_, matched_build_row);
-    if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts,
+    if (!EvalOtherJoinConjuncts(other_join_conjunct_evals, num_other_join_conjuncts,
         out_row)) {
       continue;
     }
-    if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+    if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
       --(*remaining_capacity);
       if (*remaining_capacity == 0) {
         hash_tbl_iterator_.NextDuplicate();
@@ -68,9 +68,9 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowInnerJoin(
 
 template<int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins(
-    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-    ExprContext* const* conjunct_ctxs, int num_conjuncts,
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
+    ScalarExprEvaluator* const* other_join_conjunct_evals,
+    int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+    int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(current_probe_row_ != NULL);
   DCHECK(JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN);
   TupleRow* out_row = out_batch_iterator->Get();
@@ -83,7 +83,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins(
     // build and probe tuples.
     if (num_other_join_conjuncts > 0) {
       CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
-      if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs,
+      if (!EvalOtherJoinConjuncts(other_join_conjunct_evals,
           num_other_join_conjuncts, semi_join_staging_row_)) {
         continue;
       }
@@ -93,7 +93,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins(
     // Update the hash table to indicate that this entry has been matched.
     hash_tbl_iterator_.SetMatched();
     if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN &&
-        ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+        ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
       --(*remaining_capacity);
       if (*remaining_capacity == 0) {
         hash_tbl_iterator_.NextDuplicate();
@@ -107,9 +107,10 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowRightSemiJoins(
 
 template<int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
-    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-    ExprContext* const* conjunct_ctxs, int num_conjuncts,
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+    ScalarExprEvaluator* const* other_join_conjunct_evals,
+    int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+    int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
+    Status* status) {
   DCHECK(current_probe_row_ != NULL);
   DCHECK(JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
       JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
@@ -121,7 +122,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
     // build and probe tuples.
     if (num_other_join_conjuncts > 0) {
       CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
-      if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs,
+      if (!EvalOtherJoinConjuncts(other_join_conjunct_evals,
           num_other_join_conjuncts, semi_join_staging_row_)) {
         continue;
       }
@@ -133,7 +134,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
     hash_tbl_iterator_.SetAtEnd();
     // Append to output batch for left semi joins if the conjuncts are satisfied.
     if (JoinOp == TJoinOp::LEFT_SEMI_JOIN &&
-        ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+        ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
       --(*remaining_capacity);
       if (*remaining_capacity == 0) return false;
       out_row = out_batch_iterator->Next();
@@ -159,7 +160,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
       }
     }
     // No match for this current_probe_row_, we need to output it. No need to
-    // evaluate the conjunct_ctxs since anti joins cannot have any.
+    // evaluate the conjunct_evals since anti joins cannot have any.
     out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
     matched_probe_ = true;
     --(*remaining_capacity);
@@ -171,9 +172,9 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
 
 template<int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
-    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-    ExprContext* const* conjunct_ctxs, int num_conjuncts,
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
+    ScalarExprEvaluator* const* other_join_conjunct_evals,
+    int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+    int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
   DCHECK(JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
       JoinOp == TJoinOp::FULL_OUTER_JOIN);
   DCHECK(current_probe_row_ != NULL);
@@ -184,7 +185,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
     // Create an output row with all probe/build tuples and evaluate the
     // non-equi-join conjuncts.
     CreateOutputRow(out_row, current_probe_row_, matched_build_row);
-    if (!EvalOtherJoinConjuncts(other_join_conjunct_ctxs, num_other_join_conjuncts,
+    if (!EvalOtherJoinConjuncts(other_join_conjunct_evals, num_other_join_conjuncts,
         out_row)) {
       continue;
     }
@@ -195,7 +196,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
       // as matched for right/full outer joins.
       hash_tbl_iterator_.SetMatched();
     }
-    if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+    if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
       --(*remaining_capacity);
       if (*remaining_capacity == 0) {
         hash_tbl_iterator_.NextDuplicate();
@@ -208,7 +209,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
   if (JoinOp != TJoinOp::RIGHT_OUTER_JOIN && !matched_probe_) {
     // No match for this row, we need to output it if it's a left/full outer join.
     CreateOutputRow(out_row, current_probe_row_, NULL);
-    if (ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, out_row)) {
+    if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
       matched_probe_ = true;
       --(*remaining_capacity);
       if (*remaining_capacity == 0) return false;
@@ -220,28 +221,30 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
 
 template <int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
-    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-    ExprContext* const* conjunct_ctxs, int num_conjuncts,
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+    ScalarExprEvaluator* const* other_join_conjunct_evals,
+    int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+    int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
+    Status* status) {
   if (JoinOp == TJoinOp::INNER_JOIN) {
-    return ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
-        conjunct_ctxs, num_conjuncts, out_batch_iterator, remaining_capacity);
+    return ProcessProbeRowInnerJoin(other_join_conjunct_evals,
+        num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
   } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
              JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
-    return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+    return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_evals,
+        num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator,
         remaining_capacity);
   } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
              JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
              JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-    return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+    return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_evals,
+        num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator,
         remaining_capacity, status);
   } else {
     DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
            JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
-    return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
-        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+    return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_evals,
+        num_other_join_conjuncts, conjunct_evals, num_conjuncts, out_batch_iterator,
         remaining_capacity);
   }
 }
@@ -269,7 +272,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
     // Fetch the hash and expr values' nullness for this row.
     if (expr_vals_cache->IsRowNull()) {
       if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && builder_->non_empty_build()) {
-        const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
+        const int num_other_join_conjuncts = other_join_conjunct_evals_.size();
         // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
         // 1. No build rows -> Return this row. The check for 'non_empty_build_'
         //    is for this case.
@@ -363,10 +366,11 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
     RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
   DCHECK(state_ == PARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION
       || state_ == REPARTITIONING_PROBE);
-  ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
-  const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  const int num_conjuncts = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* other_join_conjunct_evals =
+      other_join_conjunct_evals_.data();
+  const int num_other_join_conjuncts = other_join_conjunct_evals_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  const int num_conjuncts = conjunct_evals_.size();
 
   DCHECK(!out_batch->AtCapacity());
   DCHECK_GE(probe_batch_pos_, 0);
@@ -399,9 +403,9 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
     do {
       // 'current_probe_row_' can be NULL on the first iteration through this loop.
       if (current_probe_row_ != NULL) {
-        if (!ProcessProbeRow<JoinOp>(other_join_conjunct_ctxs, num_other_join_conjuncts,
-            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity,
-            status)) {
+        if (!ProcessProbeRow<JoinOp>(other_join_conjunct_evals,
+                num_other_join_conjuncts, conjunct_evals, num_conjuncts,
+                &out_batch_iterator, &remaining_capacity, status)) {
           if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
           break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index d3a10d3..d36aa71 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -24,8 +24,8 @@
 
 #include "codegen/llvm-codegen.h"
 #include "exec/hash-table.inline.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/buffered-tuple-stream.inline.h"
@@ -79,19 +79,26 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   // being separated out further.
   builder_.reset(
       new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state));
-  RETURN_IF_ERROR(builder_->Init(state, eq_join_conjuncts, tnode.runtime_filters));
+  RETURN_IF_ERROR(
+      builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters));
 
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
-    ExprContext* ctx;
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.left, &ctx));
-    probe_expr_ctxs_.push_back(ctx);
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.right, &ctx));
-    build_expr_ctxs_.push_back(ctx);
+    ScalarExpr* probe_expr;
+    RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjunct.left, child(0)->row_desc(),
+        state, &probe_expr));
+    probe_exprs_.push_back(probe_expr);
+    ScalarExpr* build_expr;
+    RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjunct.right, child(1)->row_desc(),
+        state, &build_expr));
+    build_exprs_.push_back(build_expr);
   }
-  RETURN_IF_ERROR(Expr::CreateExprTrees(
-      pool_, tnode.hash_join_node.other_join_conjuncts, &other_join_conjunct_ctxs_));
-  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-      eq_join_conjuncts.size() == 1);
+  // other_join_conjuncts_ are evaluated in the context of rows assembled from all build
+  // and probe tuples; full_row_desc is not necessarily the same as the output row desc,
+  // e.g., because semi joins only return the build xor probe tuples
+  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts,
+      full_row_desc, state, &other_join_conjuncts_));
+  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || eq_join_conjuncts.size() == 1);
   return Status::OK();
 }
 
@@ -104,31 +111,14 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
 
-  // build and probe exprs are evaluated in the context of the rows produced by our
-  // right and left children, respectively
-  RETURN_IF_ERROR(
-      Expr::Prepare(build_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker()));
-  RETURN_IF_ERROR(
-      Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker()));
-
-  // Build expressions may be evaluated during probing, so must be freed.
-  // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
-  // values in ExprValuesCache. Local allocations need to survive until the cache is reset
-  // so we need to manually free probe expr local allocations.
-  AddExprCtxsToFree(build_expr_ctxs_);
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_,
+      expr_mem_pool(), &other_join_conjunct_evals_));
+  AddEvaluatorsToFree(other_join_conjunct_evals_);
 
-  // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all
-  // build and probe tuples; full_row_desc is not necessarily the same as the output row
-  // desc, e.g., because semi joins only return the build xor probe tuples
-  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
-  RETURN_IF_ERROR(
-      Expr::Prepare(other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
-  AddExprCtxsToFree(other_join_conjunct_ctxs_);
-
-  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
+  RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, probe_exprs_,
       builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(),
       state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
-      child(1)->row_desc().tuple_descriptors().size(), mem_tracker(), &ht_ctx_));
+      child(1)->row_desc().tuple_descriptors().size(), expr_mem_pool(), &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
@@ -163,9 +153,8 @@ void PartitionedHashJoinNode::Codegen(RuntimeState* state) {
 Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(BlockingJoinNode::Open(state));
-  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
+  RETURN_IF_ERROR(ht_ctx_->Open(state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state));
 
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     RETURN_IF_ERROR(InitNullAwareProbePartition());
@@ -179,7 +168,7 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   // (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
   // be freed now as they don't get freed again till probing. Other exprs' local allocations
   // are freed in ExecNode::FreeLocalAllocations().
-  ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
+  ht_ctx_->FreeProbeLocalAllocations();
 
   RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
   RETURN_IF_ERROR(PrepareForProbe());
@@ -192,6 +181,15 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
+Status PartitionedHashJoinNode::QueryMaintenance(RuntimeState* state) {
+  // Build expressions may be evaluated during probing, so must be freed.
+  // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
+  // values in ExprValuesCache. Local allocations need to survive until the cache is reset
+  // so we need to manually free probe expr local allocations.
+  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeBuildLocalAllocations();
+  return ExecNode::QueryMaintenance(state);
+}
+
 Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_probe_output_idx_ = -1;
@@ -240,15 +238,17 @@ void PartitionedHashJoinNode::CloseAndDeletePartitions() {
 
 void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (ht_ctx_ != NULL) ht_ctx_->Close();
+  if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
+  ht_ctx_.reset();
   nulls_build_batch_.reset();
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions();
-  if (builder_ != NULL) builder_->Close(state);
-  Expr::Close(build_expr_ctxs_, state);
-  Expr::Close(probe_expr_ctxs_, state);
-  Expr::Close(other_join_conjunct_ctxs_, state);
+  if (builder_ != nullptr) builder_->Close(state);
+  ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
+  ScalarExpr::Close(build_exprs_);
+  ScalarExpr::Close(probe_exprs_);
+  ScalarExpr::Close(other_join_conjuncts_);
   BlockingJoinNode::Close(state);
 }
 
@@ -591,7 +591,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     // Free local allocations of the probe side expressions only after ExprValuesCache
     // has been reset.
     DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
-    ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
+    ht_ctx_->FreeProbeLocalAllocations();
 
     // We want to return as soon as we have attached a tuple stream to the out_batch
     // (before preparing a new partition). The attached tuple stream will be recycled
@@ -665,8 +665,8 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
   // it is done by the loop in GetNext(). So, there must be exactly one partition in
   // 'output_build_partitions_' here.
   DCHECK_EQ(output_build_partitions_.size(), 1);
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  const int num_conjuncts = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  const int num_conjuncts = conjuncts_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
 
   bool eos = false;
@@ -684,7 +684,7 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
          output_unmatched_batch_iter_->Next()) {
       OutputBuildRow(out_batch, output_unmatched_batch_iter_->Get(), &out_batch_iterator);
       if (ExecNode::EvalConjuncts(
-              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
+              conjunct_evals, num_conjuncts, out_batch_iterator.Get())) {
         out_batch->CommitLastRow();
         out_batch_iterator.Next();
       }
@@ -705,8 +705,8 @@ Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
 }
 
 Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  const int num_conjuncts = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  const int num_conjuncts = conjuncts_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
 
   while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) {
@@ -714,7 +714,7 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_
     if (!hash_tbl_iterator_.IsMatched()) {
       OutputBuildRow(out_batch, hash_tbl_iterator_.GetRow(), &out_batch_iterator);
       if (ExecNode::EvalConjuncts(
-              conjunct_ctxs, num_conjuncts, out_batch_iterator.Get())) {
+              conjunct_evals, num_conjuncts, out_batch_iterator.Get())) {
         out_batch->CommitLastRow();
         out_batch_iterator.Next();
       }
@@ -892,8 +892,8 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
   DCHECK(null_aware_probe_partition_ != NULL);
   DCHECK(nulls_build_batch_ != NULL);
 
-  ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
-  int num_join_conjuncts = other_join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
+  int num_join_conjuncts = other_join_conjuncts_.size();
   DCHECK(probe_batch_ != NULL);
 
   BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
@@ -919,12 +919,11 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
   for (; probe_batch_pos_ < probe_batch_->num_rows(); ++probe_batch_pos_) {
     if (out_batch->AtCapacity()) break;
     TupleRow* probe_row = probe_batch_->GetRow(probe_batch_pos_);
-
     bool matched = false;
     for (int i = 0; i < nulls_build_batch_->num_rows(); ++i) {
       CreateOutputRow(semi_join_staging_row_, probe_row, nulls_build_batch_->GetRow(i));
       if (ExecNode::EvalConjuncts(
-          join_conjunct_ctxs, num_join_conjuncts, semi_join_staging_row_)) {
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
         matched = true;
         break;
       }
@@ -1003,8 +1002,8 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
   RETURN_IF_ERROR(null_probe_rows_->GetRows(&probe_rows, &got_rows));
   if (!got_rows) return NullAwareAntiJoinError(false);
 
-  ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
-  int num_join_conjuncts = other_join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
+  int num_join_conjuncts = other_join_conjuncts_.size();
 
   DCHECK_LE(probe_rows->num_rows(), matched_null_probe_.size());
   // For each row, iterate over all rows in the build table.
@@ -1015,7 +1014,7 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
       CreateOutputRow(semi_join_staging_row_, probe_rows->GetRow(i),
           build_rows->GetRow(j));
       if (ExecNode::EvalConjuncts(
-            join_conjunct_ctxs, num_join_conjuncts, semi_join_staging_row_)) {
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
         matched_null_probe_[i] = true;
         break;
       }
@@ -1107,8 +1106,8 @@ void PartitionedHashJoinNode::AddToDebugString(int indent, stringstream* out) co
   *out << " hash_tbl=";
   *out << string(indent * 2, ' ');
   *out << "HashTbl("
-       << " build_exprs=" << Expr::DebugString(build_expr_ctxs_)
-       << " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_);
+       << " build_exprs=" << ScalarExpr::DebugString(build_exprs_)
+       << " probe_exprs=" << ScalarExpr::DebugString(probe_exprs_);
   *out << ")";
 }
 
@@ -1369,12 +1368,12 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
 
   // Codegen evaluating other join conjuncts
   Function* eval_other_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, other_join_conjunct_ctxs_,
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, other_join_conjuncts_,
       &eval_other_conjuncts_fn, "EvalOtherConjuncts"));
 
   // Codegen evaluating conjuncts
   Function* eval_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs_,
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts_,
       &eval_conjuncts_fn));
 
   // Replace all call sites with codegen version

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 6d4e7f4..d6f6f18 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -117,6 +117,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual void Close(RuntimeState* state);
 
  protected:
+  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void AddToDebugString(int indentation_level, std::stringstream* out) const;
   virtual Status ProcessBuildInput(RuntimeState* state);
 
@@ -182,9 +183,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Using a separate variable is probably faster than calling
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
   bool inline ProcessProbeRowInnerJoin(
-      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-      ExprContext* const* conjunct_ctxs, int num_conjuncts,
-      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+      ScalarExprEvaluator* const* other_join_conjunct_evals,
+      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
 
   /// Probes and updates the hash table for the current probe row for either
   /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
@@ -200,9 +201,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
   template<int const JoinOp>
   bool inline ProcessProbeRowRightSemiJoins(
-      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-      ExprContext* const* conjunct_ctxs, int num_conjuncts,
-      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+      ScalarExprEvaluator* const* other_join_conjunct_evals,
+      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
 
   /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
   /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
@@ -217,9 +218,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// 'out_batch_iterator->parent()->AtCapacity()' as it avoids unnecessary memory load.
   template<int const JoinOp>
   bool inline ProcessProbeRowLeftSemiJoins(
-      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-      ExprContext* const* conjunct_ctxs, int num_conjuncts,
-      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+      ScalarExprEvaluator* const* other_join_conjunct_evals,
+      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
+      Status* status);
 
   /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
   /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
@@ -235,18 +237,19 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// 'status' may be updated if appending to null aware BTS fails.
   template<int const JoinOp>
   bool inline ProcessProbeRowOuterJoins(
-      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-      ExprContext* const* conjunct_ctxs, int num_conjuncts,
-      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+      ScalarExprEvaluator* const* other_join_conjunct_evals,
+      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
 
  /// Probes 'current_probe_row_' against the hash tables and appends outputs
   /// to output batch. Wrapper around the join-type specific probe row functions
   /// declared above.
   template<int const JoinOp>
   bool inline ProcessProbeRow(
-      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
-      ExprContext* const* conjunct_ctxs, int num_conjuncts,
-      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+      ScalarExprEvaluator* const* other_join_conjunct_evals,
+      int num_other_join_conjuncts, ScalarExprEvaluator* const* conjunct_evals,
+      int num_conjuncts, RowBatch::Iterator* out_batch_iterator, int* remaining_capacity,
+      Status* status);
 
   /// Evaluates some number of rows in 'probe_batch_' against the probe expressions
   /// and hashes the results to 32-bit hash values. The evaluation results and the hash
@@ -389,14 +392,17 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   RuntimeState* runtime_state_;
 
   /// Our equi-join predicates "<lhs> = <rhs>" are separated into
-  /// build_expr_ctxs_ (over child(1)) and probe_expr_ctxs_ (over child(0))
-  std::vector<ExprContext*> build_expr_ctxs_;
-  std::vector<ExprContext*> probe_expr_ctxs_;
+  /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0))
+  std::vector<ScalarExpr*> build_exprs_;
+  std::vector<ScalarExpr*> probe_exprs_;
 
   /// Non-equi-join conjuncts from the ON clause.
-  std::vector<ExprContext*> other_join_conjunct_ctxs_;
+  std::vector<ScalarExpr*> other_join_conjuncts_;
+  std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_;
 
   /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
+  /// This owns the evaluators for the build and probe expressions used during insertion
+  /// and probing of the hash tables.
   boost::scoped_ptr<HashTableCtx> ht_ctx_;
 
   /// The iterator that corresponds to the look up of current_probe_row_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index f513800..eeece0f 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -17,8 +17,8 @@
 
 #include "exec/plan-root-sink.h"
 
-#include "exprs/expr-context.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 #include "service/query-result-set.h"
@@ -34,24 +34,8 @@ namespace impala {
 
 const string PlanRootSink::NAME = "PLAN_ROOT_SINK";
 
-PlanRootSink::PlanRootSink(const RowDescriptor& row_desc,
-    const std::vector<TExpr>& output_exprs, const TDataSink& thrift_sink)
-  : DataSink(row_desc), thrift_output_exprs_(output_exprs) {}
-
-Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  RETURN_IF_ERROR(
-      Expr::CreateExprTrees(state->obj_pool(), thrift_output_exprs_, &output_expr_ctxs_));
-  RETURN_IF_ERROR(
-      Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
-
-  return Status::OK();
-}
-
-Status PlanRootSink::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
-  return Status::OK();
-}
+PlanRootSink::PlanRootSink(const RowDescriptor& row_desc)
+  : DataSink(row_desc) { }
 
 namespace {
 
@@ -100,7 +84,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
     DCHECK(results_ != nullptr);
     // List of expr values to hold evaluated rows from the query
     vector<void*> result_row;
-    result_row.resize(output_expr_ctxs_.size());
+    result_row.resize(output_exprs_.size());
 
     // List of scales for floating point values in result_row
     vector<int> scales;
@@ -116,7 +100,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
     }
     // Signal the consumer.
     results_ = nullptr;
-    ExprContext::FreeLocalAllocations(output_expr_ctxs_);
+    ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
     consumer_cv_.notify_all();
   }
   return Status::OK();
@@ -139,7 +123,6 @@ void PlanRootSink::Close(RuntimeState* state) {
   // Wait for consumer to be done, in case sender tries to tear-down this sink while the
   // sender is still reading from it.
   while (!consumer_done_) sender_cv_.wait(l);
-  Expr::Close(output_expr_ctxs_, state);
   DataSink::Close(state);
 }
 
@@ -165,10 +148,10 @@ Status PlanRootSink::GetNext(
 
 void PlanRootSink::GetRowValue(
     TupleRow* row, vector<void*>* result, vector<int>* scales) {
-  DCHECK(result->size() >= output_expr_ctxs_.size());
-  for (int i = 0; i < output_expr_ctxs_.size(); ++i) {
-    (*result)[i] = output_expr_ctxs_[i]->GetValue(row);
-    (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale();
+  DCHECK(result->size() >= output_expr_evals_.size());
+  for (int i = 0; i < output_expr_evals_.size(); ++i) {
+    (*result)[i] = output_expr_evals_[i]->GetValue(row);
+    (*scales)[i] = output_expr_evals_[i]->output_scale();
   }
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index a9c47c3b..c45d327 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -27,7 +27,7 @@ namespace impala {
 class TupleRow;
 class RowBatch;
 class QueryResultSet;
-class ExprContext;
+class ScalarExprEvaluator;
 
 /// Sink which manages the handoff between a 'sender' (a fragment instance) that produces
 /// batches by calling Send(), and a 'consumer' (e.g. the coordinator) which consumes rows
@@ -57,15 +57,10 @@ class ExprContext;
 /// and consumer. See IMPALA-4268.
 class PlanRootSink : public DataSink {
  public:
-  PlanRootSink(const RowDescriptor& row_desc, const std::vector<TExpr>& output_exprs,
-      const TDataSink& thrift_sink);
+  PlanRootSink(const RowDescriptor& row_desc);
 
   virtual std::string GetName() { return NAME; }
 
-  virtual Status Prepare(RuntimeState* state, MemTracker* tracker);
-
-  virtual Status Open(RuntimeState* state);
-
   /// Sends a new batch. Ownership of 'batch' remains with the sender. Blocks until the
   /// consumer has consumed 'batch' by calling GetNext().
   virtual Status Send(RuntimeState* state, RowBatch* batch);
@@ -125,12 +120,8 @@ class PlanRootSink : public DataSink {
   /// Set to true in Send() and FlushFinal() when the Sink() has finished producing rows.
   bool eos_ = false;
 
-  /// Output expressions to map plan row batches onto result set rows.
-  std::vector<TExpr> thrift_output_exprs_;
-  std::vector<ExprContext*> output_expr_ctxs_;
-
-  /// Writes a single row into 'result' and 'scales' by evaluating output_expr_ctxs_ over
-  /// 'row'.
+  /// Writes a single row into 'result' and 'scales' by evaluating
+  /// output_expr_evals_ over 'row'.
   void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);
 };
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/select-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index c8f8b1e..86aac1d 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -16,7 +16,8 @@
 // under the License.
 
 #include "exec/select-node.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/raw-value.h"
@@ -93,8 +94,9 @@ Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
 }
 
 bool SelectNode::CopyRows(RowBatch* output_batch) {
-  ExprContext** conjunct_ctxs = &conjunct_ctxs_[0];
-  int num_conjunct_ctxs = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  int num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
 
   while (child_row_idx_ < child_row_batch_->num_rows()) {
     // Add a new row to output_batch
@@ -104,7 +106,7 @@ bool SelectNode::CopyRows(RowBatch* output_batch) {
     // Make sure to increment row idx before returning.
     ++child_row_idx_;
 
-    if (EvalConjuncts(conjunct_ctxs, num_conjunct_ctxs, src_row)) {
+    if (EvalConjuncts(conjunct_evals, num_conjuncts, src_row)) {
       output_batch->CopyRow(src_row, dst_row);
       output_batch->CommitLastRow();
       ++num_rows_returned_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-exec-exprs.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-exec-exprs.cc b/be/src/exec/sort-exec-exprs.cc
deleted file mode 100644
index 3e3f60d..0000000
--- a/be/src/exec/sort-exec-exprs.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/sort-exec-exprs.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-Status SortExecExprs::Init(const TSortInfo& sort_info, ObjectPool* pool) {
-  return Init(sort_info.ordering_exprs,
-      sort_info.__isset.sort_tuple_slot_exprs ? &sort_info.sort_tuple_slot_exprs : NULL,
-      pool);
-}
-
-Status SortExecExprs::Init(const vector<TExpr>& ordering_exprs,
-    const vector<TExpr>* sort_tuple_slot_exprs, ObjectPool* pool) {
-  RETURN_IF_ERROR(
-      Expr::CreateExprTrees(pool, ordering_exprs, &lhs_ordering_expr_ctxs_));
-
-  if (sort_tuple_slot_exprs != NULL) {
-    materialize_tuple_ = true;
-    RETURN_IF_ERROR(Expr::CreateExprTrees(pool, *sort_tuple_slot_exprs,
-        &sort_tuple_slot_expr_ctxs_));
-  } else {
-    materialize_tuple_ = false;
-  }
-  return Status::OK();
-}
-
-Status SortExecExprs::Init(const vector<ExprContext*>& lhs_ordering_expr_ctxs,
-                           const vector<ExprContext*>& rhs_ordering_expr_ctxs) {
-  lhs_ordering_expr_ctxs_ = lhs_ordering_expr_ctxs;
-  rhs_ordering_expr_ctxs_ = rhs_ordering_expr_ctxs;
-  return Status::OK();
-}
-
-Status SortExecExprs::Prepare(RuntimeState* state, const RowDescriptor& child_row_desc,
-    const RowDescriptor& output_row_desc, MemTracker* expr_mem_tracker) {
-  if (materialize_tuple_) {
-    RETURN_IF_ERROR(Expr::Prepare(
-        sort_tuple_slot_expr_ctxs_, state, child_row_desc, expr_mem_tracker));
-  }
-  RETURN_IF_ERROR(Expr::Prepare(
-      lhs_ordering_expr_ctxs_, state, output_row_desc, expr_mem_tracker));
-  return Status::OK();
-}
-
-Status SortExecExprs::Open(RuntimeState* state) {
-  if (materialize_tuple_) {
-    RETURN_IF_ERROR(Expr::Open(sort_tuple_slot_expr_ctxs_, state));
-  }
-  RETURN_IF_ERROR(Expr::Open(lhs_ordering_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::CloneIfNotExists(
-      lhs_ordering_expr_ctxs_, state, &rhs_ordering_expr_ctxs_));
-  return Status::OK();
-}
-
-void SortExecExprs::Close(RuntimeState* state) {
-  if (materialize_tuple_) {
-    Expr::Close(sort_tuple_slot_expr_ctxs_, state);
-  }
-  Expr::Close(rhs_ordering_expr_ctxs_, state);
-  Expr::Close(lhs_ordering_expr_ctxs_, state);
-}
-
-} //namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-exec-exprs.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-exec-exprs.h b/be/src/exec/sort-exec-exprs.h
deleted file mode 100644
index 782aeeb..0000000
--- a/be/src/exec/sort-exec-exprs.h
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_SORT_EXEC_EXPRS_H
-#define IMPALA_EXEC_SORT_EXEC_EXPRS_H
-
-#include "exprs/expr.h"
-#include "runtime/runtime-state.h"
-
-namespace impala {
-
-/// Helper class to Prepare() , Open() and Close() the ordering expressions used to perform
-/// comparisons in a sort. Used by TopNNode, SortNode, and MergingExchangeNode.  When two
-/// rows are compared, the ordering expressions are evaluated once for each side.
-/// TopN and Sort materialize input rows into a single tuple before sorting.
-/// If materialize_tuple_ is true, SortExecExprs also stores the slot expressions used to
-/// materialize the sort tuples.
-class SortExecExprs {
- public:
-  /// Initialize the expressions from a TSortInfo using the specified pool.
-  Status Init(const TSortInfo& sort_info, ObjectPool* pool);
-
-  /// Initialize the ordering and (optionally) materialization expressions from the thrift
-  /// TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the tuple is not
-  /// materialized.
-  Status Init(const std::vector<TExpr>& ordering_exprs,
-    const std::vector<TExpr>* sort_tuple_slot_exprs, ObjectPool* pool);
-
-  /// Prepare all expressions used for sorting and tuple materialization.
-  Status Prepare(RuntimeState* state, const RowDescriptor& child_row_desc,
-    const RowDescriptor& output_row_desc, MemTracker* expr_mem_tracker);
-
-  /// Open all expressions used for sorting and tuple materialization.
-  Status Open(RuntimeState* state);
-
-  /// Close all expressions used for sorting and tuple materialization.
-  void Close(RuntimeState* state);
-
-  const std::vector<ExprContext*>& sort_tuple_slot_expr_ctxs() const {
-    return sort_tuple_slot_expr_ctxs_;
-  }
-
-  /// Populated in Prepare() (empty before then)
-  const std::vector<ExprContext*>& lhs_ordering_expr_ctxs() const {
-    return lhs_ordering_expr_ctxs_;
-  }
-  /// Populated in Open() (empty before then)
-  const std::vector<ExprContext*>& rhs_ordering_expr_ctxs() const {
-    return rhs_ordering_expr_ctxs_;
-  }
-
- private:
-  // Give access to testing Init()
-  friend class DataStreamTest;
-
-  /// Create two ExprContexts for evaluating over the TupleRows.
-  std::vector<ExprContext*> lhs_ordering_expr_ctxs_;
-  std::vector<ExprContext*> rhs_ordering_expr_ctxs_;
-
-  /// If true, the tuples to be sorted are materialized by
-  /// sort_tuple_slot_exprs_ before the actual sort is performed.
-  bool materialize_tuple_;
-
-  /// Expressions used to materialize slots in the tuples to be sorted.
-  /// One expr per slot in the materialized tuple. Valid only if
-  /// materialize_tuple_ is true.
-  std::vector<ExprContext*> sort_tuple_slot_expr_ctxs_;
-
-  /// Initialize directly from already-created ExprContexts. Callers should manually call
-  /// Prepare(), Open(), and Close() on input ExprContexts (instead of calling the
-  /// analogous functions in this class). Used for testing.
-  Status Init(const std::vector<ExprContext*>& lhs_ordering_expr_ctxs,
-              const std::vector<ExprContext*>& rhs_ordering_expr_ctxs);
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 9660ed3..6d93130 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -16,7 +16,6 @@
 // under the License.
 
 #include "exec/sort-node.h"
-#include "exec/sort-exec-exprs.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
@@ -37,8 +36,13 @@ SortNode::~SortNode() {
 }
 
 Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  const TSortInfo& tsort_info = tnode.sort_node.sort_info;
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.sort_node.sort_info, pool_));
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, row_descriptor_,
+      state, &ordering_exprs_));
+  DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
+      child(0)->row_desc(), state, &sort_tuple_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
   return Status::OK();
@@ -47,14 +51,10 @@ Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status SortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  RETURN_IF_ERROR(sort_exec_exprs_.Prepare(
-      state, child(0)->row_desc(), row_descriptor_, expr_mem_tracker()));
-  AddExprCtxsToFree(sort_exec_exprs_);
-  less_than_.reset(new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
-  sorter_.reset(
-      new Sorter(*less_than_.get(), sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
-          &row_descriptor_, mem_tracker(), runtime_profile(), state));
-  RETURN_IF_ERROR(sorter_->Prepare());
+  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+  sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_,
+      &row_descriptor_, mem_tracker(), runtime_profile(), state));
+  RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
   AddCodegenDisabledMessage(state);
   return Status::OK();
 }
@@ -63,7 +63,6 @@ void SortNode::Codegen(RuntimeState* state) {
   DCHECK(state->ShouldCodegen());
   ExecNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-
   Status codegen_status = less_than_->Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
 }
@@ -71,11 +70,11 @@ void SortNode::Codegen(RuntimeState* state) {
 Status SortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(sort_exec_exprs_.Open(state));
+  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
+  RETURN_IF_ERROR(sorter_->Open());
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
-  RETURN_IF_ERROR(sorter_->Open());
 
   // The child has been opened and the sorter created. Sort the input.
   // The final merge is done on-demand as rows are requested in GetNext().
@@ -135,16 +134,22 @@ Status SortNode::Reset(RuntimeState* state) {
 
 void SortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  sort_exec_exprs_.Close(state);
-  if (sorter_ != NULL) sorter_->Close();
+  if (less_than_.get() != nullptr) less_than_->Close(state);
+  if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(sort_tuple_exprs_);
   ExecNode::Close(state);
 }
 
+Status SortNode::QueryMaintenance(RuntimeState* state) {
+  sorter_->FreeLocalAllocations();
+  return ExecNode::QueryMaintenance(state);
+}
+
 void SortNode::DebugString(int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
-  *out << "SortNode("
-       << Expr::DebugString(sort_exec_exprs_.lhs_ordering_expr_ctxs());
+  *out << "SortNode(" << ScalarExpr::DebugString(ordering_exprs_);
   for (int i = 0; i < is_asc_order_.size(); ++i) {
     *out << (i > 0 ? " " : "")
          << (is_asc_order_[i] ? "asc" : "desc")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index 75513ba..cbe5b68 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -19,7 +19,6 @@
 #define IMPALA_EXEC_SORT_NODE_H
 
 #include "exec/exec-node.h"
-#include "exec/sort-exec-exprs.h"
 #include "runtime/sorter.h"
 #include "runtime/buffered-block-mgr.h"
 
@@ -29,7 +28,7 @@ namespace impala {
 /// to disk if the input is larger than available memory.
 /// Uses Sorter and BufferedBlockMgr for the external sort implementation.
 /// Input rows to SortNode are materialized by the Sorter into a single tuple
-/// using the expressions specified in sort_exec_exprs_.
+/// using the expressions specified in sort_tuple_exprs_.
 /// In GetNext(), SortNode passes in the output batch to the sorter instance created
 /// in Open() to fill it with sorted rows.
 /// If a merge phase was performed in the sort, sorted rows are deep copied into
@@ -48,6 +47,7 @@ class SortNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
+  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
@@ -61,7 +61,12 @@ class SortNode : public ExecNode {
   boost::scoped_ptr<TupleRowComparator> less_than_;
 
   /// Expressions and parameters used for tuple materialization and tuple comparison.
-  SortExecExprs sort_exec_exprs_;
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Expressions used to materialize slots in the tuples to be sorted.
+  /// One expr per slot in the materialized tuple.
+  std::vector<ScalarExpr*> sort_tuple_exprs_;
+
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/subplan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index 4c80125..98ef13a 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -39,22 +39,28 @@ SubplanNode::SubplanNode(ObjectPool* pool, const TPlanNode& tnode,
 Status SubplanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   DCHECK_EQ(children_.size(), 2);
-  SetContainingSubplan(this, child(1));
+  RETURN_IF_ERROR(SetContainingSubplan(state, this, child(1)));
   return Status::OK();
 }
 
-void SubplanNode::SetContainingSubplan(SubplanNode* ancestor, ExecNode* node) {
+Status SubplanNode::SetContainingSubplan(
+    RuntimeState* state, SubplanNode* ancestor, ExecNode* node) {
   node->set_containing_subplan(ancestor);
   if (node->type() == TPlanNodeType::SUBPLAN_NODE) {
     // Only traverse the first child and not the second one, because the Subplan
     // parent of nodes inside it should be 'node' and not 'ancestor'.
-    SetContainingSubplan(ancestor, node->child(0));
+    RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->child(0)));
   } else {
+    if (node->type() == TPlanNodeType::UNNEST_NODE) {
+      UnnestNode* unnest_node = reinterpret_cast<UnnestNode*>(node);
+      RETURN_IF_ERROR(unnest_node->InitCollExpr(state));
+    }
     int num_children = node->num_children();
     for (int i = 0; i < num_children; ++i) {
-      SetContainingSubplan(ancestor, node->child(i));
+      RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->child(i)));
     }
   }
+  return Status::OK();
 }
 
 Status SubplanNode::Prepare(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/subplan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/subplan-node.h b/be/src/exec/subplan-node.h
index 9aa4f2e..bf13ca1 100644
--- a/be/src/exec/subplan-node.h
+++ b/be/src/exec/subplan-node.h
@@ -19,7 +19,7 @@
 #define IMPALA_EXEC_SUBPLAN_NODE_H_
 
 #include "exec/exec-node.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 
 namespace impala {
 
@@ -62,9 +62,10 @@ class SubplanNode : public ExecNode {
   friend class UnnestNode;
 
   /// Sets 'ancestor' as the containing Subplan in all exec nodes inside the exec-node
-  /// tree rooted at 'node'.
-  /// Does not traverse the second child of SubplanNodes within 'node'
-  void SetContainingSubplan(SubplanNode* ancestor, ExecNode* node);
+  /// tree rooted at 'node' and does any initialization that is required as a result of
+  /// setting the subplan. Doesn't traverse the second child of SubplanNodes within
+  /// 'node'.
+  Status SetContainingSubplan(RuntimeState* state, SubplanNode* ancestor, ExecNode* node);
 
   /// Returns the current row from child(0) or NULL if no rows from child(0) have been
   /// retrieved yet (GetNext() has not yet been called). This function is called by

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/topn-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node-ir.cc b/be/src/exec/topn-node-ir.cc
index f0eaeaa..0acd724 100644
--- a/be/src/exec/topn-node-ir.cc
+++ b/be/src/exec/topn-node-ir.cc
@@ -31,18 +31,18 @@ void TopNNode::InsertTupleRow(TupleRow* input_row) {
 
   if (priority_queue_->size() < limit_ + offset_) {
     insert_tuple = reinterpret_cast<Tuple*>(
-        tuple_pool_->Allocate(materialized_tuple_desc_->byte_size()));
-    insert_tuple->MaterializeExprs<false, false>(input_row, *materialized_tuple_desc_,
-        sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), tuple_pool_.get());
+        tuple_pool_->Allocate(output_tuple_desc_->byte_size()));
+    insert_tuple->MaterializeExprs<false, false>(input_row, *output_tuple_desc_,
+        output_tuple_expr_evals_, tuple_pool_.get());
   } else {
     DCHECK(!priority_queue_->empty());
     Tuple* top_tuple = priority_queue_->top();
-    tmp_tuple_->MaterializeExprs<false, true>(input_row, *materialized_tuple_desc_,
-        sort_exec_exprs_.sort_tuple_slot_expr_ctxs(), nullptr);
+    tmp_tuple_->MaterializeExprs<false, true>(input_row, *output_tuple_desc_,
+        output_tuple_expr_evals_, nullptr);
     if (tuple_row_less_than_->Less(tmp_tuple_, top_tuple)) {
       // TODO: DeepCopy() will allocate new buffers for the string data. This needs
       // to be fixed to use a freelist
-      tmp_tuple_->DeepCopy(top_tuple, *materialized_tuple_desc_, tuple_pool_.get());
+      tmp_tuple_->DeepCopy(top_tuple, *output_tuple_desc_, tuple_pool_.get());
       insert_tuple = top_tuple;
       priority_queue_->pop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 54bfe8f..73a8108 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -20,7 +20,7 @@
 #include <sstream>
 
 #include "codegen/llvm-codegen.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
@@ -42,7 +42,7 @@ using namespace llvm;
 TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : ExecNode(pool, tnode, descs),
     offset_(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
-    materialized_tuple_desc_(NULL),
+    output_tuple_desc_(row_descriptor_.tuple_descriptors()[0]),
     tuple_row_less_than_(NULL),
     tmp_tuple_(NULL),
     tuple_pool_(NULL),
@@ -52,31 +52,34 @@ TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
 }
 
 Status TopNNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  const TSortInfo& tsort_info = tnode.sort_node.sort_info;
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  RETURN_IF_ERROR(sort_exec_exprs_.Init(tnode.sort_node.sort_info, pool_));
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, row_descriptor_,
+      state, &ordering_exprs_));
+  DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
+      child(0)->row_desc(), state, &output_tuple_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
-
-  DCHECK_EQ(conjunct_ctxs_.size(), 0)
+  DCHECK_EQ(conjuncts_.size(), 0)
       << "TopNNode should never have predicates to evaluate.";
-
   return Status::OK();
 }
 
 Status TopNNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  DCHECK(output_tuple_desc_ != nullptr);
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   tuple_pool_.reset(new MemPool(mem_tracker()));
-  materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
-  RETURN_IF_ERROR(sort_exec_exprs_.Prepare(
-      state, child(0)->row_desc(), row_descriptor_, expr_mem_tracker()));
-  AddExprCtxsToFree(sort_exec_exprs_);
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_,
+      expr_mem_pool(), &output_tuple_expr_evals_));
+  AddEvaluatorsToFree(output_tuple_expr_evals_);
   tuple_row_less_than_.reset(
-      new TupleRowComparator(sort_exec_exprs_, is_asc_order_, nulls_first_));
+      new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   priority_queue_.reset(
       new priority_queue<Tuple*, vector<Tuple*>, ComparatorWrapper<TupleRowComparator>>(
           *tuple_row_less_than_));
-  materialized_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
+  output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
   AddCodegenDisabledMessage(state);
   return Status::OK();
@@ -99,17 +102,17 @@ void TopNNode::Codegen(RuntimeState* state) {
 
     // Generate two MaterializeExprs() functions, one using tuple_pool_ and
     // one with no pool.
-    DCHECK(materialized_tuple_desc_ != NULL);
+    DCHECK(output_tuple_desc_ != NULL);
     Function* materialize_exprs_tuple_pool_fn;
     Function* materialize_exprs_no_pool_fn;
 
     codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
-        *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
+        *output_tuple_desc_, output_tuple_exprs_,
         true, &materialize_exprs_tuple_pool_fn);
 
     if (codegen_status.ok()) {
       codegen_status = Tuple::CodegenMaterializeExprs(codegen, false,
-          *materialized_tuple_desc_, sort_exec_exprs_.sort_tuple_slot_expr_ctxs(),
+          *output_tuple_desc_, output_tuple_exprs_,
           false, &materialize_exprs_no_pool_fn);
 
       if (codegen_status.ok()) {
@@ -134,13 +137,14 @@ void TopNNode::Codegen(RuntimeState* state) {
 Status TopNNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(tuple_row_less_than_->Open(pool_, state, expr_mem_pool()));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(output_tuple_expr_evals_, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
-  RETURN_IF_ERROR(sort_exec_exprs_.Open(state));
 
   // Allocate memory for a temporary tuple.
   tmp_tuple_ = reinterpret_cast<Tuple*>(
-      tuple_pool_->Allocate(materialized_tuple_desc_->byte_size()));
+      tuple_pool_->Allocate(output_tuple_desc_->byte_size()));
 
   RETURN_IF_ERROR(child(0)->Open(state));
 
@@ -213,11 +217,19 @@ Status TopNNode::Reset(RuntimeState* state) {
 
 void TopNNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll();
-  sort_exec_exprs_.Close(state);
+  if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
+  if (tuple_row_less_than_.get() != nullptr) tuple_row_less_than_->Close(state);
+  ScalarExprEvaluator::Close(output_tuple_expr_evals_, state);
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(output_tuple_exprs_);
   ExecNode::Close(state);
 }
 
+Status TopNNode::QueryMaintenance(RuntimeState* state) {
+  tuple_row_less_than_->FreeLocalAllocations();
+  return ExecNode::QueryMaintenance(state);
+}
+
 // Reverse the order of the tuples in the priority queue
 void TopNNode::PrepareForOutput() {
   sorted_top_n_.resize(priority_queue_->size());
@@ -236,7 +248,7 @@ void TopNNode::PrepareForOutput() {
 void TopNNode::DebugString(int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
   *out << "TopNNode("
-      << Expr::DebugString(sort_exec_exprs_.lhs_ordering_expr_ctxs());
+       << ScalarExpr::DebugString(ordering_exprs_);
   for (int i = 0; i < is_asc_order_.size(); ++i) {
     *out << (i > 0 ? " " : "")
          << (is_asc_order_[i] ? "asc" : "desc")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/topn-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index 5bd7ded..d7daacb 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -24,7 +24,6 @@
 
 #include "codegen/impala-ir.h"
 #include "exec/exec-node.h"
-#include "exec/sort-exec-exprs.h"
 #include "runtime/descriptors.h"  // for TupleId
 #include "util/tuple-row-compare.h"
 
@@ -52,6 +51,7 @@ class TopNNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
+  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
@@ -71,14 +71,18 @@ class TopNNode : public ExecNode {
   /// Number of rows to skip.
   int64_t offset_;
 
-  /// sort_exec_exprs_ contains the ordering expressions used for tuple comparison and
-  /// the materialization exprs for the output tuple.
-  SortExecExprs sort_exec_exprs_;
+  /// Ordering expressions used for tuple comparison.
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Materialization exprs for the output tuple and their evaluators.
+  std::vector<ScalarExpr*> output_tuple_exprs_;
+  std::vector<ScalarExprEvaluator*> output_tuple_expr_evals_;
+
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 
   /// Cached descriptor for the materialized tuple. Assigned in Prepare().
-  TupleDescriptor* materialized_tuple_desc_;
+  TupleDescriptor* output_tuple_desc_;
 
   /// Comparator for priority_queue_.
   boost::scoped_ptr<TupleRowComparator> tuple_row_less_than_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/union-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node-ir.cc b/be/src/exec/union-node-ir.cc
index 3d19f07..21bc205 100644
--- a/be/src/exec/union-node-ir.cc
+++ b/be/src/exec/union-node-ir.cc
@@ -20,12 +20,13 @@
 
 using namespace impala;
 
-void IR_ALWAYS_INLINE UnionNode::MaterializeExprs(const std::vector<ExprContext*>& exprs,
-    TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch) {
+void IR_ALWAYS_INLINE UnionNode::MaterializeExprs(
+    const vector<ScalarExprEvaluator*>& evals, TupleRow* row, uint8_t* tuple_buf,
+    RowBatch* dst_batch) {
   DCHECK(!dst_batch->AtCapacity());
   Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
   TupleRow* dst_row = dst_batch->GetRow(dst_batch->AddRow());
-  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, exprs,
+  dst_tuple->MaterializeExprs<false, false>(row, *tuple_desc_, evals,
       dst_batch->tuple_data_pool());
   dst_row->SetTuple(0, dst_tuple);
   dst_batch->CommitLastRow();
@@ -37,13 +38,14 @@ void UnionNode::MaterializeBatch(RowBatch* dst_batch, uint8_t** tuple_buf) {
   RowBatch* child_batch = child_batch_.get();
   int tuple_byte_size = tuple_desc_->byte_size();
   uint8_t* cur_tuple = *tuple_buf;
-  const std::vector<ExprContext*>& child_exprs = child_expr_lists_[child_idx_];
+  const std::vector<ScalarExprEvaluator*>& child_expr_evals =
+      child_expr_evals_lists_[child_idx_];
 
   int num_rows_to_process = std::min(child_batch->num_rows() - child_row_idx_,
       dst_batch->capacity() - dst_batch->num_rows());
   FOREACH_ROW_LIMIT(child_batch, child_row_idx_, num_rows_to_process, batch_iter) {
     TupleRow* child_row = batch_iter.Get();
-    MaterializeExprs(child_exprs, child_row, cur_tuple, dst_batch);
+    MaterializeExprs(child_expr_evals, child_row, cur_tuple, dst_batch);
     cur_tuple += tuple_byte_size;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index e1912c9..d8707da 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -17,8 +17,8 @@
 
 #include "codegen/llvm-codegen.h"
 #include "exec/union-node.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple.h"
@@ -35,32 +35,37 @@ UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : ExecNode(pool, tnode, descs),
       tuple_id_(tnode.union_node.tuple_id),
-      tuple_desc_(nullptr),
+      tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
       first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx),
       child_idx_(0),
       child_batch_(nullptr),
       child_row_idx_(0),
       child_eos_(false),
-      const_expr_list_idx_(0),
+      const_exprs_lists_idx_(0),
       to_close_child_idx_(-1) { }
 
 Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   DCHECK(tnode.__isset.union_node);
-  DCHECK_EQ(conjunct_ctxs_.size(), 0);
-  // Create const_expr_ctx_lists_ from thrift exprs.
+  DCHECK_EQ(conjuncts_.size(), 0);
+  DCHECK(tuple_desc_ != nullptr);
+  // Create const_exprs_lists_ from thrift exprs.
   const vector<vector<TExpr>>& const_texpr_lists = tnode.union_node.const_expr_lists;
   for (const vector<TExpr>& texprs : const_texpr_lists) {
-    vector<ExprContext*> ctxs;
-    RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs));
-    const_expr_lists_.push_back(ctxs);
+    vector<ScalarExpr*> const_exprs;
+    RETURN_IF_ERROR(ScalarExpr::Create(texprs, row_desc(), state, &const_exprs));
+    DCHECK_EQ(const_exprs.size(), tuple_desc_->slots().size());
+    const_exprs_lists_.push_back(const_exprs);
   }
-  // Create result_expr_ctx_lists_ from thrift exprs.
-  const vector<vector<TExpr>>& result_texpr_lists = tnode.union_node.result_expr_lists;
-  for (const vector<TExpr>& texprs : result_texpr_lists) {
-    vector<ExprContext*> ctxs;
-    RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, texprs, &ctxs));
-    child_expr_lists_.push_back(ctxs);
+  // Create child_exprs_lists_ from thrift exprs.
+  const vector<vector<TExpr>>& thrift_result_exprs = tnode.union_node.result_expr_lists;
+  for (int i = 0; i < thrift_result_exprs.size(); ++i) {
+    const vector<TExpr>& texprs = thrift_result_exprs[i];
+    vector<ScalarExpr*> child_exprs;
+    RETURN_IF_ERROR(
+        ScalarExpr::Create(texprs, child(i)->row_desc(), state, &child_exprs));
+    child_exprs_lists_.push_back(child_exprs);
+    DCHECK_EQ(child_exprs.size(), tuple_desc_->slots().size());
   }
   return Status::OK();
 }
@@ -68,23 +73,25 @@ Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status UnionNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
   DCHECK(tuple_desc_ != nullptr);
-  codegend_union_materialize_batch_fns_.resize(child_expr_lists_.size());
+  codegend_union_materialize_batch_fns_.resize(child_exprs_lists_.size());
 
   // Prepare const expr lists.
-  for (const vector<ExprContext*>& exprs : const_expr_lists_) {
-    RETURN_IF_ERROR(Expr::Prepare(exprs, state, row_desc(), expr_mem_tracker()));
-    AddExprCtxsToFree(exprs);
-    DCHECK_EQ(exprs.size(), tuple_desc_->slots().size());
+  for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
+    vector<ScalarExprEvaluator*> const_expr_evals;
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(const_exprs, state, pool_,
+        expr_mem_pool(), &const_expr_evals));
+    AddEvaluatorsToFree(const_expr_evals);
+    const_expr_evals_lists_.push_back(const_expr_evals);
   }
 
   // Prepare result expr lists.
-  for (int i = 0; i < child_expr_lists_.size(); ++i) {
-    RETURN_IF_ERROR(Expr::Prepare(
-        child_expr_lists_[i], state, child(i)->row_desc(), expr_mem_tracker()));
-    AddExprCtxsToFree(child_expr_lists_[i]);
-    DCHECK_EQ(child_expr_lists_[i].size(), tuple_desc_->slots().size());
+  for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
+    vector<ScalarExprEvaluator*> child_expr_evals;
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(child_exprs, state, pool_,
+        expr_mem_pool(), &child_expr_evals));
+    AddEvaluatorsToFree(child_expr_evals);
+    child_expr_evals_lists_.push_back(child_expr_evals);
   }
   return Status::OK();
 }
@@ -98,12 +105,12 @@ void UnionNode::Codegen(RuntimeState* state) {
   DCHECK(codegen != nullptr);
   std::stringstream codegen_message;
   Status codegen_status;
-  for (int i = 0; i < child_expr_lists_.size(); ++i) {
+  for (int i = 0; i < child_exprs_lists_.size(); ++i) {
     if (IsChildPassthrough(i)) continue;
 
     llvm::Function* tuple_materialize_exprs_fn;
     codegen_status = Tuple::CodegenMaterializeExprs(codegen, false, *tuple_desc_,
-        child_expr_lists_[i], true, &tuple_materialize_exprs_fn);
+        child_exprs_lists_[i], true, &tuple_materialize_exprs_fn);
     if (!codegen_status.ok()) {
       // Codegen may fail in some corner cases (e.g. we don't handle TYPE_CHAR). If this
       // happens, abort codegen for this and the remaining children.
@@ -137,12 +144,12 @@ Status UnionNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
   // Open const expr lists.
-  for (const vector<ExprContext*>& exprs : const_expr_lists_) {
-    RETURN_IF_ERROR(Expr::Open(exprs, state));
+  for (const vector<ScalarExprEvaluator*>& evals : const_expr_evals_lists_) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state));
   }
   // Open result expr lists.
-  for (const vector<ExprContext*>& exprs : child_expr_lists_) {
-    RETURN_IF_ERROR(Expr::Open(exprs, state));
+  for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Open(evals, state));
   }
 
   // Ensures that rows are available for clients to fetch after this Open() has
@@ -247,7 +254,7 @@ Status UnionNode::GetNextMaterialized(RuntimeState* state, RowBatch* row_batch)
 
 Status UnionNode::GetNextConst(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(state->instance_ctx().per_fragment_instance_idx == 0 || IsInSubplan());
-  DCHECK_LT(const_expr_list_idx_, const_expr_lists_.size());
+  DCHECK_LT(const_exprs_lists_idx_, const_expr_evals_lists_.size());
   // Create new tuple buffer for row_batch.
   int64_t tuple_buf_size;
   uint8_t* tuple_buf;
@@ -255,11 +262,11 @@ Status UnionNode::GetNextConst(RuntimeState* state, RowBatch* row_batch) {
       row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buf_size, &tuple_buf));
   memset(tuple_buf, 0, tuple_buf_size);
 
-  while (const_expr_list_idx_ < const_expr_lists_.size() && !row_batch->AtCapacity()) {
+  while (const_exprs_lists_idx_ < const_exprs_lists_.size() && !row_batch->AtCapacity()) {
     MaterializeExprs(
-        const_expr_lists_[const_expr_list_idx_], nullptr, tuple_buf, row_batch);
+        const_expr_evals_lists_[const_exprs_lists_idx_], nullptr, tuple_buf, row_batch);
     tuple_buf += tuple_desc_->byte_size();
-    ++const_expr_list_idx_;
+    ++const_exprs_lists_idx_;
   }
 
   return Status::OK();
@@ -314,7 +321,7 @@ Status UnionNode::Reset(RuntimeState* state) {
   child_batch_.reset();
   child_row_idx_ = 0;
   child_eos_ = false;
-  const_expr_list_idx_ = 0;
+  const_exprs_lists_idx_ = 0;
   // Since passthrough is disabled in subplans, verify that there is no passthrough child
   // that needs to be closed.
   DCHECK_EQ(to_close_child_idx_, -1);
@@ -324,11 +331,17 @@ Status UnionNode::Reset(RuntimeState* state) {
 void UnionNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   child_batch_.reset();
-  for (const vector<ExprContext*>& exprs : const_expr_lists_) {
-    Expr::Close(exprs, state);
+  for (const vector<ScalarExprEvaluator*>& evals : const_expr_evals_lists_) {
+    ScalarExprEvaluator::Close(evals, state);
+  }
+  for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) {
+    ScalarExprEvaluator::Close(evals, state);
+  }
+  for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
+    ScalarExpr::Close(const_exprs);
   }
-  for (const vector<ExprContext*>& exprs : child_expr_lists_) {
-    Expr::Close(exprs, state);
+  for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
+    ScalarExpr::Close(child_exprs);
   }
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index 79fdfba..9c83276 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -29,7 +29,9 @@
 namespace impala {
 
 class DescriptorTbl;
-class ExprContext;
+class RuntimeState;
+class ScalarExpr;
+class ScalarExprEvaluator;
 class Tuple;
 class TupleRow;
 class TPlanNode;
@@ -67,10 +69,12 @@ class UnionNode : public ExecNode {
 
   /// Const exprs materialized by this node. These exprs don't refer to any children.
   /// Only materialized by the first fragment instance to avoid duplication.
-  std::vector<std::vector<ExprContext*>> const_expr_lists_;
+  std::vector<std::vector<ScalarExpr*>> const_exprs_lists_;
+  std::vector<std::vector<ScalarExprEvaluator*>> const_expr_evals_lists_;
 
   /// Exprs materialized by this node. The i-th result expr list refers to the i-th child.
-  std::vector<std::vector<ExprContext*>> child_expr_lists_;
+  std::vector<std::vector<ScalarExpr*>> child_exprs_lists_;
+  std::vector<std::vector<ScalarExprEvaluator*>> child_expr_evals_lists_;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
@@ -96,7 +100,7 @@ class UnionNode : public ExecNode {
   bool child_eos_;
 
   /// Index of current const result expr list.
-  int const_expr_list_idx_;
+  int const_exprs_lists_idx_;
 
   /// Index of the child that needs to be closed on the next GetNext() call. Should be set
   /// to -1 if no child needs to be closed.
@@ -126,7 +130,7 @@ class UnionNode : public ExecNode {
 
   /// Evaluates 'exprs' over 'row', materializes the results in 'tuple_buf'.
   /// and appends the new tuple to 'dst_batch'. Increments 'num_rows_returned_'.
-  void MaterializeExprs(const std::vector<ExprContext*>& exprs,
+  void MaterializeExprs(const std::vector<ScalarExprEvaluator*>& evaluators,
       TupleRow* row, uint8_t* tuple_buf, RowBatch* dst_batch);
 
   /// Returns true if the child at 'child_idx' can be passed through.
@@ -150,7 +154,7 @@ class UnionNode : public ExecNode {
   /// Returns true if there are still rows to be returned from constant expressions.
   bool HasMoreConst(const RuntimeState* state) const {
     return (state->instance_ctx().per_fragment_instance_idx == 0 || IsInSubplan()) &&
-        const_expr_list_idx_ < const_expr_lists_.size();
+        const_exprs_lists_idx_ < const_exprs_lists_.size();
   }
 
 };


Mime
View raw message