impala-commits mailing list archives

From tarmstr...@apache.org
Subject [5/9] incubator-impala git commit: IMPALA-5681: release reservation from blocking operators
Date Fri, 18 Aug 2017 15:12:51 GMT
IMPALA-5681: release reservation from blocking operators

When an in-memory blocking aggregation or join is in the GetNext()
phase, where it outputs accumulated rows, we expect its memory
consumption to decrease monotonically because no more rows will be
accumulated in memory.

This change adds support for releasing unused reservation and uses
it for in-memory aggregations and sorts.
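
The arithmetic behind releasing unused reservation can be sketched as
follows. This is a simplified illustration, not the Impala code:
ToyReservation and its fields are hypothetical stand-ins for a buffer pool
client's reservation accounting, and the step that cleans dirty unpinned
pages before releasing is omitted. The amount released is capped both by
the unused reservation and by the distance to the target.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for a client's reservation accounting.
struct ToyReservation {
  int64_t reservation;  // Total bytes currently reserved.
  int64_t used;         // Bytes of the reservation that back buffers or pages.

  int64_t unused() const { return reservation - used; }

  // Release unused reservation until 'reservation' reaches 'target_bytes' or
  // no unused reservation is left. Returns the number of bytes released.
  int64_t DecreaseTo(int64_t target_bytes) {
    assert(reservation >= target_bytes);  // Precondition, as in the real API.
    int64_t amount_to_free = std::min(unused(), reservation - target_bytes);
    reservation -= amount_to_free;
    return amount_to_free;
  }
};
```

Because only unused reservation is released, a client that still has
memory in use can end up above the requested target.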

We don't release memory for operators with spilled data, since they
may need the reservation to bring that data back into memory. We also
don't release memory in subplans, since it will probably be needed
in a later iteration of the subplan.
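
The policy in the paragraph above amounts to a two-condition check. A
minimal sketch, assuming a hypothetical ToyOperatorState struct that
summarizes the operator (the actual patch instead tests IsInSubplan()
together with operator-specific spill state):

```cpp
// Hypothetical summary of an operator's state; not an Impala type.
struct ToyOperatorState {
  bool in_subplan;        // Operator runs inside a subplan.
  bool has_spilled_data;  // Operator has data that was spilled to disk.
};

// Release only when the reservation cannot be needed again: the operator is
// not in a subplan (no later iteration) and has nothing to read back.
bool ShouldReleaseUnusedReservation(const ToyOperatorState& op) {
  return !op.in_subplan && !op.has_spilled_data;
}
```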

Testing:
Updated spilling test that now requires less memory.

Ran stress test binary search on tpch_parquet. No changes, except
Q18 now requires 325MB instead of 450MB to execute without spilling.

Ran query with two sorts in the same pipeline and watched /memz to
confirm that the first node in the pipeline was incrementally releasing
memory. Added a regression test based on this experiment.

Added a backend test to directly test reservation decreasing.

Change-Id: I6f4d0ad127d5fcd14b9821a7c127eec11d98692f
Reviewed-on: http://gerrit.cloudera.org:8080/7619
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8609b09a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8609b09a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8609b09a

Branch: refs/heads/master
Commit: 8609b09a95f2b20e42e4ba4aa8b5d605fb545a4e
Parents: 5fcc9cb
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Mon Aug 7 09:15:29 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Aug 17 20:17:48 2017 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                        |  4 ++
 be/src/exec/exec-node.h                         | 11 +++-
 be/src/exec/partitioned-aggregation-node.cc     | 10 ++++
 be/src/exec/sort-node.cc                        | 14 +++++
 be/src/exec/sort-node.h                         |  4 ++
 .../runtime/bufferpool/buffer-pool-internal.h   | 14 ++++-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 62 ++++++++++++++++++--
 be/src/runtime/bufferpool/buffer-pool.cc        | 21 +++++++
 be/src/runtime/bufferpool/buffer-pool.h         | 10 ++++
 be/src/runtime/sorter.cc                        |  7 +++
 be/src/runtime/sorter.h                         |  5 +-
 .../queries/QueryTest/spilling-aggs.test        |  2 +-
 .../tpch/queries/sort-reservation-usage.test    | 30 ++++++++++
 tests/query_test/test_sort.py                   |  4 ++
 14 files changed, 187 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c3a5f80..afd7262 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -247,6 +247,10 @@ Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
   return Status::OK();
 }
 
+Status ExecNode::ReleaseUnusedReservation() {
+  return buffer_pool_client_.DecreaseReservationTo(resource_profile_.min_reservation);
+}
+
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 55c51ab..04470f2 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -233,7 +233,16 @@ class ExecNode {
   /// ExecNode. Only needs to be called by ExecNodes that will use the client.
   /// The client is automatically cleaned up in Close(). Should not be called if
   /// the client is already open.
-  Status ClaimBufferReservation(RuntimeState* state);
+  ///
+  /// The ExecNode must return the initial reservation to
+  /// QueryState::initial_reservations(), which is done automatically in Close() as long
+  /// as the initial reservation is not released before Close().
+  Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Release any unused reservation in excess of the node's initial reservation. Returns
+  /// an error if releasing the reservation requires flushing pages to disk, and that
+  /// fails.
+  Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
 
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 8432bcc..3949041 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1198,6 +1198,16 @@ int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
 Status PartitionedAggregationNode::NextPartition() {
   DCHECK(output_partition_ == nullptr);
 
+  if (!IsInSubplan() && spilled_partitions_.empty()) {
+    // All partitions are in memory. Release reservation that was used for previous
+    // partitions that is no longer needed. If we have spilled partitions, we want to
+    // hold onto all reservation in case it is needed to process the spilled partitions.
+    DCHECK(!buffer_pool_client_.has_unpinned_pages());
+    Status status = ReleaseUnusedReservation();
+    DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
+                        << "no unpinned pages. " << status.GetDetail();
+  }
+
   // Keep looping until we get to a partition that fits in memory.
   Partition* partition = nullptr;
   while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 440f809..80df214 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -103,6 +103,19 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     *eos = false;
   }
 
+  if (returned_buffer_) {
+    // If the Sorter returned a buffer on the last call to GetNext(), we might have an
+    // opportunity to release memory. Release reservation, unless it might be needed
+    // for the next subplan iteration or merging spilled runs.
+    returned_buffer_ = false;
+    if (!IsInSubplan() && !sorter_->HasSpilledRuns()) {
+      DCHECK(!buffer_pool_client_.has_unpinned_pages());
+      Status status = ReleaseUnusedReservation();
+      DCHECK(status.ok()) << "Should not fail - no runs were spilled so no pages are "
+                          << "unpinned. " << status.GetDetail();
+    }
+  }
+
   DCHECK_EQ(row_batch->num_rows(), 0);
   RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
   while ((num_rows_skipped_ < offset_)) {
@@ -119,6 +132,7 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
   }
 
+  returned_buffer_ = row_batch->num_buffers() > 0;
   num_rows_returned_ += row_batch->num_rows();
   if (ReachedLimit()) {
     row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index a11d424..d6eef25 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -69,6 +69,10 @@ class SortNode : public ExecNode {
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 
+  /// Whether the previous call to GetNext() returned a buffer attached to the RowBatch.
+  /// Used to avoid unnecessary calls to ReleaseUnusedReservation().
+  bool returned_buffer_ = false;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 0c0408b..7094942 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -227,7 +227,7 @@ class BufferPool::Client {
   /// already in memory, ensures the data is in the page's buffer. If the data is on
   /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to
   /// true. Neither the client's lock nor page->buffer_lock should be held by the caller.
-  Status StartMoveToPinned(ClientHandle* client, Page* page);
+  Status StartMoveToPinned(ClientHandle* client, Page* page) WARN_UNUSED_RESULT;
 
   /// Moves a page that has a pin in flight back to the evicted state, undoing
   /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held
@@ -236,13 +236,16 @@ class BufferPool::Client {
 
   /// Finish the work of bring the data of an evicted page to memory if
   /// page->pin_in_flight was set to true by StartMoveToPinned().
-  Status FinishMoveEvictedToPinned(Page* page);
+  Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT;
 
   /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
   /// API to deduct from the client's reservation and update internal accounting. Cleans
   /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
   /// client locks should be held by the caller.
-  Status PrepareToAllocateBuffer(int64_t len);
+  Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
+
+  /// Implementation of ClientHandle::DecreaseReservationTo().
+  Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
 
   /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update
   /// internal accounting and release the buffer to the client's reservation. No page or
@@ -272,6 +275,11 @@ class BufferPool::Client {
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
   void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
+  bool has_unpinned_pages() const {
+    // Safe to read without lock since other threads should not be calling BufferPool
+    // functions that create, destroy or unpin pages.
+    return pinned_pages_.size() < num_pages_;
+  }
 
   std::string DebugString();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 2eff955..b2f8695 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -157,6 +157,14 @@ class BufferPoolTest : public ::testing::Test {
     return !page->page_->buffer.is_open();
   }
 
+  int NumEvicted(vector<BufferPool::PageHandle>& pages) {
+    int num_evicted = 0;
+    for (PageHandle& page : pages) {
+      if (IsEvicted(&page)) ++num_evicted;
+    }
+    return num_evicted;
+  }
+
   /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to
   /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
   /// If 'randomize_core' is true, will switch thread between cores randomly before
@@ -606,6 +614,7 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   vector<PageHandle> pages;
   CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
   WriteData(pages, 0);
+  EXPECT_FALSE(client.has_unpinned_pages());
 
   // Pages don't start off clean.
   EXPECT_EQ(0, pool.GetNumCleanPages());
@@ -613,22 +622,27 @@ TEST_F(BufferPoolTest, CleanPageStats) {
 
   // Unpin pages and wait until they're written out and therefore clean.
   UnpinAll(&pool, &client, &pages);
+  EXPECT_TRUE(client.has_unpinned_pages());
   WaitForAllWrites(&client);
   EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages());
   EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes());
+  EXPECT_TRUE(client.has_unpinned_pages());
 
   // Do an allocation to force eviction of one page.
   ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
   EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages());
   EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes());
+  EXPECT_TRUE(client.has_unpinned_pages());
 
   // Re-pin all the pages - none will be clean afterwards.
   ASSERT_OK(PinAll(&pool, &client, &pages));
   VerifyData(pages, 0);
   EXPECT_EQ(0, pool.GetNumCleanPages());
   EXPECT_EQ(0, pool.GetCleanPageBytes());
+  EXPECT_FALSE(client.has_unpinned_pages());
 
   DestroyAll(&pool, &client, &pages);
+  EXPECT_FALSE(client.has_unpinned_pages());
   pool.DeregisterClient(&client);
   global_reservations_.Close();
 }
@@ -1242,11 +1256,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
   // No additional memory should have been allocated - it should have been recycled.
   EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
   // Check that two pages were evicted.
-  int num_evicted = 0;
-  for (PageHandle& page : pages) {
-    if (IsEvicted(&page)) ++num_evicted;
-  }
-  EXPECT_EQ(NUM_EXTRA_BUFFERS, num_evicted);
+  EXPECT_EQ(NUM_EXTRA_BUFFERS, NumEvicted(pages));
 
   // Free up memory required to pin the original pages again.
   FreeBuffers(&pool, &client, &extra_buffers);
@@ -1928,6 +1938,48 @@ TEST_F(BufferPoolTest, SubReservation) {
   subreservation.Close();
   pool.DeregisterClient(&client);
 }
+
+// Check that we can decrease reservation without violating any buffer pool invariants.
+TEST_F(BufferPoolTest, DecreaseReservation) {
+  const int MAX_NUM_BUFFERS = 4;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  WriteData(pages, 0);
+
+  // Unpin pages and decrease reservation while the writes are in flight.
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_OK(client.DecreaseReservationTo(2 * TEST_BUFFER_LEN));
+  // Two pages must be clean to stay within reservation
+  EXPECT_GE(pool.GetNumCleanPages(), 2);
+  EXPECT_EQ(2 * TEST_BUFFER_LEN, client.GetReservation());
+
+  // Decrease it further after the pages are evicted.
+  WaitForAllWrites(&client);
+  ASSERT_OK(client.DecreaseReservationTo(TEST_BUFFER_LEN));
+  EXPECT_GE(pool.GetNumCleanPages(), 3);
+  EXPECT_EQ(TEST_BUFFER_LEN, client.GetReservation());
+
+  // Check that we can still use the reservation.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  EXPECT_EQ(1, NumEvicted(pages));
+
+  // Check that we can decrease it to zero.
+  ASSERT_OK(client.DecreaseReservationTo(0));
+  EXPECT_EQ(0, client.GetReservation());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index df92928..2e9ba3d 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -296,6 +296,10 @@ bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
   return impl_->reservation()->IncreaseReservationToFit(bytes);
 }
 
+Status BufferPool::ClientHandle::DecreaseReservationTo(int64_t target_bytes) {
+  return impl_->DecreaseReservationTo(target_bytes);
+}
+
 int64_t BufferPool::ClientHandle::GetReservation() const {
   return impl_->reservation()->GetReservation();
 }
@@ -334,6 +338,10 @@ void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probabilit
   impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
 }
 
+bool BufferPool::ClientHandle::has_unpinned_pages() const {
+  return impl_->has_unpinned_pages();
+}
+
 BufferPool::SubReservation::SubReservation(ClientHandle* client) {
   tracker_.reset(new ReservationTracker);
   tracker_->InitChildTracker(
@@ -543,6 +551,19 @@ Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
   return Status::OK();
 }
 
+Status BufferPool::Client::DecreaseReservationTo(int64_t target_bytes) {
+  unique_lock<mutex> lock(lock_);
+  int64_t current_reservation = reservation_.GetReservation();
+  DCHECK_GE(current_reservation, target_bytes);
+  int64_t amount_to_free =
+      min(reservation_.GetUnusedReservation(), current_reservation - target_bytes);
+  if (amount_to_free == 0) return Status::OK();
+  // Clean enough pages to allow us to safely release reservation.
+  RETURN_IF_ERROR(CleanPages(&lock, amount_to_free));
+  reservation_.DecreaseReservation(amount_to_free);
+  return Status::OK();
+}
+
 Status BufferPool::Client::CleanPages(unique_lock<mutex>* client_lock, int64_t len) {
   DCheckHoldsLock(*client_lock);
   DCHECK_CONSISTENCY();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 4798d6c..7b11551 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -324,6 +324,13 @@ class BufferPool::ClientHandle {
   /// if successful, after which 'bytes' can be used.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
+  /// Try to decrease this client's reservation down to a minimum of 'target_bytes' by
+  /// releasing unused reservation to ancestor ReservationTrackers, all the way up to
+  /// the root of the ReservationTracker tree. This client's reservation must be at least
+  /// 'target_bytes' before calling this method. May fail if decreasing the reservation
+  /// requires flushing unpinned pages to disk and a write to disk fails.
+  Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
+
   /// Move some of this client's reservation to the SubReservation. 'bytes' of unused
   /// reservation must be available in this tracker.
   void SaveReservation(SubReservation* dst, int64_t bytes);
@@ -351,6 +358,9 @@ class BufferPool::ClientHandle {
 
   bool is_registered() const { return impl_ != NULL; }
 
+  /// Return true if there are any unpinned pages for this client.
+  bool has_unpinned_pages() const;
+
   std::string DebugString() const;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ee0e4be..de05945 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1760,4 +1760,11 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
   RETURN_IF_ERROR(merged_run->FinalizeInput());
   return Status::OK();
 }
+
+bool Sorter::HasSpilledRuns() const {
+  // All runs in 'merging_runs_' are spilled. 'sorted_runs_' can contain at most one
+  // non-spilled run.
+  return !merging_runs_.empty() || sorted_runs_.size() > 1 ||
+      (sorted_runs_.size() == 1 && !sorted_runs_.back()->is_pinned());
+}
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index cafab72..5e7240b 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -152,6 +152,9 @@ class Sorter {
   /// sort with the current sorter.
   int64_t ComputeMinReservation();
 
+  /// Return true if the sorter has any spilled runs.
+  bool HasSpilledRuns() const;
+
  private:
   class Page;
   class Run;
@@ -239,7 +242,7 @@ class Sorter {
   /// memory.
   boost::scoped_ptr<SortedRunMerger> merger_;
 
-  /// Runs that are currently processed by the merge_.
+  /// Spilled runs that are currently processed by the merge_.
   /// These runs can be deleted when we are done with the current merge.
   std::deque<Run*> merging_runs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
index 6fe86c3..d9f60cc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
@@ -125,7 +125,7 @@ row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
 # Test aggregation spill with group_concat distinct
-set buffer_pool_limit=50m;
+set buffer_pool_limit=30m;
 select l_orderkey, count(*), group_concat(distinct l_linestatus, '|')
 from lineitem
 group by 1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/testdata/workloads/tpch/queries/sort-reservation-usage.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/sort-reservation-usage.test b/testdata/workloads/tpch/queries/sort-reservation-usage.test
new file mode 100644
index 0000000..92f180d
--- /dev/null
+++ b/testdata/workloads/tpch/queries/sort-reservation-usage.test
@@ -0,0 +1,30 @@
+====
+---- QUERY
+# Test that in-mem sorts incrementally give up memory when emitting output.
+# This query and the limit is calibrated to fail if the first sort does not
+# give up memory to the second sort.
+set num_nodes=1;
+set scratch_limit=0;
+set buffer_pool_limit=15m;
+set default_spillable_buffer_size=64kb;
+SELECT *
+FROM   (SELECT
+        Rank() OVER(ORDER BY  l_orderkey) AS rank,
+        Rank() OVER(ORDER BY  l_partkey) AS rank2
+        FROM lineitem
+        WHERE l_shipdate < '1992-05-09') a
+WHERE rank < 10
+ORDER BY rank;
+---- RESULTS
+1,118035
+2,55836
+2,141809
+2,155407
+5,84064
+5,129763
+7,10725
+7,31340
+7,155173
+---- TYPES
+BIGINT,BIGINT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index df95ddd..0eae035 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -159,6 +159,10 @@ class TestQueryFullSort(ImpalaTestSuite):
       query, exec_option, table_format=table_format).data)
     assert(result[0] == sorted(result[0]))
 
+  def test_sort_reservation_usage(self, vector):
+    """Tests for sorter reservation usage."""
+    self.run_test_case('sort-reservation-usage', vector)
+
 class TestRandomSort(ImpalaTestSuite):
   @classmethod
   def get_workload(self):

