impala-commits mailing list archives

From mjac...@apache.org
Subject [3/3] incubator-impala git commit: IMPALA-5498: Support for partial sorts in Kudu INSERTs
Date Sat, 22 Jul 2017 01:19:35 GMT
IMPALA-5498: Support for partial sorts in Kudu INSERTs

Impala currently supports total sorts (the entire set of data
is sorted) and top-n sorts (only the highest/lowest n elements
are sorted). This patch adds the ability to do partial sorts,
where the data is divided up into some number of subsets, each
of which is sorted individually.

It accomplishes this by adding a new exec node, PartialSortNode.
When PartialSortNode::GetNext() is called, it retrieves input
up to the query memory limit, uses the existing Sorter class to sort
it, and outputs it. This is faster than a total sort with SortNode
as it avoids the need to spill if the input is larger than the
memory limit.
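
As a rough illustration of the idea (not the code in this patch), a partial
sort can be sketched as follows. The function name PartialSort and the
run_capacity parameter are hypothetical; run_capacity stands in for "as many
rows as fit under the memory limit". Each run is sorted on its own and emitted
without any merge step, so the output is ordered only within a run.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a partial sort: the input is cut into runs of at
// most 'run_capacity' rows (a proxy for the memory limit) and each run is
// sorted individually. Runs are never merged, so nothing needs to spill.
std::vector<int> PartialSort(const std::vector<int>& input, std::size_t run_capacity) {
  assert(run_capacity > 0);
  std::vector<int> output;
  output.reserve(input.size());
  for (std::size_t start = 0; start < input.size(); start += run_capacity) {
    const std::size_t end = std::min(start + run_capacity, input.size());
    // Copy one run's worth of rows, sort it, and append it to the output.
    std::vector<int> run(input.begin() + start, input.begin() + end);
    std::sort(run.begin(), run.end());
    output.insert(output.end(), run.begin(), run.end());
  }
  return output;
}
```

With a capacity of 3, the input {5, 3, 1, 4, 2, 0, 9} yields three runs,
{1, 3, 5}, {0, 2, 4} and {9}, concatenated in arrival order.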

Future work will look into setting a more restrictive memory limit
on the PartialSortNode. (IMPALA-5669)

In the planner, the SortNode plan node is used, with an enum value
indicating whether it is a total, top-n, or partial sort.
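
The mapping from the enum value to a backend node can be pictured with the
small sketch below. SortType and BackendNodeFor are hypothetical names used
only for illustration; the node names returned are the ones mentioned in this
patch.

```cpp
#include <cassert>
#include <string>

// Hypothetical mirror of the three sort types carried by the plan node.
enum class SortType { TOTAL, TOPN, PARTIAL };

// Returns the name of the backend exec node that would handle 'type'.
std::string BackendNodeFor(SortType type) {
  switch (type) {
    case SortType::PARTIAL:
      return "PartialSortNode";
    case SortType::TOPN:
      return "TopNNode";
    case SortType::TOTAL:
      return "SortNode";
  }
  assert(false && "unknown sort type");
  return "";
}
```

Using a single enum rather than a boolean keeps the three cases mutually
exclusive and leaves room for further sort types.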

This also adds a new counter 'NumRowsPerRun' to the runtime profile which
tracks the min, max, and avg size of the generated runs, in tuples.
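
For readers unfamiliar with summary counters, the following is a minimal
sketch of what tracking min, max, and avg run sizes involves. RunSizeStats is
a hypothetical class written for this note; it is not Impala's
SummaryStatsCounter and makes no claim about how that class is implemented.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical, simplified summary counter for run sizes (in tuples).
class RunSizeStats {
 public:
  // Records the size of one newly generated run.
  void Update(int64_t num_tuples) {
    assert(num_tuples >= 0);
    min_ = std::min(min_, num_tuples);
    max_ = std::max(max_, num_tuples);
    total_ += num_tuples;
    ++count_;
  }

  // All accessors report 0 until at least one run has been recorded.
  int64_t min() const { return count_ == 0 ? 0 : min_; }
  int64_t max() const { return count_ == 0 ? 0 : max_; }
  double avg() const {
    return count_ == 0 ? 0.0 : static_cast<double>(total_) / count_;
  }

 private:
  int64_t min_ = std::numeric_limits<int64_t>::max();
  int64_t max_ = std::numeric_limits<int64_t>::min();
  int64_t total_ = 0;
  int64_t count_ = 0;
};
```

Calling Update() once per sorted run mirrors the point in the sorter where a
run is finalized and its tuple count is known.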

As a first use case, partial sort is used where a total sort was
used previously for inserts/upserts into Kudu tables only. Future
work can extend this to other table sinks. (IMPALA-5649)

Testing:
- E2E test with a large INSERT into a Kudu table with a mem limit.
  Checks that no spills occurred.
- Updated planner tests.
- Existing E2E tests and stress test verify correctness of INSERT.
- Perf tests on the 10 node cluster: inserting tpch_100.lineitem
  into a Kudu table with mem_limit=3gb:
  Previously: 5 runs are spilled, sort took 7m33s
  Now: no spills, sort takes 6m19s, for ~18% speedup

Change-Id: Ieec2a15a0cc5240b1c13682067ab64670d1e0a38
Reviewed-on: http://gerrit.cloudera.org:8080/7267
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ad0c6e74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ad0c6e74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ad0c6e74

Branch: refs/heads/master
Commit: ad0c6e7499534d70d5b7de8e38199a9c5cfcbb48
Parents: 399b184
Author: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Authored: Thu Jun 22 12:26:48 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Sat Jul 22 00:28:36 2017 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/exec-node.cc                        |   6 +-
 be/src/exec/partial-sort-node.cc                | 172 +++++++++++++++++++
 be/src/exec/partial-sort-node.h                 | 100 +++++++++++
 be/src/exec/sort-node.h                         |   6 +-
 be/src/runtime/sorter.cc                        |  33 ++--
 be/src/runtime/sorter.h                         |  35 +++-
 be/src/util/runtime-profile-counters.h          |   2 +
 common/thrift/PlanNodes.thrift                  |  17 +-
 .../apache/impala/planner/AnalyticPlanner.java  |   3 +-
 .../java/org/apache/impala/planner/Planner.java |  16 +-
 .../impala/planner/SingleNodePlanner.java       |   9 +-
 .../org/apache/impala/planner/SortNode.java     |  97 ++++++++---
 .../queries/PlannerTest/kudu-upsert.test        |  16 +-
 .../queries/PlannerTest/kudu.test               |  12 +-
 .../queries/QueryTest/kudu_insert.test          |   4 +-
 16 files changed, 459 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 6b33753..7d86f1c 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -73,6 +73,7 @@ add_library(Exec
   parquet-column-readers.cc
   parquet-column-stats.cc
   parquet-metadata-utils.cc
+  partial-sort-node.cc
   partitioned-aggregation-node.cc
   partitioned-aggregation-node-ir.cc
   partitioned-hash-join-builder.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 5618fef..7954660 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -42,6 +42,7 @@
 #include "exec/kudu-scan-node-mt.h"
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
+#include "exec/partial-sort-node.h"
 #include "exec/partitioned-aggregation-node.h"
 #include "exec/partitioned-hash-join-node.h"
 #include "exec/select-node.h"
@@ -330,9 +331,12 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       *node = pool->Add(new SelectNode(pool, tnode, descs));
       break;
     case TPlanNodeType::SORT_NODE:
-      if (tnode.sort_node.use_top_n) {
+      if (tnode.sort_node.type == TSortType::PARTIAL) {
+        *node = pool->Add(new PartialSortNode(pool, tnode, descs));
+      } else if (tnode.sort_node.type == TSortType::TOPN) {
         *node = pool->Add(new TopNNode(pool, tnode, descs));
       } else {
+        DCHECK(tnode.sort_node.type == TSortType::TOTAL);
         *node = pool->Add(new SortNode(pool, tnode, descs));
       }
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
new file mode 100644
index 0000000..4f485d5
--- /dev/null
+++ b/be/src/exec/partial-sort-node.cc
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partial-sort-node.h"
+
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/sorted-run-merger.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+PartialSortNode::PartialSortNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+  : ExecNode(pool, tnode, descs),
+    sorter_(nullptr),
+    input_batch_index_(0),
+    input_eos_(false),
+    sorter_eos_(true) {}
+
+PartialSortNode::~PartialSortNode() {
+  DCHECK(input_batch_.get() == nullptr);
+}
+
+Status PartialSortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  DCHECK(!tnode.sort_node.__isset.offset || tnode.sort_node.offset == 0);
+  DCHECK(limit_ == -1);
+  const TSortInfo& tsort_info = tnode.sort_node.sort_info;
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  RETURN_IF_ERROR(ScalarExpr::Create(
+      tsort_info.ordering_exprs, row_descriptor_, state, &ordering_exprs_));
+  DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
+      *child(0)->row_desc(), state, &sort_tuple_exprs_));
+  is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
+  nulls_first_ = tnode.sort_node.sort_info.nulls_first;
+  return Status::OK();
+}
+
+Status PartialSortNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+  sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
+      mem_tracker(), runtime_profile(), state, false));
+  RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  AddCodegenDisabledMessage(state);
+  input_batch_.reset(
+      new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
+  return Status::OK();
+}
+
+void PartialSortNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;
+  Status codegen_status = less_than_->Codegen(state);
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+}
+
+Status PartialSortNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  RETURN_IF_ERROR(child(0)->Open(state));
+  return Status::OK();
+}
+
+Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  DCHECK_EQ(row_batch->num_rows(), 0);
+  if (!sorter_eos_) {
+    // There were rows in the current run that didn't fit in the last output batch.
+    RETURN_IF_ERROR(sorter_->GetNext(row_batch, &sorter_eos_));
+    if (sorter_eos_) {
+      sorter_->Reset();
+      *eos = input_eos_;
+    }
+    num_rows_returned_ += row_batch->num_rows();
+    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    return Status::OK();
+  }
+
+  if (input_eos_) {
+    *eos = true;
+    return Status::OK();
+  }
+
+  DCHECK(sorter_eos_);
+  RETURN_IF_ERROR(sorter_->Open());
+  do {
+    if (input_batch_index_ == input_batch_->num_rows()) {
+      input_batch_->Reset();
+      input_batch_index_ = 0;
+      RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(), &input_eos_));
+    }
+
+    int num_processed;
+    RETURN_IF_ERROR(
+        sorter_->AddBatchNoSpill(input_batch_.get(), input_batch_index_, &num_processed));
+    input_batch_index_ += num_processed;
+    DCHECK(input_batch_index_ <= input_batch_->num_rows());
+    RETURN_IF_ERROR(QueryMaintenance(state));
+  } while (input_batch_index_ == input_batch_->num_rows() && !input_eos_);
+
+  RETURN_IF_ERROR(sorter_->InputDone());
+  RETURN_IF_ERROR(sorter_->GetNext(row_batch, &sorter_eos_));
+  if (sorter_eos_) {
+    sorter_->Reset();
+    *eos = input_eos_;
+  }
+
+  num_rows_returned_ += row_batch->num_rows();
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status PartialSortNode::Reset(RuntimeState* state) {
+  DCHECK(false) << "PartialSortNode cannot be part of a subplan.";
+  return ExecNode::Reset(state);
+}
+
+void PartialSortNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  child(0)->Close(state);
+  if (less_than_.get() != nullptr) less_than_->Close(state);
+  if (sorter_ != nullptr) sorter_->Close(state);
+  sorter_.reset();
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(sort_tuple_exprs_);
+  input_batch_.reset();
+  ExecNode::Close(state);
+}
+
+Status PartialSortNode::QueryMaintenance(RuntimeState* state) {
+  sorter_->FreeLocalAllocations();
+  return ExecNode::QueryMaintenance(state);
+}
+
+void PartialSortNode::DebugString(int indentation_level, stringstream* out) const {
+  *out << string(indentation_level * 2, ' ');
+  *out << "PartialSortNode(" << ScalarExpr::DebugString(ordering_exprs_);
+  for (int i = 0; i < is_asc_order_.size(); ++i) {
+    *out << (i > 0 ? " " : "") << (is_asc_order_[i] ? "asc" : "desc") << " nulls "
+         << (nulls_first_[i] ? "first" : "last");
+  }
+  ExecNode::DebugString(indentation_level, out);
+  *out << ")";
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
new file mode 100644
index 0000000..ab4c547
--- /dev/null
+++ b/be/src/exec/partial-sort-node.h
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARTIAL_SORT_NODE_H
+#define IMPALA_EXEC_PARTIAL_SORT_NODE_H
+
+#include "exec/exec-node.h"
+#include "runtime/buffered-block-mgr.h"
+#include "runtime/sorter.h"
+
+namespace impala {
+
+/// Node that implements a partial sort, where its input is divided up into runs, each
+/// of which is sorted individually.
+///
+/// In GetNext(), PartialSortNode accepts rows up to its memory limit and sorts them,
+/// creating a single sorted run. It then outputs as many rows as fit in the output batch.
+/// Subsequent calls to GetNext() continue to output rows from the sorted run until it is
+/// exhausted, at which point the next call to GetNext() will again accept rows to create
+/// another run. This means that PartialSortNode never spills to disk.
+///
+/// Uses Sorter and BufferedBlockMgr for the external sort implementation. The sorter
+/// instance owns the sorted data.
+///
+/// Input rows to PartialSortNode may consist of several tuples. The Sorter materializes
+/// them into a single tuple using the expressions specified in sort_tuple_exprs_. This
+/// single tuple is then what the sort operates on.
+///
+/// PartialSortNode does not support limits or offsets.
+class PartialSortNode : public ExecNode {
+ public:
+  PartialSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~PartialSortNode();
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
+  virtual Status Open(RuntimeState* state);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual Status Reset(RuntimeState* state);
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  virtual Status QueryMaintenance(RuntimeState* state);
+  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+
+ private:
+  /// Compares tuples according to 'ordering_exprs'.
+  boost::scoped_ptr<TupleRowComparator> less_than_;
+
+  /// Expressions and parameters used for tuple comparison.
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Expressions used to materialize slots in the tuples to be sorted.
+  /// One expr per slot in the materialized tuple.
+  std::vector<ScalarExpr*> sort_tuple_exprs_;
+
+  std::vector<bool> is_asc_order_;
+  std::vector<bool> nulls_first_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Object used for external sorting.
+  boost::scoped_ptr<Sorter> sorter_;
+
+  /// The current batch of rows retrieved from the input (the output of child(0)). This
+  /// allows us to store rows across calls to GetNext when the sorter run fills up.
+  std::unique_ptr<RowBatch> input_batch_;
+
+  /// The index in 'input_batch_' of the next row to be passed to the sorter.
+  int input_batch_index_;
+
+  /// True if the end of the input (the output of child(0)) has been reached.
+  bool input_eos_;
+
+  /// True if the current run in the sorter has been fully output. This node is done when
+  /// both 'sorter_eos_' and 'input_eos_' are true.
+  bool sorter_eos_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index cbe5b68..8b3de11 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -52,15 +52,15 @@ class SortNode : public ExecNode {
 
  private:
   /// Fetch input rows and feed them to the sorter until the input is exhausted.
-  Status SortInput(RuntimeState* state);
+  Status SortInput(RuntimeState* state) WARN_UNUSED_RESULT;
 
   /// Number of rows to skip.
   int64_t offset_;
 
-  /// The tuple row comparator derived based on 'sort_exec_exprs_'.
+  /// Compares tuples according to 'ordering_exprs'.
   boost::scoped_ptr<TupleRowComparator> less_than_;
 
-  /// Expressions and parameters used for tuple materialization and tuple comparison.
+  /// Expressions and parameters used for tuple comparison.
   std::vector<ScalarExpr*> ordering_exprs_;
 
   /// Expressions used to materialize slots in the tuples to be sorted.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 6760373..b4ef279 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1339,9 +1339,9 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tup
 }
 
 Sorter::Sorter(const TupleRowComparator& compare_less_than,
-    const vector<ScalarExpr*>& sort_tuple_exprs,
-    RowDescriptor* output_row_desc, MemTracker* mem_tracker,
-    RuntimeProfile* profile, RuntimeState* state)
+    const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
+    MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
+    bool enable_spilling)
   : state_(state),
     compare_less_than_(compare_less_than),
     in_mem_tuple_sorter_(NULL),
@@ -1351,14 +1351,15 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than,
     sort_tuple_exprs_(sort_tuple_exprs),
     mem_tracker_(mem_tracker),
     output_row_desc_(output_row_desc),
+    enable_spilling_(enable_spilling),
     unsorted_run_(NULL),
     merge_output_run_(NULL),
     profile_(profile),
     initial_runs_counter_(NULL),
     num_merges_counter_(NULL),
     in_mem_sort_timer_(NULL),
-    sorted_data_size_(NULL) {
-}
+    sorted_data_size_(NULL),
+    run_sizes_(NULL) {}
 
 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
@@ -1379,12 +1380,15 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
   num_merges_counter_ = ADD_COUNTER(profile_, "TotalMergesPerformed", TUnit::UNIT);
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
+  run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT);
 
+  // If spilling is enabled, we need enough buffers to perform merges. Otherwise, there
+  // won't be any merges and we only need 1 buffer.
   // Must be kept in sync with SortNode.computeResourceProfile() in fe.
-  int min_buffers_required = MIN_BUFFERS_PER_MERGE;
-  // Fixed and var-length blocks are separate, so we need MIN_BUFFERS_PER_MERGE
-  // blocks for both if there is var-length data.
-  if (has_var_len_slots_) min_buffers_required *= 2;
+  int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
+  // Fixed and var-length blocks are separate, so we need twice as many blocks for both if
+  // there is var-length data.
+  if (sort_tuple_desc->HasVarlenSlots()) min_buffers_required *= 2;
 
   RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
       min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_));
@@ -1412,10 +1416,11 @@ void Sorter::FreeLocalAllocations() {
 Status Sorter::AddBatch(RowBatch* batch) {
   DCHECK(unsorted_run_ != NULL);
   DCHECK(batch != NULL);
+  DCHECK(enable_spilling_);
   int num_processed = 0;
   int cur_batch_index = 0;
   while (cur_batch_index < batch->num_rows()) {
-    RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, cur_batch_index, &num_processed));
+    RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed));
 
     cur_batch_index += num_processed;
     if (cur_batch_index < batch->num_rows()) {
@@ -1430,6 +1435,12 @@ Status Sorter::AddBatch(RowBatch* batch) {
   return Status::OK();
 }
 
+Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* num_processed) {
+  DCHECK(batch != nullptr);
+  RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed));
+  return Status::OK();
+}
+
 Status Sorter::InputDone() {
   // Sort the tuples in the last run.
   RETURN_IF_ERROR(SortCurrentInputRun());
@@ -1443,6 +1454,7 @@ Status Sorter::InputDone() {
     DCHECK(success) << "Should always be able to prepare pinned run for read.";
     return Status::OK();
   }
+  DCHECK(enable_spilling_);
 
   // Unpin the final run to free up memory for the merge.
   // TODO: we could keep it in memory in some circumstances as an optimisation, once
@@ -1498,6 +1510,7 @@ Status Sorter::SortCurrentInputRun() {
   }
   sorted_runs_.push_back(unsorted_run_);
   sorted_data_size_->Add(unsorted_run_->TotalBytes());
+  run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
   unsorted_run_ = NULL;
 
   RETURN_IF_CANCELLED(state_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index da3c6ef..80c5558 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -38,15 +38,19 @@ class RowBatch;
 /// AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are
 /// materialized into a row with a single tuple (the sort tuple) using the materialization
 /// exprs in sort_tuple_exprs_. The sort tuples are sorted according to the sort
-/// parameters and output by the sorter.
-/// AddBatch() can be called multiple times.
+/// parameters and output by the sorter. AddBatch() can be called multiple times.
+//
+/// Callers that don't want to spill can use AddBatchNoSpill() instead, which only adds
+/// rows up to the memory limit and then returns the number of rows that were added.
+/// For this use case, 'enable_spilling' should be set to false so that the sorter can reduce
+/// the number of buffers requested from the block mgr since there won't be merges.
 //
 /// InputDone() is called to indicate the end of input. If multiple sorted runs were
 /// created, it triggers intermediate merge steps (if necessary) and creates the final
 /// merger that returns results via GetNext().
 //
 /// GetNext() is used to retrieve sorted rows. It can be called multiple times.
-/// AddBatch(), InputDone() and GetNext() must be called in that order.
+/// AddBatch()/AddBatchNoSpill(), InputDone() and GetNext() must be called in that order.
 //
 /// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks
 /// called a run. The maximum size of a run is determined by the number of blocks that
@@ -92,11 +96,13 @@ class Sorter {
   /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be
   /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns true if
   /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to provide rows
-  /// to the merger and retrieve rows from an intermediate merger.
+  /// to the merger and retrieve rows from an intermediate merger. 'enable_spilling'
+  /// should be set to false to reduce the number of requested buffers if the caller will
+  /// use AddBatchNoSpill().
   Sorter(const TupleRowComparator& compare_less_than,
-      const std::vector<ScalarExpr*>& sort_tuple_exprs,
-      RowDescriptor* output_row_desc, MemTracker* mem_tracker,
-      RuntimeProfile* profile, RuntimeState* state);
+      const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
+      MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
+      bool enable_spilling = true);
 
   ~Sorter();
 
@@ -109,9 +115,16 @@ class Sorter {
   /// the tuples. Must be called after Prepare() or Reset() and before calling AddBatch().
   Status Open() WARN_UNUSED_RESULT;
 
-  /// Adds a batch of input rows to the current unsorted run.
+  /// Adds the entire batch of input rows to the sorter. If the current unsorted run fills
+  /// up, it is sorted and a new unsorted run is created. Cannot be called if
+  /// 'enable_spilling' is false.
   Status AddBatch(RowBatch* batch) WARN_UNUSED_RESULT;
 
+  /// Adds input rows to the current unsorted run, starting from 'start_index' up to the
+  /// memory limit. Returns the number of rows added in 'num_processed'.
+  Status AddBatchNoSpill(
+      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT;
+
   /// Called to indicate there is no more input. Triggers the creation of merger(s) if
   /// necessary.
   Status InputDone() WARN_UNUSED_RESULT;
@@ -191,6 +204,9 @@ class Sorter {
   /// sorting. Not owned by the Sorter.
   RowDescriptor* output_row_desc_;
 
+  /// True if this sorter can spill. Used to determine the number of buffers to reserve.
+  bool enable_spilling_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -242,6 +258,9 @@ class Sorter {
 
   /// Total size of the initial runs in bytes.
   RuntimeProfile::Counter* sorted_data_size_;
+
+  /// Min, max, and avg size of runs in number of tuples.
+  RuntimeProfile::SummaryStatsCounter* run_sizes_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index b37235f..40f72c6 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -48,6 +48,8 @@ namespace impala {
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
   #define ADD_SUMMARY_STATS_TIMER(profile, name) \
       (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
+  #define ADD_SUMMARY_STATS_COUNTER(profile, name, unit) \
+      (profile)->AddSummaryStatsCounter(name, unit)
   #define ADD_CHILD_TIMER(profile, name, parent) \
       (profile)->AddCounter(name, TUnit::TIME_NS, parent)
   #define SCOPED_TIMER(c) \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index e5e7f24..c1ff302 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -346,11 +346,22 @@ struct TSortInfo {
   4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
 }
 
+enum TSortType {
+  // Sort the entire input.
+  TOTAL,
+
+  // Return the first N sorted elements.
+  TOPN,
+
+  // Divide the input into batches, each of which is sorted individually.
+  PARTIAL
+}
+
 struct TSortNode {
   1: required TSortInfo sort_info
-  // Indicates whether the backend service should use topn vs. sorting
-  2: required bool use_top_n;
-  // This is the number of rows to skip before returning results
+  2: required TSortType type
+  // This is the number of rows to skip before returning results.
+  // Not used with TSortType::PARTIAL.
   3: optional i64 offset
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 08dd9f5..41ff9d2 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -356,7 +356,8 @@ public class AnalyticPlanner {
       }
 
       SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
-      SortNode sortNode = new SortNode(ctx_.getNextNodeId(), root, sortInfo, false, 0);
+      SortNode sortNode =
+          SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, 0);
 
       // if this sort group does not have partitioning exprs, we want the sort
       // to be executed like a regular distributed sort

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index c65c668..c202094 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -547,11 +547,13 @@ public class Planner {
        Analyzer analyzer) throws ImpalaException {
     List<Expr> orderingExprs = Lists.newArrayList();
 
+    boolean partialSort = false;
     if (insertStmt.getTargetTable() instanceof KuduTable) {
       if (!insertStmt.hasNoClusteredHint() && !ctx_.isSingleNodeExec()) {
         orderingExprs.add(
             KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
         orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
+        partialSort = true;
       }
     } else if (insertStmt.hasClusteredHint() || !insertStmt.getSortExprs().isEmpty()) {
       // NOTE: If the table has a 'sort.columns' property and the query has a
@@ -576,10 +578,16 @@ public class Planner {
 
     insertStmt.substituteResultExprs(smap, analyzer);
 
-    SortNode sortNode = new SortNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot(),
-        sortInfo, false, 0);
-    sortNode.init(analyzer);
+    PlanNode node = null;
+    if (partialSort) {
+      node = SortNode.createPartialSortNode(
+          ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo);
+    } else {
+      node = SortNode.createTotalSortNode(
+          ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0);
+    }
+    node.init(analyzer);
 
-    inputFragment.setPlanRoot(sortNode);
+    inputFragment.setPlanRoot(node);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 3e0692b..8d82409 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -295,8 +295,13 @@ public class SingleNodePlanner {
       // TODO: External sort could be used for very large limits
       // not just unlimited order-by
       boolean useTopN = stmt.hasLimit() && !disableTopN;
-      root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
-          useTopN, stmt.getOffset());
+      if (useTopN) {
+        root = SortNode.createTopNSortNode(
+            ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset());
+      } else {
+        root = SortNode.createTotalSortNode(
+            ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset());
+      }
       Preconditions.checkState(root.hasValidStats());
       root.setLimit(limit);
       root.init(analyzer);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index f628885..aee8fda 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -36,20 +36,27 @@ import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import org.apache.impala.thrift.TSortNode;
+import org.apache.impala.thrift.TSortType;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * Node that implements a sort with or without a limit. useTopN_ is true for sorts
- * with limits that are implemented by a TopNNode in the backend. SortNode is used
- * otherwise.
+ * Node that implements various types of sorts:
+ * - TOTAL: uses SortNode in the BE.
+ * - TOPN: uses TopNNode in the BE. Must have a limit.
+ * - PARTIAL: uses PartialSortNode in the BE. Cannot have a limit or offset.
+ *
  * Will always materialize the new tuple info_.sortTupleDesc_.
  */
 public class SortNode extends PlanNode {
   private final static Logger LOG = LoggerFactory.getLogger(SortNode.class);
 
+  // Memory limit for partial sorts, specified in bytes. TODO: determine the value for
+  // this, consider making it configurable, enforce it in the BE. (IMPALA-5669)
+  private final long PARTIAL_SORT_MEM_LIMIT = 128 * 1024 * 1024;
+
   private final SortInfo info_;
 
   // if set, this SortNode requires its input to have this data partition
@@ -61,24 +68,50 @@ public class SortNode extends PlanNode {
   // info_.sortTupleSlotExprs_ substituted with the outputSmap_ for materialized slots
   // in init().
   private List<Expr> resolvedTupleExprs_;
-  private final boolean useTopN_;
+
   // The offset of the first row to return.
   protected long offset_;
 
-  public SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN,
-      long offset) {
-    super(id, info.getSortTupleDescriptor().getId().asList(),
-        getDisplayName(useTopN, false));
+  // The type of sort. Determines the exec node used in the BE.
+  private TSortType type_;
+
+  /**
+   * Creates a new SortNode that implements a partial sort.
+   */
+  public static SortNode createPartialSortNode(
+      PlanNodeId id, PlanNode input, SortInfo info) {
+    return new SortNode(id, input, info, 0, TSortType.PARTIAL);
+  }
+
+  /**
+   * Creates a new SortNode with a limit that is executed with TopNNode in the BE.
+   */
+  public static SortNode createTopNSortNode(
+      PlanNodeId id, PlanNode input, SortInfo info, long offset) {
+    return new SortNode(id, input, info, offset, TSortType.TOPN);
+  }
+
+  /**
+   * Creates a new SortNode that does a total sort, possibly with a limit.
+   */
+  public static SortNode createTotalSortNode(
+      PlanNodeId id, PlanNode input, SortInfo info, long offset) {
+    return new SortNode(id, input, info, offset, TSortType.TOTAL);
+  }
+
+  private SortNode(
+      PlanNodeId id, PlanNode input, SortInfo info, long offset, TSortType type) {
+    super(id, info.getSortTupleDescriptor().getId().asList(), getDisplayName(type));
     info_ = info;
-    useTopN_ = useTopN;
     children_.add(input);
     offset_ = offset;
+    type_ = type;
   }
 
   public long getOffset() { return offset_; }
   public void setOffset(long offset) { offset_ = offset; }
   public boolean hasOffset() { return offset_ > 0; }
-  public boolean useTopN() { return useTopN_; }
+  public boolean useTopN() { return type_ == TSortType.TOPN; }
   public SortInfo getSortInfo() { return info_; }
   public void setInputPartition(DataPartition inputPartition) {
     inputPartition_ = inputPartition;
@@ -88,7 +121,7 @@ public class SortNode extends PlanNode {
   public void setIsAnalyticSort(boolean v) { isAnalyticSort_ = v; }
 
   @Override
-  public boolean isBlockingNode() { return true; }
+  public boolean isBlockingNode() { return type_ != TSortType.PARTIAL; }
 
   @Override
   public void init(Analyzer analyzer) throws InternalException {
@@ -146,6 +179,7 @@ public class SortNode extends PlanNode {
       strings.add(isAsc ? "a" : "d");
     }
     return Objects.toStringHelper(this)
+        .add("type_", type_)
         .add("ordering_exprs", Expr.debugString(info_.getOrderingExprs()))
         .add("is_asc", "[" + Joiner.on(" ").join(strings) + "]")
         .add("nulls_first", "[" + Joiner.on(" ").join(info_.getNullsFirst()) + "]")
@@ -162,7 +196,7 @@ public class SortNode extends PlanNode {
     Preconditions.checkState(tupleIds_.size() == 1,
         "Incorrect size for tupleIds_ in SortNode");
     sort_info.setSort_tuple_slot_exprs(Expr.treesToThrift(resolvedTupleExprs_));
-    TSortNode sort_node = new TSortNode(sort_info, useTopN_);
+    TSortNode sort_node = new TSortNode(sort_info, type_);
     sort_node.setOffset(offset_);
     msg.sort_node = sort_node;
   }
@@ -218,7 +252,7 @@ public class SortNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkState(hasValidStats());
-    if (useTopN_) {
+    if (type_ == TSortType.TOPN) {
       long perInstanceMemEstimate =
               (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
       nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
@@ -245,22 +279,39 @@ public class SortNode extends PlanNode {
     // blocks on disk and reads from both sequences when merging. This effectively
     // doubles the block size when there are var-len columns present.
     if (hasVarLenSlots) blockSize *= 2;
-    double numInputBlocks = Math.ceil(fullInputSize / blockSize);
-    long perInstanceMemEstimate = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
 
-    // Must be kept in sync with min_buffers_required in Sorter in be.
-    long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes();
-    if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
-      perInstanceMinReservation *= 2;
+    if (type_ == TSortType.PARTIAL) {
+      // The memory limit cannot be less than the size of the required blocks.
+      long mem_limit =
+          PARTIAL_SORT_MEM_LIMIT > blockSize ? PARTIAL_SORT_MEM_LIMIT : blockSize;
+      // 'fullInputSize' will be negative if stats are missing, just use the limit.
+      long perInstanceMemEstimate = fullInputSize < 0 ?
+          mem_limit :
+          Math.min((long) Math.ceil(fullInputSize), mem_limit);
+      nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, blockSize);
+    } else {
+      Preconditions.checkState(type_ == TSortType.TOTAL);
+      double numInputBlocks = Math.ceil(fullInputSize / blockSize);
+      long perInstanceMemEstimate =
+          blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+
+      // Must be kept in sync with min_buffers_required in Sorter in be.
+      long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes();
+      if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
+        perInstanceMinReservation *= 2;
+      }
+      nodeResourceProfile_ =
+          new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
     }
-    nodeResourceProfile_ =
-        new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
   }
 
-  private static String getDisplayName(boolean isTopN, boolean isMergeOnly) {
-    if (isTopN) {
+  private static String getDisplayName(TSortType type) {
+    if (type == TSortType.TOPN) {
       return "TOP-N";
+    } else if (type == TSortType.PARTIAL) {
+      return "PARTIAL SORT";
     } else {
+      Preconditions.checkState(type == TSortType.TOTAL);
       return "SORT";
     }
   }
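
The PARTIAL branch of computeNodeResourceProfile() above can be restated as a small standalone function, which may make the clamping easier to check by hand. This is a sketch only: the class and method names are hypothetical, the constant copies the 128 MB value from the patch, and the inputs are plain numbers rather than planner statistics.

```java
// Hypothetical standalone restatement of the PARTIAL memory estimate.
final class PartialSortEstimateSketch {
  // Same value as PARTIAL_SORT_MEM_LIMIT in the patch (128 MB, in bytes).
  static final long PARTIAL_SORT_MEM_LIMIT = 128L * 1024 * 1024;

  // 'fullInputSize' is the estimated input size in bytes, negative when
  // stats are missing. 'blockSize' is the sorter block size in bytes.
  static long perInstanceMemEstimate(double fullInputSize, long blockSize) {
    // The limit can never be smaller than a single block.
    long memLimit = Math.max(PARTIAL_SORT_MEM_LIMIT, blockSize);
    if (fullInputSize < 0) return memLimit;
    // Small inputs need only their own size; large inputs are capped.
    return Math.min((long) Math.ceil(fullInputSize), memLimit);
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // Missing stats: falls back to the limit.
    System.out.println(perInstanceMemEstimate(-1, 8 * mb) / mb);
    // Input smaller than the limit: uses the input size.
    System.out.println(perInstanceMemEstimate(10.0 * mb, 8 * mb) / mb);
    // Input larger than the limit: capped at the limit.
    System.out.println(perInstanceMemEstimate(1e12, 8 * mb) / mb);
  }
}
```

Unlike the TOTAL branch, the estimate does not grow with the square root of the number of input blocks, since a partial sort never merges runs.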

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index 2bc5df7..c538e57 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -10,7 +10,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(bigint_col))]
@@ -51,7 +51,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-08:SORT
+08:PARTIAL SORT
 |  order by: KuduPartition(a.bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
 |
 07:EXCHANGE [KUDU(KuduPartition(a.bigint_col))]
@@ -97,7 +97,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-05:SORT
+05:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 04:EXCHANGE [KUDU(KuduPartition(id))]
@@ -125,7 +125,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(functional_kudu.testtbl.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(functional_kudu.testtbl.id))]
@@ -148,7 +148,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-04:SORT
+04:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 03:EXCHANGE [KUDU(KuduPartition(id))]
@@ -175,7 +175,7 @@ UPSERT INTO KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.alltypes]
 |
-05:SORT
+05:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 04:EXCHANGE [KUDU(KuduPartition(id))]
@@ -195,7 +195,7 @@ upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.a
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
+01:PARTIAL SORT
 |  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
@@ -216,7 +216,7 @@ upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.a
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
+01:PARTIAL SORT
 |  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 16cb3a9..436aa51 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -50,7 +50,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(10) ASC NULLS LAST, 10 ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(10))]
@@ -66,7 +66,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(int_col) ASC NULLS LAST, int_col ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(int_col))]
@@ -90,7 +90,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-06:SORT
+06:PARTIAL SORT
 |  order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST
 |
 05:EXCHANGE [KUDU(KuduPartition(count(id)))]
@@ -264,7 +264,7 @@ INSERT INTO KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(functional_kudu.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(functional_kudu.alltypes.id))]
@@ -288,7 +288,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-05:SORT
+05:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 04:EXCHANGE [KUDU(KuduPartition(id))]
@@ -394,7 +394,7 @@ insert into functional_kudu.alltypes /* +noshuffle */ select * from functional.a
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
+01:PARTIAL SORT
 |  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index 71b09fc..76ad779 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -435,4 +435,6 @@ NumRowErrors: 1
 set mem_limit=400m;
 create table kudu_test primary key(a, b) partition by hash(a, b) partitions 8 stored as kudu as
 select l_orderkey a, concat(l_comment, l_comment, l_comment) b from tpch.lineitem
-====
\ No newline at end of file
+---- RUNTIME_PROFILE
+row_regex: .*SpilledRuns: 0 \(0\)
+====
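
A rough illustration of what the new RUNTIME_PROFILE check is looking for. It assumes the text after row_regex: is applied to each profile line as a whole-line regular expression; the test framework's exact matching rules are not shown in this message. The sample profile lines and all names in the sketch are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration; the real check is done by the test framework.
final class ProfileRegexSketch {
  // Pattern copied from the RUNTIME_PROFILE section of the test.
  static final Pattern NO_SPILLS = Pattern.compile(".*SpilledRuns: 0 \\(0\\)");

  // Returns true if at least one profile line matches the pattern in full.
  static boolean anyLineMatches(List<String> profileLines) {
    for (String line : profileLines) {
      if (NO_SPILLS.matcher(line).matches()) return true;
    }
    return false;
  }

  public static void main(String[] args) {
    // Invented sample lines, not real profile output.
    List<String> noSpill = Arrays.asList("   - SpilledRuns: 0 (0)");
    List<String> spilled = Arrays.asList("   - SpilledRuns: 5 (5)");
    System.out.println(anyLineMatches(noSpill));  // prints true
    System.out.println(anyLineMatches(spilled));  // prints false
  }
}
```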


