impala-commits mailing list archives

From tarmstr...@apache.org
Subject [4/4] incubator-impala git commit: IMPALA-3208: max_row_size option
Date Wed, 23 Aug 2017 03:29:01 GMT
IMPALA-3208: max_row_size option

Adds support for a "max_row_size" query option that instructs Impala
to reserve enough memory to process rows of the specified size. For
spilling operators, the planner reserves enough memory to process
rows of this size. The advantage of this over simply specifying
larger values for min_spillable_buffer_size and
default_spillable_buffer_size is that operators may be able to
handle larger rows without increasing the size of all their
buffers.

The default value is 512KB. I picked that number because it doesn't
increase minimum reservations *too* much even with smaller buffers
like 64KB, but should be large enough for almost all reasonable
workloads.
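The reservation arithmetic behind that choice can be sketched as follows. This is a Python sketch mirroring the MinReservation() formula added by this patch (PARTITION_FANOUT = 16 as in the aggregation node), not Impala source:

```python
# Sketch of the minimum reservation for a spilling grouping aggregation,
# following PartitionedAggregationNode::MinReservation() in this patch.
PARTITION_FANOUT = 16

def agg_min_reservation(spillable_buffer_size, max_row_buffer_size,
                        needs_serialize=False):
    """One buffer per partition, plus one to read the stream being
    repartitioned (plus one more if rows must be serialized). Two of
    those buffers must be large enough to hold a maximum-sized row."""
    num_buffers = PARTITION_FANOUT + 1 + (1 if needs_serialize else 0)
    return (spillable_buffer_size * (num_buffers - 2)
            + max_row_buffer_size * 2)

# With 64KB buffers, raising the max row buffer to the 512KB default
# only adds 2 * (512KB - 64KB) = 896KB to the minimum reservation:
small = agg_min_reservation(64 * 1024, 64 * 1024)   # 1114112 bytes
large = agg_min_reservation(64 * 1024, 512 * 1024)  # 2031616 bytes
```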

This is implemented in the aggs and joins using the variable page size
support added to BufferedTupleStream in an earlier commit. The synopsis
is that each stream requires reservation for one default-sized page
per read and write iterator, and temporarily requires reservation
for a max-sized page when reading or writing larger pages. The
max-sized write reservation is released immediately after the row
is appended and the max-sized read reservation is released after
advancing to the next row.
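A toy model of that write-side handoff (my sketch, not Impala code): the stream permanently holds one default-sized page's reservation per write iterator, and only transiently needs a max-sized page's worth while a larger-than-default row is appended.

```python
# Toy model of the transient reservation a BufferedTupleStream write
# iterator needs when appending a row larger than the default page.
class StreamWriteReservation:
    def __init__(self, default_page_len, max_page_len):
        self.default_page_len = default_page_len
        self.max_page_len = max_page_len
        self.current = default_page_len  # steady-state write buffer
        self.peak = default_page_len

    def append_row(self, row_size):
        if row_size > self.max_page_len:
            raise ValueError("row larger than max_row_size")
        if row_size > self.default_page_len:
            # Take a max-sized page's reservation only for the append...
            self.current += self.max_page_len
            self.peak = max(self.peak, self.current)
            # ...then release the transient reservation immediately
            # after the row is written, back to steady state.
            self.current -= self.max_page_len
```

Note the steady-state reservation never grows; only the peak briefly includes one extra max-sized page.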

The sorter and analytic simply use max-sized buffers for all pages
in the stream.
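For those operators the planner just bumps the uniform page size. The fe helper computeMaxSpillableBufferSize used by this patch is not shown here, so the following is an assumption based on the backend's page-sizing rule in BufferedTupleStream::CalcPageLenForRow:

```python
# Hypothetical sketch of sizing a single uniform page so that every
# page fits the largest permitted row (assumed, body not in this diff).
def round_up_to_power_of_2(v):
    return 1 << (v - 1).bit_length()

def max_spillable_buffer_size(default_buffer_size, max_row_size):
    # Use the larger of the default buffer size and the rounded-up
    # max row size, matching max(default_page_len_, RoundUpToPowerOfTwo).
    return max(default_buffer_size, round_up_to_power_of_2(max_row_size))
```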

Testing:
Updated existing planner tests to reflect default max_row_size. Added
new planner tests to test the effect of the query option.

Added a "set" test to check validation of the query option.

Added end-to-end tests exercising spilling operators with large rows
with and without spilling induced by SET_DENY_RESERVATION_PROBABILITY.

Change-Id: Ic70f6dddbcef124bb4b329ffa2e42a74a1826570
Reviewed-on: http://gerrit.cloudera.org:8080/7629
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ed87c406
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ed87c406
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ed87c406

Branch: refs/heads/master
Commit: ed87c4060017399b3e0ee144347a38b14a4b1794
Parents: c8f531b
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Aug 9 12:24:54 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Aug 23 03:27:26 2017 +0000

----------------------------------------------------------------------
 be/src/exec/partitioned-aggregation-node.cc     |  11 +-
 be/src/exec/partitioned-aggregation-node.h      |  26 +-
 be/src/exec/partitioned-hash-join-builder.cc    |   7 +-
 be/src/exec/partitioned-hash-join-builder.h     |  13 +-
 be/src/exec/partitioned-hash-join-node.cc       |   9 +-
 be/src/runtime/buffered-tuple-stream.cc         |   2 +-
 be/src/runtime/sorter.cc                        |   5 +-
 be/src/service/query-options.cc                 |  14 +-
 be/src/service/query-options.h                  |   3 +-
 common/thrift/ImpalaInternalService.thrift      |  10 +
 common/thrift/ImpalaService.thrift              |   3 +
 common/thrift/PlanNodes.thrift                  |   4 +
 common/thrift/generate_error_codes.py           |   3 +-
 .../apache/impala/planner/AggregationNode.java  |  22 +-
 .../apache/impala/planner/AnalyticEvalNode.java |  12 +-
 .../org/apache/impala/planner/HashJoinNode.java |  16 +-
 .../org/apache/impala/planner/PlanNode.java     |  11 +-
 .../java/org/apache/impala/planner/Planner.java |   3 +-
 .../apache/impala/planner/ResourceProfile.java  |  53 +--
 .../impala/planner/ResourceProfileBuilder.java  |  69 ++++
 .../org/apache/impala/planner/SortNode.java     |  12 +-
 .../org/apache/impala/planner/PlannerTest.java  |  11 +
 .../queries/PlannerTest/constant-folding.test   |  18 +-
 .../queries/PlannerTest/disable-codegen.test    |   4 +-
 .../PlannerTest/fk-pk-join-detection.test       |  24 +-
 .../queries/PlannerTest/max-row-size.test       | 355 +++++++++++++++++++
 .../queries/PlannerTest/mt-dop-validation.test  |   8 +-
 .../PlannerTest/resource-requirements.test      |  82 ++---
 .../PlannerTest/spillable-buffer-sizing.test    |  28 +-
 .../queries/PlannerTest/tablesample.test        |   4 +-
 .../functional-query/queries/QueryTest/set.test |  31 ++
 .../queries/QueryTest/spilling-aggs.test        |   1 +
 .../queries/QueryTest/spilling-large-rows.test  | 188 ++++++++++
 tests/query_test/test_spilling.py               |   5 +
 34 files changed, 902 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 3949041..a0fed41 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -253,8 +253,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
   // Claim reservation after the child has been opened to reduce the peak reservation
   // requirement.
   if (!buffer_pool_client_.is_registered() && !grouping_exprs_.empty()) {
-    DCHECK_GE(resource_profile_.min_reservation,
-        resource_profile_.spillable_buffer_size * MinRequiredBuffers());
+    DCHECK_GE(resource_profile_.min_reservation, MinReservation());
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
 
@@ -277,7 +276,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
       if (!is_streaming_preagg_ && needs_serialize_) {
         serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
             &buffer_pool_client_, resource_profile_.spillable_buffer_size,
-            resource_profile_.spillable_buffer_size));
+            resource_profile_.max_row_buffer_size));
         RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
         bool got_buffer;
         // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
@@ -725,7 +724,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
   aggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
       &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
       parent->resource_profile_.spillable_buffer_size,
-      parent->resource_profile_.spillable_buffer_size, external_varlen_slots));
+      parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
   RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true));
   bool got_buffer;
   RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
@@ -743,7 +742,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() {
     unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
         parent->child(0)->row_desc(), &parent->buffer_pool_client_,
         parent->resource_profile_.spillable_buffer_size,
-        parent->resource_profile_.spillable_buffer_size));
+        parent->resource_profile_.max_row_buffer_size));
     // This stream is only used to spill, no need to ever have this pinned.
     RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false));
     // Save memory by waiting until we spill to allocate the write buffer for the
@@ -814,7 +813,7 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
         &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
         parent->resource_profile_.spillable_buffer_size,
-        parent->resource_profile_.spillable_buffer_size));
+        parent->resource_profile_.max_row_buffer_size));
     status = parent->serialize_stream_->Init(parent->id(), false);
     if (status.ok()) {
       bool got_buffer;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index c230630..210400e 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -80,11 +80,13 @@ class SlotDescriptor;
 /// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
 ///    Rows in this stream always have child(0)'s layout.
 ///
-/// Buffering: Each stream and hash table needs to maintain at least one buffer for
-/// some duration of the processing. To minimize the memory requirements of small queries
-/// (i.e. memory usage is less than one IO-buffer per partition), the streams and hash
-/// tables of each partition start using small (less than IO-sized) buffers, regardless
-/// of the level.
+/// Buffering: Each stream and hash table needs to maintain at least one buffer when
+/// it is being read or written. The streams for a given agg use a uniform buffer size,
+/// except when processing rows larger than that buffer size. In that case, the agg uses
+/// BufferedTupleStream's variable buffer size support to handle larger rows up to the
+/// maximum row size. Only two max-sized buffers are needed for the agg to spill: one
+/// to hold rows being read from a spilled input stream and another for a temporary write
+/// buffer when adding a row to an output stream.
 ///
 /// Two-phase aggregation: we support two-phase distributed aggregations, where
 /// pre-aggregrations attempt to reduce the size of data before shuffling data across the
@@ -719,18 +721,24 @@ class PartitionedAggregationNode : public ExecNode {
   Status CodegenProcessBatchStreaming(
       LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 
-  /// Compute minimum buffer requirement for grouping aggregations.
+  /// Compute minimum buffer reservation for grouping aggregations.
   /// We need one buffer per partition, which is used either as the write buffer for the
   /// aggregated stream or the unaggregated stream. We need an additional buffer to read
-  /// the stream we are currently repartitioning.
+  /// the stream we are currently repartitioning. The read buffer needs to be a max-sized
+  /// buffer to hold a max-sized row and we need one max-sized write buffer that is used
+  /// temporarily to append a row to any stream.
+  ///
   /// If we need to serialize, we need an additional buffer while spilling a partition
   /// as the partitions aggregate stream needs to be serialized and rewritten.
   /// We do not spill streaming preaggregations, so we do not need to reserve any buffers.
-  int MinRequiredBuffers() const {
+  int64_t MinReservation() const {
     DCHECK(!grouping_exprs_.empty());
     // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe.
     if (is_streaming_preagg_) return 0; // Need 0 buffers to pass through rows.
-    return PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
+    int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
+    // Two of the buffers must fit the maximum row.
+    return resource_profile_.spillable_buffer_size * (num_buffers - 2) +
+          resource_profile_.max_row_buffer_size * 2;
   }
 };
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 2dc2d8d..46afb33 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -55,7 +55,7 @@ using strings::Substitute;
 PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
     const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
     RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
-    int64_t spillable_buffer_size)
+    int64_t spillable_buffer_size, int64_t max_row_buffer_size)
   : DataSink(build_row_desc),
     runtime_state_(state),
     join_node_id_(join_node_id),
@@ -63,6 +63,7 @@ PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
     probe_row_desc_(probe_row_desc),
     buffer_pool_client_(buffer_pool_client),
     spillable_buffer_size_(spillable_buffer_size),
+    max_row_buffer_size_(max_row_buffer_size),
     non_empty_build_(false),
     partitions_created_(NULL),
     largest_partition_percent_(NULL),
@@ -424,7 +425,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
     // Create stream in vector, so that it will be cleaned up after any failure.
     spilled_partition_probe_streams_.emplace_back(
         make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_,
-            buffer_pool_client_, spillable_buffer_size_, spillable_buffer_size_));
+            buffer_pool_client_, spillable_buffer_size_, max_row_buffer_size_));
     BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
     RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false));
 
@@ -575,7 +576,7 @@ PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int le
   : parent_(parent), is_spilled_(false), level_(level) {
   build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_,
       parent_->buffer_pool_client_, parent->spillable_buffer_size_,
-      parent->spillable_buffer_size_);
+      parent->max_row_buffer_size_);
 }
 
 PhjBuilder::Partition::~Partition() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 277579c..51bcd63 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -74,7 +74,8 @@ class PhjBuilder : public DataSink {
 
   PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
       const RowDescriptor* build_row_desc, RuntimeState* state,
-      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size);
+      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
+      int64_t max_row_buffer_size);
 
   Status InitExprsAndFilters(RuntimeState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
@@ -242,7 +243,7 @@ class PhjBuilder : public DataSink {
     std::unique_ptr<BufferedTupleStream> build_rows_;
   };
 
-  /// Computes the minimum number of buffers required to execute the spilling partitioned
+  /// Computes the minimum reservation required to execute the spilling partitioned
   /// hash algorithm successfully for any input size (assuming enough disk space is
   /// available for spilled rows). The buffers are used for buffering both build and
   /// probe rows at different times, so the total requirement is the peak sum of build
@@ -251,11 +252,12 @@ class PhjBuilder : public DataSink {
   /// need one additional buffer for the input while repartitioning the build or probe.
   /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
   /// 'null_aware_probe_partition_' and 'null_probe_rows_'.
-  int MinRequiredBuffers() const {
+  int64_t MinReservation() const {
     // Must be kept in sync with HashJoinNode.computeNodeResourceProfile() in fe.
     int num_reserved_buffers = PARTITION_FANOUT + 1;
     if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3;
-    return num_reserved_buffers;
+    // Two of the buffers must fit the maximum row.
+    return spillable_buffer_size_ * (num_reserved_buffers - 2) + max_row_buffer_size_ * 2;
   }
 
  protected:
@@ -371,8 +373,9 @@ class PhjBuilder : public DataSink {
   /// 1:N relationship from builders to join nodes.
   BufferPool::ClientHandle* buffer_pool_client_;
 
-  /// The size of buffers to use in the build and probe streams.
+  /// The default and max buffer sizes to use in the build and probe streams.
   const int64_t spillable_buffer_size_;
+  const int64_t max_row_buffer_size_;
 
   /// Allocator for hash table memory.
   boost::scoped_ptr<Suballocator> ht_allocator_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 806bdc0..2c656b0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -84,7 +84,7 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   // being separated out further.
   builder_.reset(new PhjBuilder(id(), join_op_, child(0)->row_desc(),
         child(1)->row_desc(), state, &buffer_pool_client_,
-        resource_profile_.spillable_buffer_size));
+        resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size));
   RETURN_IF_ERROR(
       builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters));
 
@@ -183,8 +183,7 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
 }
 
 Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
-  DCHECK_GE(resource_profile_.min_reservation,
-      resource_profile_.spillable_buffer_size * builder_->MinRequiredBuffers());
+  DCHECK_GE(resource_profile_.min_reservation, builder_->MinReservation());
   if (!buffer_pool_client_.is_registered()) {
     RETURN_IF_ERROR(ClaimBufferReservation(state));
   }
@@ -825,7 +824,7 @@ Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
   unique_ptr<BufferedTupleStream> probe_rows = make_unique<BufferedTupleStream>(
       state, child(0)->row_desc(), &buffer_pool_client_,
       resource_profile_.spillable_buffer_size,
-      resource_profile_.spillable_buffer_size);
+      resource_profile_.max_row_buffer_size);
   // TODO: this should be pinned if spilling is disabled.
   Status status = probe_rows->Init(id(), false);
   if (!status.ok()) goto error;
@@ -849,7 +848,7 @@ Status PartitionedHashJoinNode::InitNullProbeRows() {
   RuntimeState* state = runtime_state_;
   null_probe_rows_ = make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
       &buffer_pool_client_, resource_profile_.spillable_buffer_size,
-      resource_profile_.spillable_buffer_size);
+      resource_profile_.max_row_buffer_size);
   // TODO: we shouldn't start with this unpinned if spilling is disabled.
   RETURN_IF_ERROR(null_probe_rows_->Init(id(), false));
   bool got_buffer;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 38dc44c..e0a14bb 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -383,7 +383,7 @@ Status BufferedTupleStream::CalcPageLenForRow(int64_t row_size, int64_t* page_le
   if (UNLIKELY(row_size > max_page_len_)) {
     return Status(TErrorCode::MAX_ROW_SIZE,
         PrettyPrinter::Print(row_size, TUnit::BYTES), node_id_,
-        PrettyPrinter::Print(0, TUnit::BYTES));
+        PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES));
   }
   *page_len = max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index de05945..d2e6bb2 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -700,9 +700,10 @@ Status Sorter::Run::AddBatchInternal(
             *sort_tuple_desc_, sorter_->sort_tuple_expr_evals_, NULL,
             &string_values, &total_var_len);
         if (total_var_len > sorter_->page_len_) {
+          int64_t max_row_size = sorter_->state_->query_options().max_row_size;
           return Status(TErrorCode::MAX_ROW_SIZE,
               PrettyPrinter::Print(total_var_len, TUnit::BYTES), sorter_->node_id_,
-              PrettyPrinter::Print(0, TUnit::BYTES));
+              PrettyPrinter::Print(max_row_size, TUnit::BYTES));
         }
       } else {
         memcpy(new_tuple, input_row->GetTuple(0), sort_tuple_size_);
@@ -1509,7 +1510,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
   if (sort_tuple_desc->byte_size() > page_len_) {
     return Status(TErrorCode::MAX_ROW_SIZE,
         PrettyPrinter::Print(sort_tuple_desc->byte_size(), TUnit::BYTES), node_id_,
-        PrettyPrinter::Print(0, TUnit::BYTES));
+        PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES));
   }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
   in_mem_tuple_sorter_.reset(new TupleSorter(compare_less_than_, page_len_,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c123902..21327f3 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -508,7 +508,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::DEFAULT_SPILLABLE_BUFFER_SIZE: {
         int64_t buffer_size_bytes;
         RETURN_IF_ERROR(
-            ParseMemValue(value, "Spillable buffer size", &buffer_size_bytes));
+            ParseMemValue(value, "Default spillable buffer size", &buffer_size_bytes));
         if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
           return Status(
               Substitute("Buffer size must be a power of two: $0", buffer_size_bytes));
@@ -519,7 +519,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE: {
         int64_t buffer_size_bytes;
         RETURN_IF_ERROR(
-            ParseMemValue(value, "Spillable buffer size", &buffer_size_bytes));
+            ParseMemValue(value, "Minimum spillable buffer size", &buffer_size_bytes));
         if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
           return Status(
               Substitute("Buffer size must be a power of two: $0", buffer_size_bytes));
@@ -527,6 +527,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_min_spillable_buffer_size(buffer_size_bytes);
         break;
       }
+      case TImpalaQueryOptions::MAX_ROW_SIZE: {
+        int64_t max_row_size_bytes;
+        RETURN_IF_ERROR(ParseMemValue(value, "Max row size", &max_row_size_bytes));
+        if (max_row_size_bytes <= 0) {
+          return Status(Substitute(
+              "Max row size must be a positive number of bytes: $0", value));
+        }
+        query_options->__set_max_row_size(max_row_size_bytes);
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 8d6af02..bb8c301 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE + 1);\
+      TImpalaQueryOptions::MAX_ROW_SIZE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -95,6 +95,7 @@ class TQueryOptions;
   QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD)\
   QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE)\
   QUERY_OPT_FN(min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE)\
+  QUERY_OPT_FN(max_row_size, MAX_ROW_SIZE)\
   ;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index b094d3e..54fcdaf 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -263,6 +263,16 @@ struct TQueryOptions {
   // The minimum spillable buffer to use. The planner will not choose a size smaller than
   // this. Defaults to 64KB.
   59: optional i64 min_spillable_buffer_size = 65536;
+
+  // The maximum size of row that the query will reserve memory to process. Processing
+  // rows larger than this may result in a query failure. Defaults to 512KB, e.g.
+  // enough for a row with 15 32KB strings or many smaller columns.
+  //
+  // Different operators handle this option in different ways. E.g. some simply increase
+  // the size of all their buffers to fit this row size, whereas others may use more
+  // sophisticated strategies - e.g. reserving a small number of buffers large enough to
+  // fit maximum-sized rows.
+  60: optional i64 max_row_size = 524288;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index ced884b..b8a073e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -285,6 +285,9 @@ enum TImpalaQueryOptions {
 
   // The minimum spillable buffer size, in bytes.
   MIN_SPILLABLE_BUFFER_SIZE,
+
+  // The maximum row size that memory is reserved for, in bytes.
+  MAX_ROW_SIZE,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index ae9faa9..c04d08a 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -497,6 +497,10 @@ struct TBackendResourceProfile {
   // The spillable buffer size in bytes to use for this node, chosen by the planner.
   // Set iff the node uses spillable buffers.
   3: optional i64 spillable_buffer_size
+
+  // The buffer size in bytes that is large enough to fit the largest row to be processed.
+  // Set if the node allocates buffers for rows from the buffer pool.
+  4: optional i64 max_row_buffer_size
 }
 
 // This is essentially a union of all messages corresponding to subclasses

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5a72d80..32a54ca 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -318,9 +318,8 @@ error_codes = (
    "Kudu table '$0' column '$1' contains an out of range timestamp. "
    "The valid date range is 1400-01-01..9999-12-31."),
 
-  # TODO: IMPALA-3200: make sure that this references the correct query option.
   ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node with "
-    "id $1. Increase the <TBD> query option (currently $2) to process larger rows."),
+    "id $1. Increase the max_row_size query option (currently $2) to process larger rows."),
 
   ("IR_VERIFY_FAILED", 105,
    "Failed to verify generated IR function $0, see log for more details."),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index c938f76..ce725e0 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -299,11 +299,13 @@ public class AggregationNode extends PlanNode {
           PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
     }
 
-    // Must be kept in sync with PartitionedAggregationNode::MinRequiredBuffers() in be.
-    long perInstanceMinBuffers;
+    // Must be kept in sync with PartitionedAggregationNode::MinReservation() in be.
+    long perInstanceMinReservation;
     long bufferSize = queryOptions.getDefault_spillable_buffer_size();
+    long maxRowBufferSize =
+        computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
     if (aggInfo_.getGroupingExprs().isEmpty() || useStreamingPreagg_) {
-      perInstanceMinBuffers = 0;
+      perInstanceMinReservation = 0;
     } else {
       final int PARTITION_FANOUT = 16;
       long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
@@ -314,11 +316,19 @@ public class AggregationNode extends PlanNode {
         bufferSize = Math.min(bufferSize, Math.max(
             queryOptions.getMin_spillable_buffer_size(),
             BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
+        // Recompute the max row buffer size with the smaller buffer.
+        maxRowBufferSize =
+            computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
       }
-      perInstanceMinBuffers = bufferSize * minBuffers;
+      // Two of the buffers need to be buffers large enough to hold the maximum-sized row
+      // to serve as input and output buffers while repartitioning.
+      perInstanceMinReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
     }
 
-    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
-        perInstanceMemEstimate, perInstanceMinBuffers, bufferSize);
+    nodeResourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(perInstanceMemEstimate)
+        .setMinReservationBytes(perInstanceMinReservation)
+        .setSpillableBufferBytes(bufferSize)
+        .setMaxRowBufferBytes(maxRowBufferSize).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 0322d88..1ce4884 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -248,11 +248,15 @@ public class AnalyticEvalNode extends PlanNode {
     // TODO: come up with estimate based on window
     long perInstanceMemEstimate = 0;
 
-    // Analytic always uses the default spillable buffer size.
-    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
+    // Analytic uses a single buffer size - either the default spillable buffer size or
+    // the smallest buffer size required to fit the maximum row size.
+    long bufferSize = computeMaxSpillableBufferSize(
+        queryOptions.getDefault_spillable_buffer_size(), queryOptions.getMax_row_size());
+
     // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in be.
     long perInstanceMinBufferBytes = 2 * bufferSize;
-    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
-        perInstanceMemEstimate, perInstanceMinBufferBytes, bufferSize);
+    nodeResourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(perInstanceMemEstimate)
+        .setMinReservationBytes(perInstanceMinBufferBytes)
+        .setSpillableBufferBytes(bufferSize).setMaxRowBufferBytes(bufferSize).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 5ff17c5..917d918 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -217,7 +217,7 @@ public class HashJoinNode extends JoinNode {
           perInstanceDataBytes * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
     }
 
-    // Must be kept in sync with PartitionedHashJoinBuilder::MinRequiredBuffers() in be.
+    // Must be kept in sync with PartitionedHashJoinBuilder::MinReservation() in be.
     final int PARTITION_FANOUT = 16;
     long minBuffers = PARTITION_FANOUT + 1
         + (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0);
@@ -232,8 +232,16 @@ public class HashJoinNode extends JoinNode {
           BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
     }
 
-    long perInstanceMinBufferBytes = bufferSize * minBuffers;
-    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
-        perInstanceMemEstimate, perInstanceMinBufferBytes, bufferSize);
+    // Two of the buffers must be large enough to hold a maximum-sized row, since
+    // they serve as the input and output buffers while repartitioning.
+    long maxRowBufferSize =
+        computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
+    long perInstanceMinBufferBytes =
+        bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
+    nodeResourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(perInstanceMemEstimate)
+        .setMinReservationBytes(perInstanceMinBufferBytes)
+        .setSpillableBufferBytes(bufferSize)
+        .setMaxRowBufferBytes(maxRowBufferSize).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 2557f98..c149b5c 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -33,12 +33,12 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
-import org.apache.impala.thrift.TBackendResourceProfile;
 import org.apache.impala.thrift.TExecStats;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlan;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.BitUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -679,6 +679,15 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   }
 
   /**
+   * Compute the buffer size that will be used to fit the maximum-sized row: either the
+   * provided default buffer size or the smallest power-of-two buffer that fits a row of
+   * 'maxRowSize' bytes, whichever is larger.
+   */
+  protected static long computeMaxSpillableBufferSize(long defaultBufferSize,
+      long maxRowSize) {
+    return Math.max(defaultBufferSize, BitUtil.roundUpToPowerOf2(maxRowSize));
+  }
+
+  /**
    * The input cardinality is the sum of output cardinalities of its children.
    * For scan nodes the input cardinality is the expected number of rows scanned.
    */
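
The helper above picks the larger of the default buffer and the rounded-up row size. A minimal standalone sketch of its behavior; `roundUpToPowerOf2` is an assumed stand-in for Impala's `BitUtil` implementation:

```java
public class MaxBufferSizeSketch {
    // Assumed stand-in for BitUtil.roundUpToPowerOf2.
    static long roundUpToPowerOf2(long v) {
        return v <= 1 ? 1 : Long.highestOneBit(v - 1) << 1;
    }

    // Mirrors PlanNode.computeMaxSpillableBufferSize from this patch.
    static long computeMaxSpillableBufferSize(long defaultBufferSize, long maxRowSize) {
        return Math.max(defaultBufferSize, roundUpToPowerOf2(maxRowSize));
    }

    public static void main(String[] args) {
        // Default buffer already fits the max row: keep the default.
        System.out.println(computeMaxSpillableBufferSize(2 << 20, 512 * 1024));   // 2097152
        // Max row exceeds the default: round the row size up to a power of two.
        System.out.println(computeMaxSpillableBufferSize(64 * 1024, 512 * 1024)); // 524288
        System.out.println(computeMaxSpillableBufferSize(64 * 1024, 100_000));    // 131072
    }
}
```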

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 1491873..7992b7b 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -64,7 +64,8 @@ public class Planner {
   public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
 
   public static final ResourceProfile MIN_PER_HOST_RESOURCES =
-      ResourceProfile.withMinReservation(MIN_PER_HOST_MEM_ESTIMATE_BYTES, 0);
+      new ResourceProfileBuilder().setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
+      .setMinReservationBytes(0).build();
 
   private final PlannerContext ctx_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
index d2d0f70..0494078 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -21,6 +21,9 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.thrift.TBackendResourceProfile;
 import org.apache.impala.util.MathUtil;
 
+import com.google.common.base.Preconditions;
+import com.google.common.math.LongMath;
+
 /**
  * The resources that will be consumed by some part of a plan, e.g. a plan node or
  * plan fragment.
@@ -44,43 +47,39 @@ public class ResourceProfile {
   // The valid range is [minReservationBytes_, Long.MAX_VALUE].
   private final long maxReservationBytes_;
 
-  // The spillable buffer size to use in a plan node. Only valid for resource profiles
-  // for spilling PlanNodes. Operations like sum(), max(), etc., produce profiles without
-  // valid spillableBufferBytes_ values. -1 means invalid.
+  // The default spillable buffer size to use in a plan node. Only valid for resource
+  // profiles for spilling PlanNodes. Operations like sum(), max(), etc., produce
+  // profiles without valid spillableBufferBytes_ values. -1 means invalid.
   private final long spillableBufferBytes_;
 
-  private ResourceProfile(boolean isValid, long memEstimateBytes,
-      long minReservationBytes, long maxReservationBytes, long spillableBufferBytes) {
+  // The buffer size to use for a maximum-sized row in a plan node. -1 means invalid.
+  // Must be set to a valid power-of-two value if spillableBufferBytes_ is set.
+  private final long maxRowBufferBytes_;
+
+  ResourceProfile(boolean isValid, long memEstimateBytes,
+      long minReservationBytes, long maxReservationBytes, long spillableBufferBytes,
+      long maxRowBufferBytes) {
+    Preconditions.checkArgument(spillableBufferBytes == -1 || maxRowBufferBytes != -1);
+    Preconditions.checkArgument(spillableBufferBytes == -1
+        || LongMath.isPowerOfTwo(spillableBufferBytes));
+    Preconditions.checkArgument(maxRowBufferBytes == -1
+        || LongMath.isPowerOfTwo(maxRowBufferBytes));
     isValid_ = isValid;
     memEstimateBytes_ = (minReservationBytes != -1) ?
         Math.max(memEstimateBytes, minReservationBytes) : memEstimateBytes;
     minReservationBytes_ = minReservationBytes;
     maxReservationBytes_ = maxReservationBytes;
     spillableBufferBytes_ = spillableBufferBytes;
+    maxRowBufferBytes_ = maxRowBufferBytes;
   }
 
   // Create a resource profile with zero min or max reservation.
   public static ResourceProfile noReservation(long memEstimateBytes) {
-    return new ResourceProfile(true, memEstimateBytes, 0, 0, -1);
-  }
-
-  // Create a resource profile with a minimum reservation (but no maximum).
-  public static ResourceProfile withMinReservation(long memEstimateBytes,
-      long minReservationBytes) {
-    return new ResourceProfile(
-        true, memEstimateBytes, minReservationBytes, Long.MAX_VALUE, -1);
-  }
-
-  // Create a resource profile with a minimum reservation (but no maximum) and a
-  // spillable buffer size.
-  public static ResourceProfile spillableWithMinReservation(long memEstimateBytes,
-      long minReservationBytes, long spillableBufferBytes) {
-    return new ResourceProfile(true, memEstimateBytes, minReservationBytes,
-        Long.MAX_VALUE, spillableBufferBytes);
+    return new ResourceProfile(true, memEstimateBytes, 0, 0, -1, -1);
   }
 
   public static ResourceProfile invalid() {
-    return new ResourceProfile(false, -1, -1, -1, -1);
+    return new ResourceProfile(false, -1, -1, -1, -1, -1);
   }
 
   public boolean isValid() { return isValid_; }
@@ -88,6 +87,7 @@ public class ResourceProfile {
   public long getMinReservationBytes() { return minReservationBytes_; }
   public long getMaxReservationBytes() { return maxReservationBytes_; }
   public long getSpillableBufferBytes() { return spillableBufferBytes_; }
+  public long getMaxRowBufferBytes() { return maxRowBufferBytes_; }
 
   // Return a string with the resource profile information suitable for display in an
   // explain plan in a format like: "resource1=value resource2=value"
@@ -113,7 +113,7 @@ public class ResourceProfile {
     return new ResourceProfile(true,
         Math.max(getMemEstimateBytes(), other.getMemEstimateBytes()),
         Math.max(getMinReservationBytes(), other.getMinReservationBytes()),
-        Math.max(getMaxReservationBytes(), other.getMaxReservationBytes()), -1);
+        Math.max(getMaxReservationBytes(), other.getMaxReservationBytes()), -1, -1);
   }
 
   // Returns a profile with the sum of each value in 'this' and 'other'.
@@ -124,7 +124,7 @@ public class ResourceProfile {
         MathUtil.saturatingAdd(getMemEstimateBytes(), other.getMemEstimateBytes()),
         MathUtil.saturatingAdd(getMinReservationBytes(),other.getMinReservationBytes()),
         MathUtil.saturatingAdd(getMaxReservationBytes(), other.getMaxReservationBytes()),
-        -1);
+        -1, -1);
   }
 
   // Returns a profile with all values multiplied by 'factor'.
@@ -133,7 +133,7 @@ public class ResourceProfile {
     return new ResourceProfile(true,
         MathUtil.saturatingMultiply(memEstimateBytes_, factor),
         MathUtil.saturatingMultiply(minReservationBytes_, factor),
-        MathUtil.saturatingMultiply(maxReservationBytes_, factor), -1);
+        MathUtil.saturatingMultiply(maxReservationBytes_, factor), -1, -1);
   }
 
   public TBackendResourceProfile toThrift() {
@@ -143,6 +143,9 @@ public class ResourceProfile {
     if (spillableBufferBytes_ != -1) {
       result.setSpillable_buffer_size(spillableBufferBytes_);
     }
+    if (maxRowBufferBytes_ != -1) {
+      result.setMax_row_buffer_size(maxRowBufferBytes_);
+    }
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
new file mode 100644
index 0000000..5420200
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class to help set up the various parameters of a ResourceProfile.
+ */
+public class ResourceProfileBuilder {
+
+  // Must be set by caller.
+  private long memEstimateBytes_ = -1;
+
+  // Assume no reservation is used unless the caller explicitly sets it.
+  private long minReservationBytes_ = 0;
+  private long maxReservationBytes_ = 0;
+
+  // The spillable buffer size is only set by plan nodes that use it.
+  private long spillableBufferBytes_ = -1;
+
+  // Must be set if spillableBufferBytes_ is set.
+  private long maxRowBufferBytes_ = -1;
+
+  public ResourceProfileBuilder setMemEstimateBytes(long memEstimateBytes) {
+    memEstimateBytes_ = memEstimateBytes;
+    return this;
+  }
+
+  /**
+   * Sets the minimum reservation and an unbounded maximum reservation.
+   */
+  public ResourceProfileBuilder setMinReservationBytes(long minReservationBytes) {
+    minReservationBytes_ = minReservationBytes;
+    maxReservationBytes_ = Long.MAX_VALUE;
+    return this;
+  }
+
+  public ResourceProfileBuilder setSpillableBufferBytes(long spillableBufferBytes) {
+    spillableBufferBytes_ = spillableBufferBytes;
+    return this;
+  }
+
+  public ResourceProfileBuilder setMaxRowBufferBytes(long maxRowBufferBytes) {
+    maxRowBufferBytes_ = maxRowBufferBytes;
+    return this;
+  }
+
+  ResourceProfile build() {
+    Preconditions.checkState(memEstimateBytes_ >= 0, "Mem estimate must be set");
+    return new ResourceProfile(true, memEstimateBytes_, minReservationBytes_,
+        maxReservationBytes_, spillableBufferBytes_, maxRowBufferBytes_);
+  }
+}
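
For readers unfamiliar with the new API, here is a minimal standalone sketch of how plan nodes use the builder in this patch. The classes are illustrative stand-ins, not the real Impala code; note that setting a min reservation also makes the max reservation unbounded, matching the builder above:

```java
public class ResourceProfileBuilderSketch {
    // Illustrative stand-in for ResourceProfile; field names follow the patch.
    static class ResourceProfile {
        final long memEstimateBytes, minReservationBytes, maxReservationBytes;
        final long spillableBufferBytes, maxRowBufferBytes;
        ResourceProfile(long mem, long min, long max, long spill, long maxRow) {
            // As in the real constructor, the estimate is at least the min reservation.
            memEstimateBytes = Math.max(mem, min);
            minReservationBytes = min;
            maxReservationBytes = max;
            spillableBufferBytes = spill;
            maxRowBufferBytes = maxRow;
        }
    }

    // Illustrative stand-in for ResourceProfileBuilder.
    static class Builder {
        private long mem = -1, min = 0, max = 0, spill = -1, maxRow = -1;
        Builder setMemEstimateBytes(long v) { mem = v; return this; }
        // Sets the minimum reservation and an unbounded maximum reservation.
        Builder setMinReservationBytes(long v) { min = v; max = Long.MAX_VALUE; return this; }
        Builder setSpillableBufferBytes(long v) { spill = v; return this; }
        Builder setMaxRowBufferBytes(long v) { maxRow = v; return this; }
        ResourceProfile build() {
            if (mem < 0) throw new IllegalStateException("Mem estimate must be set");
            return new ResourceProfile(mem, min, max, spill, maxRow);
        }
    }

    public static void main(String[] args) {
        ResourceProfile p = new Builder()
            .setMemEstimateBytes(10 * 1024 * 1024)
            .setMinReservationBytes(2031616)        // 1.94MB, as in the updated tests
            .setSpillableBufferBytes(64 * 1024)
            .setMaxRowBufferBytes(512 * 1024)
            .build();
        System.out.println(p.minReservationBytes);
        System.out.println(p.maxReservationBytes == Long.MAX_VALUE);
    }
}
```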

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 75e8034..3ca50e0 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -272,8 +272,10 @@ public class SortNode extends PlanNode {
       }
     }
 
-    // Sort always uses the default spillable buffer size.
-    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
+    // Sort uses a single buffer size - either the default spillable buffer size or the
+    // smallest buffer size required to fit the maximum row size.
+    long bufferSize = computeMaxSpillableBufferSize(
+        queryOptions.getDefault_spillable_buffer_size(), queryOptions.getMax_row_size());
 
     // The external sorter writes fixed-len and var-len data in separate sequences of
     // pages on disk and reads from both sequences when merging. This effectively
@@ -296,8 +298,10 @@ public class SortNode extends PlanNode {
           bufferSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
       perInstanceMinReservation = 3 * bufferSize * pageMultiplier;
     }
-    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
-        perInstanceMemEstimate, perInstanceMinReservation, bufferSize);
+    nodeResourceProfile_ = new ResourceProfileBuilder()
+        .setMemEstimateBytes(perInstanceMemEstimate)
+        .setMinReservationBytes(perInstanceMinReservation)
+        .setSpillableBufferBytes(bufferSize).setMaxRowBufferBytes(bufferSize).build();
   }
 
   private static String getDisplayName(TSortType type) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b0f1e2e..3bb8083 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -429,6 +429,17 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testMaxRowSize() {
+    // Tests that an increased value of 'max_row_size' is correctly factored into the
+    // resource calculations by the planner.
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+    options.setMax_row_size(8L * 1024L * 1024L);
+    runPlannerTestFile("max-row-size", options, false);
+  }
+
+  @Test
   public void testSortExprMaterialization() {
     addTestFunction("TestFn", Lists.newArrayList(Type.DOUBLE), false);
     TQueryOptions options = defaultQueryOptions();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 888c7c6..1e156ba 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -102,7 +102,7 @@ having 1024 * 1024 * count(*) % 2 = 0
   and (sm between 5 and 10)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.06MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.94MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -110,7 +110,7 @@ PLAN-ROOT SINK
 |  output: sum(2 + id), count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=17B cardinality=0
 |
 00:SCAN HDFS [functional.alltypes]
@@ -129,7 +129,7 @@ left outer join functional.alltypes b
 where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=257.06MB mem-reservation=1.06MB
+|  Per-Host Resources: mem-estimate=257.94MB mem-reservation=1.94MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -138,7 +138,7 @@ PLAN-ROOT SINK
 |  fk/pk conjuncts: assumed fk/pk
 |  other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col
 |  other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
@@ -203,7 +203,7 @@ group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=2.12MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=3.88MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -211,13 +211,13 @@ PLAN-ROOT SINK
 |  output: sum(2 + id), count:merge(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: 1048576 * count(*) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=2 row-size=17B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id
-|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=17B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -234,7 +234,7 @@ from functional.alltypes
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.06MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.94MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -247,7 +247,7 @@ PLAN-ROOT SINK
 01:AGGREGATE
 |  output: count(*)
 |  group by: 2 + id
-|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=16B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index 04241c7..97d662b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -78,8 +78,8 @@ select count(*)
 from functional.alltypes t1
 join functional.alltypestiny t2 on t1.id = t2.id
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=1.06MB
-Per-Host Resource Estimates: Memory=181.06MB
+Max Per-Host Resource Reservation: Memory=1.94MB
+Per-Host Resource Estimates: Memory=181.94MB
 Codegen disabled by planner
 
 PLAN-ROOT SINK

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
index 632881a..4af1bef 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
@@ -117,7 +117,7 @@ on ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number
 where sr_return_quantity < 10
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=212.25MB mem-reservation=4.25MB
+|  Per-Host Resources: mem-estimate=212.75MB mem-reservation=4.75MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -125,7 +125,7 @@ PLAN-ROOT SINK
 |  hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
 |  fk/pk conjuncts: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
 |  runtime filters: RF000 <- sr_item_sk, RF001 <- sr_ticket_number
-|  mem-estimate=4.25MB mem-reservation=4.25MB spill-buffer=256.00KB
+|  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB
 |  tuple-ids=0,1 row-size=188B cardinality=211838
 |
 |--01:SCAN HDFS [tpcds.store_returns]
@@ -229,7 +229,7 @@ where ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number
   and d1.d_fy_week_seq = 1000
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=355.19MB mem-reservation=4.25MB
+|  Per-Host Resources: mem-estimate=357.81MB mem-reservation=7.75MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -237,7 +237,7 @@ PLAN-ROOT SINK
 |  hash predicates: ss_addr_sk = c_current_addr_sk
 |  fk/pk conjuncts: none
 |  runtime filters: RF000 <- c_current_addr_sk
-|  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1,0,3,4,2 row-size=60B cardinality=19358
 |
 |--02:SCAN HDFS [tpcds.customer]
@@ -252,7 +252,7 @@ PLAN-ROOT SINK
 |  hash predicates: sr_returned_date_sk = d2.d_date_sk
 |  fk/pk conjuncts: sr_returned_date_sk = d2.d_date_sk
 |  runtime filters: RF001 <- d2.d_date_sk
-|  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1,0,3,4 row-size=56B cardinality=8131
 |
 |--04:SCAN HDFS [tpcds.date_dim d2]
@@ -267,14 +267,14 @@ PLAN-ROOT SINK
 |  hash predicates: sr_item_sk = ss_item_sk, sr_ticket_number = ss_ticket_number
 |  fk/pk conjuncts: sr_item_sk = ss_item_sk, sr_ticket_number = ss_ticket_number
 |  runtime filters: RF002 <- ss_item_sk, RF003 <- ss_ticket_number
-|  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1,0,3 row-size=52B cardinality=8131
 |
 |--05:HASH JOIN [INNER JOIN]
 |  |  hash predicates: ss_sold_date_sk = d1.d_date_sk
 |  |  fk/pk conjuncts: ss_sold_date_sk = d1.d_date_sk
 |  |  runtime filters: RF004 <- d1.d_date_sk
-|  |  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  |  tuple-ids=0,3 row-size=32B cardinality=11055
 |  |
 |  |--03:SCAN HDFS [tpcds.date_dim d1]
@@ -380,7 +380,7 @@ tpcds_seq_snap.store_sales inner join tpcds.customer
 on ss_customer_sk = c_customer_sk
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=177.06MB mem-reservation=1.06MB
+|  Per-Host Resources: mem-estimate=177.94MB mem-reservation=1.94MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -388,7 +388,7 @@ PLAN-ROOT SINK
 |  hash predicates: ss_customer_sk = c_customer_sk
 |  fk/pk conjuncts: assumed fk/pk
 |  runtime filters: RF000 <- c_customer_sk
-|  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=0,1 row-size=8B cardinality=unavailable
 |
 |--01:SCAN HDFS [tpcds.customer]
@@ -416,7 +416,7 @@ tpcds.store_sales inner join
 on ss_sold_time_sk = ws_sold_time_sk
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=298.00MB mem-reservation=2.12MB
+|  Per-Host Resources: mem-estimate=298.00MB mem-reservation=3.88MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -424,12 +424,12 @@ PLAN-ROOT SINK
 |  hash predicates: ss_sold_time_sk = ws_sold_time_sk
 |  fk/pk conjuncts: none
 |  runtime filters: RF000 <- ws_sold_time_sk
-|  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=0,2 row-size=104B cardinality=2440073
 |
 |--02:AGGREGATE [FINALIZE]
 |  |  group by: ws_sold_time_sk
-|  |  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  |  tuple-ids=2 row-size=4B cardinality=39771
 |  |
 |  01:SCAN HDFS [tpcds.web_sales]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
new file mode 100644
index 0000000..0732771
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -0,0 +1,355 @@
+# Join with tiny build side.
+# Uses smallest possible buffers but two max-size buffers are required.
+select straight_join *
+from tpch_parquet.customer
+    inner join tpch_parquet.nation on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=16.94MB
+Per-Host Resource Estimates: Memory=56.94MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1 row-size=355B cardinality=150000
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=40.94MB mem-reservation=16.94MB
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  fk/pk conjuncts: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  mem-estimate=16.94MB mem-reservation=16.94MB spill-buffer=64.00KB
+|  tuple-ids=0,1 row-size=355B cardinality=150000
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=117B cardinality=25
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.00MB mem-reservation=0B
+|  01:SCAN HDFS [tpch_parquet.nation, RANDOM]
+|     partitions=1/1 files=1 size=2.94KB
+|     stats-rows=25 extrapolated-rows=disabled
+|     table stats: rows=25 size=2.94KB
+|     column stats: all
+|     mem-estimate=16.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=117B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer, RANDOM]
+   partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=12.34MB
+   column stats: all
+   mem-estimate=24.00MB mem-reservation=0B
+   tuple-ids=0 row-size=238B cardinality=150000
+====
+# Join with large build side.
+# Uses default buffer size. Two max-size buffers are required.
+select straight_join *
+from tpch_parquet.lineitem
+    left join tpch_parquet.orders on l_orderkey = o_orderkey
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=46.00MB
+Per-Host Resource Estimates: Memory=420.41MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0,1N row-size=454B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=380.41MB mem-reservation=46.00MB
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  mem-estimate=300.41MB mem-reservation=46.00MB spill-buffer=2.00MB
+|  tuple-ids=0,1N row-size=454B cardinality=6001215
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=191B cardinality=1500000
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+====
+# Null-aware anti-join with medium build side.
+# Uses intermediate buffer size plus two max-size buffers.
+select * from tpch_parquet.lineitem
+where l_orderkey not in (select o_orderkey from tpch_parquet.orders)
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=34.00MB
+Per-Host Resource Estimates: Memory=154.00MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=263B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=114.00MB mem-reservation=34.00MB
+02:HASH JOIN [NULL AWARE LEFT ANTI JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=1.00MB
+|  tuple-ids=0 row-size=263B cardinality=6001215
+|
+|--03:EXCHANGE [BROADCAST]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=8B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=8B cardinality=1500000
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+====
+# Mid NDV aggregation.
+# Uses smaller-than-default buffer size. Two max-size buffers are required.
+select straight_join l_orderkey, o_orderstatus, count(*)
+from tpch_parquet.lineitem
+    join tpch_parquet.orders on o_orderkey = l_orderkey
+group by 1, 2
+having count(*) = 1
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=77.00MB
+Per-Host Resource Estimates: Memory=251.12MB
+
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+08:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+F03:PLAN FRAGMENT [HASH(l_orderkey,o_orderstatus)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: l_orderkey, o_orderstatus
+|  having: count(*) = 1
+|  mem-estimate=46.00MB mem-reservation=46.00MB spill-buffer=2.00MB
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+06:EXCHANGE [HASH(l_orderkey,o_orderstatus)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=85.12MB mem-reservation=31.00MB
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: l_orderkey, o_orderstatus
+|  mem-estimate=54.12MB mem-reservation=0B spill-buffer=2.00MB
+|  tuple-ids=2 row-size=33B cardinality=4690314
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF000 <- o_orderkey
+|  mem-estimate=31.00MB mem-reservation=31.00MB spill-buffer=1.00MB
+|  tuple-ids=0,1 row-size=33B cardinality=5757710
+|
+|--05:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=1 row-size=25B cardinality=1500000
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.00MB mem-reservation=0B
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     partitions=1/1 files=2 size=54.20MB
+|     stats-rows=1500000 extrapolated-rows=disabled
+|     table stats: rows=1500000 size=54.20MB
+|     column stats: all
+|     mem-estimate=40.00MB mem-reservation=0B
+|     tuple-ids=1 row-size=25B cardinality=1500000
+|
+04:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=8B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=80.00MB mem-reservation=0B
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   runtime filters: RF000 -> l_orderkey
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=8B cardinality=6001215
+====
+# High NDV aggregation.
+# Uses default buffer size. Two max-size buffers are required.
+select distinct *
+from tpch_parquet.lineitem
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=46.00MB
+Per-Host Resource Estimates: Memory=3.31GB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB
+03:AGGREGATE [FINALIZE]
+|  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  mem-estimate=1.62GB mem-reservation=46.00MB spill-buffer=2.00MB
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=1.69GB mem-reservation=0B
+01:AGGREGATE [STREAMING]
+|  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  mem-estimate=1.62GB mem-reservation=0B spill-buffer=2.00MB
+|  tuple-ids=1 row-size=263B cardinality=6001215
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=263B cardinality=6001215
+====
+# High NDV aggregation with string aggregation function.
+# Uses default buffer size. Two max-size buffers are required.
+select l_orderkey, l_partkey, group_concat(l_linestatus, ",")
+from tpch_parquet.lineitem
+group by 1, 2
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=48.00MB
+Per-Host Resource Estimates: Memory=482.91MB
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=32B cardinality=6001215
+|
+F01:PLAN FRAGMENT [HASH(l_orderkey,l_partkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB
+03:AGGREGATE [FINALIZE]
+|  output: group_concat:merge(l_linestatus, ',')
+|  group by: l_orderkey, l_partkey
+|  mem-estimate=201.46MB mem-reservation=48.00MB spill-buffer=2.00MB
+|  tuple-ids=1 row-size=32B cardinality=6001215
+|
+02:EXCHANGE [HASH(l_orderkey,l_partkey)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1 row-size=32B cardinality=6001215
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=281.46MB mem-reservation=0B
+01:AGGREGATE [STREAMING]
+|  output: group_concat(l_linestatus, ',')
+|  group by: l_orderkey, l_partkey
+|  mem-estimate=201.46MB mem-reservation=0B spill-buffer=2.00MB
+|  tuple-ids=1 row-size=32B cardinality=6001215
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   partitions=1/1 files=3 size=193.92MB
+   stats-rows=6001215 extrapolated-rows=disabled
+   table stats: rows=6001215 size=193.92MB
+   column stats: all
+   mem-estimate=80.00MB mem-reservation=0B
+   tuple-ids=0 row-size=33B cardinality=6001215
+====
+# Sort + Analytic.
+# The size of buffers for these plan nodes is increased to fit max_row_size.
+select max(tinyint_col) over(partition by int_col)
+from functional.alltypes
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=40.00MB
+Per-Host Resource Estimates: Memory=56.00MB
+Codegen disabled by planner
+
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=3,2 row-size=6B cardinality=7300
+|
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB
+02:ANALYTIC
+|  functions: max(tinyint_col)
+|  partition by: int_col
+|  mem-estimate=16.00MB mem-reservation=16.00MB spill-buffer=8.00MB
+|  tuple-ids=3,2 row-size=6B cardinality=7300
+|
+01:SORT
+|  order by: int_col ASC NULLS FIRST
+|  mem-estimate=24.00MB mem-reservation=24.00MB spill-buffer=8.00MB
+|  tuple-ids=3 row-size=5B cardinality=7300
+|
+03:EXCHANGE [HASH(int_col)]
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=0 row-size=5B cardinality=7300
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=0B
+00:SCAN HDFS [functional.alltypes, RANDOM]
+   partitions=24/24 files=24 size=478.45KB
+   stats-rows=7300 extrapolated-rows=disabled
+   table stats: rows=7300 size=478.45KB
+   column stats: all
+   mem-estimate=16.00MB mem-reservation=0B
+   tuple-ids=0 row-size=5B cardinality=7300
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ed87c406/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 131c038..98239a7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -313,7 +313,7 @@ from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2
 where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=89.06MB mem-reservation=1.06MB
+|  Per-Host Resources: mem-estimate=89.94MB mem-reservation=1.94MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -324,7 +324,7 @@ PLAN-ROOT SINK
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
 |  |  fk/pk conjuncts: assumed fk/pk
-|  |  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  |  tuple-ids=1,0,2 row-size=286B cardinality=10
 |  |
 |  |--04:UNNEST [c.c_orders o2]
@@ -366,7 +366,7 @@ PLAN-ROOT SINK
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
-Per-Host Resources: mem-estimate=267.19MB mem-reservation=3.19MB
+Per-Host Resources: mem-estimate=269.81MB mem-reservation=5.81MB
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
@@ -374,7 +374,7 @@ Per-Host Resources: mem-estimate=267.19MB mem-reservation=3.19MB
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
 |  |  fk/pk conjuncts: assumed fk/pk
-|  |  mem-estimate=1.06MB mem-reservation=1.06MB spill-buffer=64.00KB
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  |  tuple-ids=1,0,2 row-size=286B cardinality=10
 |  |
 |  |--04:UNNEST [c.c_orders o2]


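The reservation numbers in the plans above follow from the scheme in the commit message: each BufferedTupleStream reserves one default-sized page per read and write iterator, plus temporary max-sized pages when a row exceeds the default page size (the "two max-size buffers" noted in the test comments). The sketch below is an illustrative model of that arithmetic, not Impala's actual planner code; the function names and the exact formula are assumptions made for clarity.

```python
def round_up_to_power_of_two(v: int) -> int:
    """Round v up to the next power of two (page lengths are powers of two)."""
    p = 1
    while p < v:
        p *= 2
    return p


def stream_min_reservation(default_page_len: int, max_row_size: int) -> int:
    """Hypothetical minimum reservation for one spillable stream.

    Model (an assumption based on the commit description): one
    default-sized page each for the read and write iterators, plus,
    when rows can exceed a default page, one temporary max-sized page
    for reading and one for writing ("two max-size buffers").
    """
    max_page_len = max(default_page_len, round_up_to_power_of_two(max_row_size))
    reservation = 2 * default_page_len  # read iterator + write iterator
    if max_page_len > default_page_len:
        reservation += 2 * max_page_len  # temporary large-row pages
    return reservation


KB = 1024
MB = 1024 * KB

# With the default 2MB spill buffer, a 512KB max_row_size fits in a
# default page, so no extra reservation is needed.
print(stream_min_reservation(2 * MB, 512 * KB))   # 4MB
# With a small 64KB spill buffer, large rows force max-sized pages.
print(stream_min_reservation(64 * KB, 512 * KB))  # 2*64KB + 2*512KB
```

This illustrates why max_row_size is cheaper than simply raising min_spillable_buffer_size: only the temporary large-row pages grow with max_row_size, while the per-iterator buffers stay small.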