impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [2/3] incubator-impala git commit: IMPALA-4678: move query MemTracker into QueryState
Date Thu, 19 Jan 2017 07:24:16 GMT
IMPALA-4678: move query MemTracker into QueryState

The query MemTracker for query execution is now owned directly by
QueryState, which greatly simplifies the lifecycle of the MemTracker.
This required various other changes and enabled some simplifications.

* The coordinator's QueryState is constructed earlier before fragments
  are sent out, since we need a MemTracker at that point.
* The global query MemTracker map can be removed.
* The static request pool mem tracker is moved into into ExecEnv.
* Temporary query MemTrackers used to evaluate expressions during
  planning do not need to be registered globally and are owned
  directly by the RuntimeState.
* Various cleanup logic is moved around to reflect the other changes.

Change-Id: Id6b46652932b5638993623e98d1f0d60d8380ba0
Reviewed-on: http://gerrit.cloudera.org:8080/5630
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/85edc15f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/85edc15f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/85edc15f

Branch: refs/heads/master
Commit: 85edc15fefe9ab9b2883b9c9f7451efc7e26ff94
Parents: 69859bd
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Tue Dec 20 09:02:31 2016 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Jan 19 02:34:39 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc               | 15 ++--
 be/src/exprs/expr-test.cc                    | 13 ++--
 be/src/runtime/buffered-block-mgr-test.cc    | 19 ++---
 be/src/runtime/buffered-tuple-stream-test.cc |  8 +-
 be/src/runtime/coordinator.cc                | 59 +++++---------
 be/src/runtime/coordinator.h                 | 32 ++++----
 be/src/runtime/data-stream-test.cc           | 37 ++++-----
 be/src/runtime/exec-env.cc                   |  2 +
 be/src/runtime/exec-env.h                    |  5 +-
 be/src/runtime/mem-tracker.cc                | 82 ++++++++------------
 be/src/runtime/mem-tracker.h                 | 94 +++++++++++------------
 be/src/runtime/plan-fragment-executor.cc     |  1 -
 be/src/runtime/query-exec-mgr.cc             | 64 +++++++++------
 be/src/runtime/query-exec-mgr.h              | 13 +++-
 be/src/runtime/query-state.cc                | 28 ++++++-
 be/src/runtime/query-state.h                 | 38 +++++++--
 be/src/runtime/runtime-state.cc              | 65 ++++++++--------
 be/src/runtime/runtime-state.h               | 26 +++----
 be/src/runtime/test-env.cc                   | 43 ++++++-----
 be/src/runtime/test-env.h                    | 29 ++++---
 be/src/runtime/tmp-file-mgr-test.cc          |  2 +-
 be/src/scheduling/admission-controller.cc    |  3 +-
 be/src/service/fe-support.cc                 | 17 ++--
 be/src/service/impala-http-handler.cc        | 20 ++---
 be/src/service/query-exec-state.cc           |  8 +-
 25 files changed, 371 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 6f0978e..230ebb5 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -187,8 +187,8 @@ class HashTableTest : public testing::Test {
       int max_num_blocks = 100, int reserved_blocks = 10) {
     EXPECT_OK(test_env_->CreateQueryState(
         next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_));
-    MemTracker* client_tracker = pool_.Add(
-        new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    MemTracker* client_tracker =
+        pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     BufferedBlockMgr::Client* client;
     EXPECT_OK(runtime_state_->block_mgr()->RegisterClient(
         "", reserved_blocks, false, client_tracker, runtime_state_, &client));
@@ -603,13 +603,12 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  EXPECT_TRUE(
-      test_env_->CreateQueryState(
-        0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_).ok());
+  EXPECT_OK(test_env_->CreateQueryState(
+      0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_));
   scoped_ptr<HashTableCtx> ht_ctx;
-  Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
-      probe_expr_ctxs_, false /* !stores_nulls_ */,
-      vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1, &tracker_, &ht_ctx);
+  Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, probe_expr_ctxs_,
+      false /* !stores_nulls_ */, vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1,
+      &tracker_, &ht_ctx);
   EXPECT_OK(status);
 
   uint32_t seed = 9999;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 3672101..8210a4d 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -1019,15 +1019,16 @@ void ExprTest::TestTimestampValue(const string& expr, const TimestampValue& expe
 // Tests whether the returned TimestampValue is valid.
 // We use this function for tests where the expected value is unknown, e.g., now().
 void ExprTest::TestValidTimestampValue(const string& expr) {
-  EXPECT_TRUE(ConvertValue<TimestampValue>(GetValue(expr, TYPE_TIMESTAMP))
-      .HasDateOrTime());
+  EXPECT_TRUE(
+      ConvertValue<TimestampValue>(GetValue(expr, TYPE_TIMESTAMP)).HasDateOrTime());
 }
 
-template <typename T> void TestSingleLiteralConstruction(
+template <typename T>
+void TestSingleLiteralConstruction(
     const ColumnType& type, const T& value, const string& string_val) {
   ObjectPool pool;
   RowDescriptor desc;
-  RuntimeState state{TQueryCtx()};
+  RuntimeState state{TQueryCtx(), ExecEnv::GetInstance(), "test-pool"};
   MemTracker tracker;
 
   Expr* expr = pool.Add(new Literal(type, value));
@@ -1037,18 +1038,20 @@ template <typename T> void TestSingleLiteralConstruction(
   EXPECT_EQ(0, RawValue::Compare(ctx.GetValue(NULL), &value, type))
       << "type: " << type << ", value: " << value;
   ctx.Close(&state);
+  state.ReleaseResources();
 }
 
 TEST_F(ExprTest, NullLiteral) {
   for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
     NullLiteral expr(static_cast<PrimitiveType>(type));
     ExprContext ctx(&expr);
-    RuntimeState state{TQueryCtx()};
+    RuntimeState state{TQueryCtx(), ExecEnv::GetInstance(), "test-pool"};
     MemTracker tracker;
     EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker));
     EXPECT_OK(ctx.Open(&state));
     EXPECT_TRUE(ctx.GetValue(NULL) == NULL);
     ctx.Close(&state);
+    state.ReleaseResources();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 9b616f5..188d9e9 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -31,6 +31,7 @@
 #include "runtime/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
 #include "runtime/tmp-file-mgr.h"
@@ -188,7 +189,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   void TearDownMgrs() {
     // Tear down the query states, which DCHECKs that the memory consumption of
     // the query's trackers is zero.
-    test_env_->TearDownRuntimeStates();
+    test_env_->TearDownQueries();
   }
 
   void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
@@ -557,16 +558,15 @@ class BufferedBlockMgrTest : public ::testing::Test {
     const int num_threads = 8;
     thread_group workers;
     // Create a shared RuntimeState with no BufferedBlockMgr.
-    RuntimeState* shared_state =
-        new RuntimeState(TQueryCtx(), test_env_->exec_env());
-    shared_state->InitMemTrackers(NULL, -1);
+    RuntimeState shared_state(TQueryCtx(), test_env_->exec_env(), "test-pool");
 
     for (int i = 0; i < num_threads; ++i) {
       thread* t = new thread(
-          bind(&BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));
+          bind(&BufferedBlockMgrTest::CreateDestroyThread, this, &shared_state));
       workers.add_thread(t);
     }
     workers.join_all();
+    shared_state.ReleaseResources();
   }
 
   // Test that in-flight IO operations are correctly handled on tear down.
@@ -891,16 +891,13 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(
   BufferedBlockMgr::Client* client;
   CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, &state);
 
-  // Hold extra references to block mgr and query mem tracker so they can outlive runtime
-  // state.
+  // Hold extra references to block mgr and query state so they outlive RuntimeState.
   shared_ptr<BufferedBlockMgr> block_mgr;
-  shared_ptr<MemTracker> query_mem_tracker;
+  QueryState::ScopedRef qs(state->query_id());
   Status status = BufferedBlockMgr::Create(state, state->query_mem_tracker(),
       state->runtime_profile(), test_env_->tmp_file_mgr(), 0, block_size_, &block_mgr);
   ASSERT_TRUE(status.ok());
   ASSERT_TRUE(block_mgr != NULL);
-  query_mem_tracker = MemTracker::GetQueryMemTracker(
-      state->query_id(), -1, test_env_->exec_env()->process_mem_tracker());
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
@@ -926,7 +923,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(
   // scenario by holding onto a reference to the block mgr. This should be safe so
   // long as blocks are properly deleted before the runtime state is torn down.
   DeleteBlocks(blocks);
-  test_env_->TearDownRuntimeStates();
+  test_env_->TearDownQueries();
 
   // Optionally wait for writes to complete after cancellation.
   if (wait_for_writes) WaitForWrites(block_mgr.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 4552d15..862a9a2 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -101,10 +101,10 @@ class SimpleTupleStreamTest : public testing::Test {
   /// Setup a block manager with the provided settings and client with no reservation,
   /// tracked by tracker_.
   void InitBlockMgr(int64_t limit, int block_size) {
-    ASSERT_OK(
-        test_env_->CreateQueryState(0, limit, block_size, nullptr, &runtime_state_));
-    MemTracker* client_tracker = pool_.Add(
-        new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    ASSERT_OK(test_env_->CreateQueryState(
+        0, limit, block_size, nullptr, &runtime_state_));
+    MemTracker* client_tracker =
+        pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
         "", 0, false, client_tracker, runtime_state_, &client_));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 0cfb340..f8e3dfd 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -53,14 +53,13 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-sender.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/parallel-executor.h"
 #include "runtime/plan-fragment-executor.h"
-#include "runtime/row-batch.h"
 #include "runtime/query-exec-mgr.h"
-#include "runtime/query-state.h"
-#include "runtime/fragment-instance-state.h"
+#include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 #include "scheduling/scheduler.h"
 #include "util/bloom-filter.h"
@@ -365,7 +364,7 @@ Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
     exec_env_(exec_env),
     has_called_wait_(false),
     returned_all_results_(false),
-    query_mem_tracker_(), // Set in Exec()
+    query_state_(nullptr),
     num_remaining_fragment_instances_(0),
     obj_pool_(new ObjectPool()),
     query_events_(events),
@@ -375,13 +374,6 @@ Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
 
 Coordinator::~Coordinator() {
   DCHECK(torn_down_) << "TearDown() must be called before Coordinator is destroyed";
-
-  // This may be NULL while executing UDFs.
-  if (filter_mem_tracker_.get() != nullptr) {
-    filter_mem_tracker_->UnregisterFromParent();
-  }
-  filter_mem_tracker_.reset();
-  query_mem_tracker_.reset();
 }
 
 PlanFragmentExecutor* Coordinator::executor() {
@@ -471,20 +463,10 @@ Status Coordinator::Exec() {
   // execution at Impala daemons where it hasn't even started
   lock_guard<mutex> l(lock_);
 
-  // The coordinator may require a query mem tracker for result-caching, which tracks
-  // memory via the query mem tracker.
-  int64_t query_limit = -1;
-  if (query_ctx_.client_request.query_options.__isset.mem_limit
-      && query_ctx_.client_request.query_options.mem_limit > 0) {
-    query_limit = query_ctx_.client_request.query_options.mem_limit;
-  }
-  MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
-      schedule_.request_pool(), exec_env_->process_mem_tracker());
-  query_mem_tracker_ =
-      MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker);
-  DCHECK(query_mem_tracker() != nullptr);
-  filter_mem_tracker_.reset(
-      new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false));
+  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
+      query_ctx_, schedule_.request_pool());
+  filter_mem_tracker_.reset(new MemTracker(
+      -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
   InitExecProfiles();
   InitExecSummary();
@@ -499,15 +481,10 @@ Status Coordinator::Exec() {
   // be set up. Must do this here in order to get a reference to coord_instance_
   // so that coord_sink_ remains valid throughout query lifetime.
   if (schedule_.GetCoordFragment() != nullptr) {
-    QueryState* qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id_);
-    if (qs != nullptr) coord_instance_ = qs->GetFInstanceState(query_id_);
+    coord_instance_ = query_state_->GetFInstanceState(query_id_);
     if (coord_instance_ == nullptr) {
       // Coordinator instance might have failed and unregistered itself even
       // though it was successfully started (e.g. Prepare() might have failed).
-      if (qs != nullptr) {
-        ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
-        qs = nullptr;
-      }
       InstanceState* coord_state = fragment_instance_states_[0];
       DCHECK(coord_state != nullptr);
       lock_guard<mutex> instance_state_lock(*coord_state->lock());
@@ -1580,10 +1557,6 @@ RuntimeState* Coordinator::runtime_state() {
   return executor() == NULL ? NULL : executor()->runtime_state();
 }
 
-MemTracker* Coordinator::query_mem_tracker() {
-  return query_mem_tracker_.get();
-}
-
 bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
   // Assume we are called only after all fragments have completed
   DCHECK(has_called_wait_);
@@ -1919,16 +1892,22 @@ void Coordinator::TearDown() {
       state->Disable(filter_mem_tracker_.get());
     }
   }
-
+  // This may be NULL while executing UDFs.
+  if (filter_mem_tracker_.get() != nullptr) {
+    filter_mem_tracker_->UnregisterFromParent();
+    filter_mem_tracker_.reset();
+  }
   // Need to protect against failed Prepare(), where root_sink() would not be set.
   if (coord_sink_ != nullptr) {
     coord_sink_->CloseConsumer();
     coord_sink_ = nullptr;
   }
-  if (coord_instance_ != nullptr) {
-    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(
-        coord_instance_->query_state());
-    coord_instance_ = nullptr;
+  coord_instance_ = nullptr;
+  if (query_state_ != nullptr) {
+    // Tear down the query state last - other members like 'filter_mem_tracker_'
+    // may reference objects with query lifetime, like the query MemTracker.
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
+    query_state_ = nullptr;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index e1334db..20a2af5 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,11 +38,12 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
+#include "runtime/query-state.h"
+#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
+#include "scheduling/query-schedule.h"
 #include "util/histogram-metric.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile.h"
-#include "scheduling/query-schedule.h"
-#include "runtime/runtime-state.h"  // for PartitionStatusMap; TODO: disentangle
 
 namespace impala {
 
@@ -150,6 +151,14 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// to CancelInternal().
   Status UpdateFragmentExecStatus(const TReportExecStatusParams& params);
 
+  /// Returns the query state.
+  /// Only valid to call after Exec() and before TearDown(). The returned
+  /// reference only remains valid until TearDown() is called.
+  QueryState* query_state() const {
+    DCHECK(!torn_down_);
+    return query_state_;
+  }
+
   /// Only valid *after* calling Exec(). Return nullptr if the running query does not
   /// produce any rows.
   ///
@@ -158,12 +167,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// class.
   RuntimeState* runtime_state();
 
-  /// Only valid after Exec(). Returns runtime_state()->query_mem_tracker() if there
-  /// is a coordinator fragment, or query_mem_tracker_ (initialized in Exec()) otherwise.
-  ///
-  /// TODO: Remove, see runtime_state().
-  MemTracker* query_mem_tracker();
-
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
@@ -171,6 +174,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   const TUniqueId& query_id() const { return query_id_; }
 
+  MemTracker* query_mem_tracker() const { return query_state()->query_mem_tracker(); }
+
   /// This is safe to call only after Wait()
   const PartitionStatusMap& per_partition_status() { return per_partition_status_; }
 
@@ -287,6 +292,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Once this is set to true, errors from remote fragments are ignored.
   bool returned_all_results_;
 
+  /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
+  QueryState* query_state_;
+
   /// Non-null if and only if the query produces results for the client; i.e. is of
   /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree and return
   /// them to the client in GetNext(), and also to access the fragment instance's runtime
@@ -303,12 +311,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// GetNext() hits eos.
   PlanRootSink* coord_sink_ = nullptr;
 
-  /// Query mem tracker for this coordinator initialized in Exec(). Only valid if there
-  /// is no coordinator fragment (i.e. executor_ == NULL). If executor_ is not NULL,
-  /// this->runtime_state()->query_mem_tracker() returns the query mem tracker.
-  /// (See this->query_mem_tracker())
-  std::shared_ptr<MemTracker> query_mem_tracker_;
-
   /// owned by plan root, which resides in runtime_state_'s pool
   const RowDescriptor* row_desc_;
 
@@ -433,7 +435,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   TRuntimeFilterMode::type filter_mode_;
 
   /// Tracks the memory consumed by runtime filters during aggregation. Child of
-  /// query_mem_tracker_.
+  /// the query mem tracker in 'query_state_'.
   std::unique_ptr<MemTracker> filter_mem_tracker_;
 
   /// True if and only if TearDown() has been called.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index c76afb5..f2785cd 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -113,16 +113,15 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
 
 class DataStreamTest : public testing::Test {
  protected:
-  DataStreamTest()
-    : runtime_state_(TQueryCtx(), &exec_env_),
-      next_val_(0) {
-    // Initialize Mem trackers for use by the data stream receiver.
+  DataStreamTest() : next_val_(0) {
+    // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
     exec_env_.InitForFeTests();
-    runtime_state_.InitMemTrackers(NULL, -1);
+    runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_, "test-pool"));
 
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
   }
+  ~DataStreamTest() { runtime_state_->ReleaseResources(); }
 
   virtual void SetUp() {
     CreateRowDesc();
@@ -191,7 +190,7 @@ class DataStreamTest : public testing::Test {
   const RowDescriptor* row_desc_;
   TupleRowComparator* less_than_;
   ExecEnv exec_env_;
-  RuntimeState runtime_state_;
+  scoped_ptr<RuntimeState> runtime_state_;
   TUniqueId next_instance_id_;
   string stmt_;
 
@@ -277,7 +276,7 @@ class DataStreamTest : public testing::Test {
     slot_desc.__set_slotIdx(0);
     thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
     EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_));
-    runtime_state_.set_desc_tbl(desc_tbl_);
+    runtime_state_->set_desc_tbl(desc_tbl_);
 
     vector<TTupleId> row_tids;
     row_tids.push_back(0);
@@ -337,10 +336,8 @@ class DataStreamTest : public testing::Test {
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr =
-        stream_mgr_->CreateRecvr(&runtime_state_,
-            *row_desc_, instance_id, DEST_NODE_ID, num_senders, buffer_size, profile,
-            is_merging);
+    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), *row_desc_,
+        instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, is_merging);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
     } else {
@@ -481,11 +478,10 @@ class DataStreamTest : public testing::Test {
     }
   }
 
-  void Sender(int sender_num, int channel_buffer_size,
-              TPartitionType::type partition_type) {
-    RuntimeState state(TQueryCtx(), &exec_env_);
+  void Sender(
+      int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
+    RuntimeState state(TQueryCtx(), &exec_env_, "test-pool");
     state.set_desc_tbl(desc_tbl_);
-    state.InitMemTrackers(NULL, -1);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataStreamSink& sink = GetSink(partition_type);
     DataStreamSender sender(
@@ -507,10 +503,11 @@ class DataStreamTest : public testing::Test {
     info.num_bytes_sent = sender.GetNumDataBytesSent();
 
     batch->Reset();
+    state.ReleaseResources();
   }
 
-  void TestStream(TPartitionType::type stream_type, int num_senders,
-                  int num_receivers, int buffer_size, bool is_merging) {
+  void TestStream(TPartitionType::type stream_type, int num_senders, int num_receivers,
+      int buffer_size, bool is_merging) {
     VLOG_QUERY << "Testing stream=" << stream_type << " #senders=" << num_senders
                << " #receivers=" << num_receivers << " buffer_size=" << buffer_size
                << " is_merging=" << is_merging;
@@ -596,9 +593,8 @@ TEST_F(DataStreamTest, BasicTest) {
 //
 // TODO: Make lifecycle requirements more explicit.
 TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
-  runtime_state->InitMemTrackers(NULL, -1);
-
+  scoped_ptr<RuntimeState> runtime_state(
+      new RuntimeState(TQueryCtx(), &exec_env_, "test-pool"));
   scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
 
   // Start just one receiver.
@@ -612,6 +608,7 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   stream_recvr->Close();
 
   // Force deletion of the parent memtracker by destroying it's owning runtime state.
+  runtime_state->ReleaseResources();
   runtime_state.reset();
 
   // Send an eos RPC to the receiver. Not required for tear-down, but confirms that the

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index d93a459..18c1ef5 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -137,6 +137,7 @@ ExecEnv::ExecEnv()
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver()),
     mem_tracker_(NULL),
+    pool_mem_trackers_(new PoolMemTrackerRegistry),
     thread_mgr_(new ThreadResourceMgr),
     hdfs_op_thread_pool_(
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
@@ -191,6 +192,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver(webserver_port)),
     mem_tracker_(NULL),
+    pool_mem_trackers_(new PoolMemTrackerRegistry),
     thread_mgr_(new ThreadResourceMgr),
     hdfs_op_thread_pool_(
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index be90a5a..bdeb4a4 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -38,6 +38,7 @@ class HdfsFsCache;
 class ImpalaServer;
 class LibCache;
 class MemTracker;
+class PoolMemTrackerRegistry;
 class MetricGroup;
 class QueryResourceMgr;
 class RequestPoolService;
@@ -57,7 +58,7 @@ class ExecEnv {
   ExecEnv();
 
   ExecEnv(const std::string& hostname, int backend_port, int subscriber_port,
-          int webserver_port, const std::string& statestore_host, int statestore_port);
+      int webserver_port, const std::string& statestore_host, int statestore_port);
 
   /// Returns the first created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
@@ -97,6 +98,7 @@ class ExecEnv {
   RequestPoolService* request_pool_service() { return request_pool_service_.get(); }
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
+  PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -131,6 +133,7 @@ class ExecEnv {
   boost::scoped_ptr<DiskIoMgr> disk_io_mgr_;
   boost::scoped_ptr<Webserver> webserver_;
   boost::scoped_ptr<MemTracker> mem_tracker_;
+  boost::scoped_ptr<PoolMemTrackerRegistry> pool_mem_trackers_;
   boost::scoped_ptr<ThreadResourceMgr> thread_mgr_;
   boost::scoped_ptr<HdfsOpThreadPool> hdfs_op_thread_pool_;
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index dc855da..f4073b5 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -38,9 +38,6 @@ using namespace strings;
 namespace impala {
 
 const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
-MemTracker::RequestTrackersMap MemTracker::request_to_mem_trackers_;
-MemTracker::PoolTrackersMap MemTracker::pool_to_mem_trackers_;
-SpinLock MemTracker::static_mem_trackers_lock_;
 
 AtomicInt64 MemTracker::released_memory_since_gc_;
 
@@ -55,7 +52,6 @@ MemTracker::MemTracker(
     consumption_(&local_counter_),
     local_counter_(TUnit::BYTES),
     consumption_metric_(NULL),
-    auto_unregister_(false),
     log_usage_if_zero_(log_usage_if_zero),
     num_gcs_metric_(NULL),
     bytes_freed_by_last_gc_metric_(NULL),
@@ -73,7 +69,6 @@ MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
     consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
     local_counter_(TUnit::BYTES),
     consumption_metric_(NULL),
-    auto_unregister_(false),
     log_usage_if_zero_(true),
     num_gcs_metric_(NULL),
     bytes_freed_by_last_gc_metric_(NULL),
@@ -91,7 +86,6 @@ MemTracker::MemTracker(
     consumption_(&local_counter_),
     local_counter_(TUnit::BYTES),
     consumption_metric_(consumption_metric),
-    auto_unregister_(false),
     log_usage_if_zero_(true),
     num_gcs_metric_(NULL),
     bytes_freed_by_last_gc_metric_(NULL),
@@ -133,7 +127,7 @@ void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& co
 int64_t MemTracker::GetPoolMemReserved() const {
   // Pool trackers should have a pool_name_ and no limit.
   DCHECK(!pool_name_.empty());
-  DCHECK_EQ(limit_, -1);
+  DCHECK_EQ(limit_, -1) << LogUsage("");
 
   int64_t mem_reserved = 0L;
   lock_guard<mutex> l(child_trackers_lock_);
@@ -144,40 +138,42 @@ int64_t MemTracker::GetPoolMemReserved() const {
       // Make sure we don't overflow if the query limits are set to ridiculous values.
       mem_reserved += std::min(child_limit, MemInfo::physical_mem());
     } else {
-      DCHECK_EQ(child_limit, -1);
+      DCHECK_EQ(child_limit, -1) << (*it)->LogUsage("");
       mem_reserved += (*it)->consumption();
     }
   }
   return mem_reserved;
 }
 
-MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name,
-    MemTracker* parent) {
+MemTracker* PoolMemTrackerRegistry::GetRequestPoolMemTracker(
+    const string& pool_name, bool create_if_not_present) {
   DCHECK(!pool_name.empty());
-  lock_guard<SpinLock> l(static_mem_trackers_lock_);
+  lock_guard<SpinLock> l(pool_to_mem_trackers_lock_);
   PoolTrackersMap::iterator it = pool_to_mem_trackers_.find(pool_name);
   if (it != pool_to_mem_trackers_.end()) {
     MemTracker* tracker = it->second;
     DCHECK(pool_name == tracker->pool_name_);
     return tracker;
-  } else {
-    if (parent == NULL) return NULL;
-    // First time this pool_name registered, make a new object.
-    MemTracker* tracker = new MemTracker(
-        -1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name), parent);
-    tracker->auto_unregister_ = true;
-    tracker->pool_name_ = pool_name;
-    pool_to_mem_trackers_[pool_name] = tracker;
-    return tracker;
   }
+  if (!create_if_not_present) return nullptr;
+  // First time this pool_name registered, make a new object.
+  MemTracker* tracker =
+      new MemTracker(-1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name),
+          ExecEnv::GetInstance()->process_mem_tracker());
+  tracker->pool_name_ = pool_name;
+  pool_to_mem_trackers_[pool_name] = tracker;
+  return tracker;
 }
 
-shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
-    const TUniqueId& id, int64_t byte_limit, MemTracker* parent) {
+MemTracker* MemTracker::CreateQueryMemTracker(const TUniqueId& id,
+    const TQueryOptions& query_options, const string& pool_name, ObjectPool* obj_pool) {
+  int64_t byte_limit = -1;
+  if (query_options.__isset.mem_limit && query_options.mem_limit > 0) {
+    byte_limit = query_options.mem_limit;
+  }
   if (byte_limit != -1) {
     if (byte_limit > MemInfo::physical_mem()) {
-      LOG(WARNING) << "Memory limit "
-                   << PrettyPrinter::Print(byte_limit, TUnit::BYTES)
+      LOG(WARNING) << "Memory limit " << PrettyPrinter::Print(byte_limit, TUnit::BYTES)
                    << " exceeds physical memory of "
                    << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES);
     }
@@ -185,37 +181,19 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
                << PrettyPrinter::Print(byte_limit, TUnit::BYTES);
   }
 
-  lock_guard<SpinLock> l(static_mem_trackers_lock_);
-  RequestTrackersMap::iterator it = request_to_mem_trackers_.find(id);
-  if (it != request_to_mem_trackers_.end()) {
-    // Return the existing MemTracker object for this id, converting the weak ptr
-    // to a shared ptr.
-    shared_ptr<MemTracker> tracker = it->second.lock();
-    DCHECK_EQ(tracker->limit_, byte_limit);
-    DCHECK(id == tracker->query_id_);
-    DCHECK(parent == tracker->parent_);
-    return tracker;
-  } else {
-    // First time this id registered, make a new object.  Give a shared ptr to
-    // the caller and put a weak ptr in the map.
-    shared_ptr<MemTracker> tracker = make_shared<MemTracker>(
-        byte_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
-    tracker->auto_unregister_ = true;
-    tracker->query_id_ = id;
-    request_to_mem_trackers_[id] = tracker;
-    return tracker;
-  }
+  MemTracker* pool_tracker =
+      ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(
+          pool_name, true);
+  MemTracker* tracker = obj_pool->Add(new MemTracker(
+      byte_limit, Substitute("Query($0)", lexical_cast<string>(id)), pool_tracker));
+  tracker->query_id_ = id;
+  return tracker;
 }
 
 MemTracker::~MemTracker() {
-  DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n" << GetStackTrace();
-  lock_guard<SpinLock> l(static_mem_trackers_lock_);
-  if (auto_unregister_) UnregisterFromParent();
-  // Erase the weak ptr reference from the map.
-  request_to_mem_trackers_.erase(query_id_);
-  // Per-pool trackers should live the entire lifetime of the impalad process, but
-  // remove the element from the map in case this changes in the future.
-  pool_to_mem_trackers_.erase(pool_name_);
+  DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n"
+                                              << GetStackTrace() << "\n"
+                                              << LogUsage("");
 }
 
 void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 2d35c43..8fd40b5 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -21,6 +21,7 @@
 
 #include <stdint.h>
 #include <map>
+#include <memory>
 #include <vector>
 #include <boost/thread/mutex.hpp>
 #include <boost/unordered_map.hpp>
@@ -37,13 +38,20 @@
 
 namespace impala {
 
-class ReservationTrackerCounters;
+class ObjectPool;
 class MemTracker;
+class ReservationTrackerCounters;
+class TQueryOptions;
 
 /// A MemTracker tracks memory consumption; it contains an optional limit
 /// and can be arranged into a tree structure such that the consumption tracked
 /// by a MemTracker is also tracked by its ancestors.
-//
+///
+/// We use a five-level hierarchy of mem trackers: process, pool, query, fragment
+/// instance. Specific parts of the fragment (exec nodes, sinks, etc) will add a
+/// fifth level when they are initialized. This function also initializes a user
+/// function mem tracker (in the fifth level).
+///
 /// By default, memory consumption is tracked via calls to Consume()/Release(), either to
 /// the tracker itself or to one of its descendents. Alternatively, a consumption metric
 /// can specified, and then the metric's value is used as the consumption rather than the
@@ -87,26 +95,13 @@ class MemTracker {
   /// The counters should be owned by the fragment's RuntimeProfile.
   void EnableReservationReporting(const ReservationTrackerCounters& counters);
 
-  /// Returns a MemTracker object for query 'id'.  Calling this with the same id will
-  /// return the same MemTracker object.  An example of how this is used is to pass it
-  /// the same query id for all fragments of that query running on this machine.  This
-  /// way, we have per-query limits rather than per-fragment.
-  /// The first time this is called for an id, a new MemTracker object is created with
-  /// 'parent' as the parent tracker.
-  /// byte_limit and parent must be the same for all GetMemTracker() calls with the
-  /// same id.
-  static std::shared_ptr<MemTracker> GetQueryMemTracker(
-      const TUniqueId& id, int64_t byte_limit, MemTracker* parent);
-
-  /// Returns a MemTracker object for request pool 'pool_name'. Calling this with the same
-  /// 'pool_name' will return the same MemTracker object. This is used to track the local
-  /// memory usage of all requests executing in this pool. The first time this is called
-  /// for a pool, a new MemTracker object is created with the parent tracker if it is not
-  /// NULL. If the parent is NULL, no new tracker will be created and NULL is returned.
-  /// There is no explicit per-pool byte_limit set at any particular impalad, so newly
-  /// created trackers will always have a limit of -1.
-  static MemTracker* GetRequestPoolMemTracker(const std::string& pool_name,
-      MemTracker* parent);
+  /// Construct a MemTracker object for query 'id'. The query limits are determined based
+  /// on 'query_options'. The MemTracker is a child of the request pool MemTracker for
+  /// 'pool_name', which is created if needed. The returned MemTracker is owned by
+  /// 'obj_pool'.
+  static MemTracker* CreateQueryMemTracker(const TUniqueId& id,
+      const TQueryOptions& query_options, const std::string& pool_name,
+      ObjectPool* obj_pool);
 
   /// Increases consumption of this tracker and its ancestors by 'bytes'.
   void Consume(int64_t bytes) {
@@ -324,9 +319,12 @@ class MemTracker {
   static const std::string COUNTER_NAME;
 
  private:
+  friend class PoolMemTrackerRegistry;
+
   bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
 
-  /// If consumption is higher than max_consumption, attempts to free memory by calling any
+  /// If consumption is higher than max_consumption, attempts to free memory by calling
+  /// any
   /// added GC functions.  Returns true if max_consumption is still exceeded. Takes
   /// gc_lock. Updates metrics if initialized.
   bool GcMemory(int64_t max_consumption);
@@ -361,25 +359,7 @@ class MemTracker {
   /// Lock to protect GcMemory(). This prevents many GCs from occurring at once.
   boost::mutex gc_lock_;
 
-  /// Protects request_to_mem_trackers_ and pool_to_mem_trackers_.
-  /// IMPALA-3068: Use SpinLock instead of boost::mutex so that it won't automatically
-  /// destroy itself as part of process teardown, which could cause races.
-  static SpinLock static_mem_trackers_lock_;
-
-  /// All per-request MemTracker objects that are in use.  For memory management, this map
-  /// contains only weak ptrs.  MemTrackers that are handed out via GetQueryMemTracker()
-  /// are shared ptrs.  When all the shared ptrs are no longer referenced, the MemTracker
-  /// d'tor will be called at which point the weak ptr will be removed from the map.
-  typedef boost::unordered_map<TUniqueId, std::weak_ptr<MemTracker>>
-  RequestTrackersMap;
-  static RequestTrackersMap request_to_mem_trackers_;
-
-  /// All per-request pool MemTracker objects. It is assumed that request pools will live
-  /// for the entire duration of the process lifetime.
-  typedef boost::unordered_map<std::string, MemTracker*> PoolTrackersMap;
-  static PoolTrackersMap pool_to_mem_trackers_;
-
-  /// Only valid for MemTrackers returned from GetQueryMemTracker()
+  /// Only valid for MemTrackers returned from CreateQueryMemTracker()
   TUniqueId query_id_;
 
   /// Only valid for MemTrackers returned from GetRequestPoolMemTracker()
@@ -424,13 +404,6 @@ class MemTracker {
   /// Functions to call after the limit is reached to free memory.
   std::vector<GcFunction> gc_functions_;
 
-  /// If true, calls UnregisterFromParent() in the dtor. This is only used for
-  /// the query wide trackers to remove it from the process mem tracker. The
-  /// process tracker never gets deleted so it is safe to reference it in the dtor.
-  /// The query tracker has lifetime shared by multiple plan fragments so it's hard
-  /// to do cleanup another way.
-  bool auto_unregister_;
-
   /// If false, this tracker (and its children) will not be included in LogUsage() output
   /// if consumption is 0.
   bool log_usage_if_zero_;
@@ -451,6 +424,29 @@ class MemTracker {
   IntGauge* limit_metric_;
 };
 
+/// Global registry for query and pool MemTrackers. Owned by ExecEnv.
+class PoolMemTrackerRegistry {
+ public:
+  /// Returns a MemTracker object for request pool 'pool_name'. Calling this with the same
+  /// 'pool_name' will return the same MemTracker object. This is used to track the local
+  /// memory usage of all requests executing in this pool. If 'create_if_not_present' is
+  /// true, the first time this is called for a pool, a new MemTracker object is created
+  /// with the process tracker as its parent. There is no explicit per-pool byte_limit
+  /// set at any particular impalad, so newly created trackers will always have a limit
+  /// of -1.
+  MemTracker* GetRequestPoolMemTracker(
+      const std::string& pool_name, bool create_if_not_present);
+
+ private:
+  /// All per-request pool MemTracker objects. It is assumed that request pools will live
+  /// for the entire duration of the process lifetime so MemTrackers are never removed
+  /// from this map. Protected by 'pool_to_mem_trackers_lock_'
+  typedef boost::unordered_map<std::string, MemTracker*> PoolTrackersMap;
+  PoolTrackersMap pool_to_mem_trackers_;
+  /// IMPALA-3068: Use SpinLock instead of boost::mutex so that the lock won't
+  /// automatically destroy itself as part of process teardown, which could cause races.
+  SpinLock pool_to_mem_trackers_lock_;
+};
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 464dbce..dbadf6d 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -147,7 +147,6 @@ Status PlanFragmentExecutor::PrepareInternal(
   }
 
   DCHECK(!instance_ctx.request_pool.empty());
-  runtime_state_->InitMemTrackers(&instance_ctx.request_pool, bytes_limit);
   RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
   runtime_state_->InitFilterBank();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 2fd2f8f..5990a24 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -56,32 +56,13 @@ Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
     return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
   }
 
-  QueryState* qs = nullptr;
-  int refcnt;
-  {
-    lock_guard<mutex> l(qs_map_lock_);
-    TUniqueId query_id = params.query_ctx.query_id;
-    auto it = qs_map_.find(query_id);
-    if (it == qs_map_.end()) {
-      // register new QueryState
-      qs = new QueryState(params.query_ctx);
-      qs_map_.insert(make_pair(query_id, qs));
-      VLOG_QUERY << "new QueryState: query_id=" << query_id;
-    } else {
-      qs = it->second;
-    }
-    // decremented at the end of ExecFInstance()
-    refcnt = qs->refcnt_.Add(1);
-  }
-  DCHECK(qs != nullptr && qs->refcnt_.Load() > 0);
-  VLOG_QUERY << "QueryState: query_id=" << params.query_ctx.query_id
-             << " refcnt=" << refcnt;
-
+  bool dummy;
+  QueryState* qs = GetOrCreateQueryState(
+      params.query_ctx, params.fragment_instance_ctx.request_pool, &dummy);
   DCHECK(params.__isset.fragment_ctx);
   DCHECK(params.__isset.fragment_instance_ctx);
-  FragmentInstanceState* fis = qs->obj_pool()->Add(
-      new FragmentInstanceState(qs, params.fragment_ctx, params.fragment_instance_ctx,
-        params.query_ctx.desc_tbl));
+  FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
+      qs, params.fragment_ctx, params.fragment_instance_ctx, params.query_ctx.desc_tbl));
   // register instance before returning so that async Cancel() calls can
   // find the instance
   qs->RegisterFInstance(fis);
@@ -96,6 +77,39 @@ Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
   return Status::OK();
 }
 
+QueryState* QueryExecMgr::CreateQueryState(
+    const TQueryCtx& query_ctx, const string& request_pool) {
+  bool created;
+  QueryState* qs = GetOrCreateQueryState(query_ctx, request_pool, &created);
+  DCHECK(created);
+  return qs;
+}
+
+QueryState* QueryExecMgr::GetOrCreateQueryState(
+    const TQueryCtx& query_ctx, const string& request_pool, bool* created) {
+  QueryState* qs = nullptr;
+  int refcnt;
+  {
+    lock_guard<mutex> l(qs_map_lock_);
+    auto it = qs_map_.find(query_ctx.query_id);
+    if (it == qs_map_.end()) {
+      // register new QueryState
+      qs = new QueryState(query_ctx, request_pool);
+      qs_map_.insert(make_pair(query_ctx.query_id, qs));
+      VLOG_QUERY << "new QueryState: query_id=" << query_ctx.query_id;
+      *created = true;
+    } else {
+      qs = it->second;
+      *created = false;
+    }
+    // decremented at the end of ExecFInstance()
+    refcnt = qs->refcnt_.Add(1);
+  }
+  DCHECK(qs != nullptr && qs->refcnt_.Load() > 0);
+  VLOG_QUERY << "QueryState: query_id=" << query_ctx.query_id << " refcnt=" << refcnt;
+  return qs;
+}
+
 void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
   fis->Exec();
 
@@ -158,6 +172,6 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
     qs_map_.erase(it);
   }
   // TODO: send final status report during gc, but do this from a different thread
+  qs_from_map->ReleaseResources();
   delete qs_from_map;
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 7b3fb84..37abb85 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -31,6 +31,7 @@ namespace impala {
 class QueryState;
 class Thread;
 class TExecPlanFragmentParams;
+class TQueryCtx;
 class TUniqueId;
 class FragmentInstanceState;
 
@@ -55,6 +56,12 @@ class QueryExecMgr {
   /// return value of this function.
   Status StartFInstance(const TExecPlanFragmentParams& params);
 
+  /// Creates and the QueryState for the given query with the provided parameters. Only
+  /// valid to call if the QueryState does not already exist. The caller must call
+  /// ReleaseQueryState() with the returned QueryState to decrement the refcount.
+  QueryState* CreateQueryState(
+      const TQueryCtx& query_ctx, const std::string& request_pool);
+
   /// If a QueryState for the given query exists, increments that refcount and returns
   /// the QueryState, otherwise returns nullptr.
   QueryState* GetQueryState(const TUniqueId& query_id);
@@ -69,10 +76,14 @@ class QueryExecMgr {
   /// map from query id to QueryState (owned by us)
   std::unordered_map<TUniqueId, QueryState*> qs_map_;
 
+  /// Gets the existing QueryState or creates a new one if not present.
+  /// 'created' is set to true if it was created, false otherwise.
+  QueryState* GetOrCreateQueryState(
+      const TQueryCtx& query_ctx, const std::string& request_pool, bool* created);
+
   /// Execute instance.
   void ExecFInstance(FragmentInstanceState* fis);
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index def95c0..7cc3396 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -23,6 +23,7 @@
 
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
 
 using namespace impala;
@@ -37,9 +38,8 @@ QueryState::ScopedRef::~ScopedRef() {
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
 }
 
-QueryState::QueryState(const TQueryCtx& query_ctx)
-  : query_ctx_(query_ctx),
-    refcnt_(0) {
+QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
+  : query_ctx_(query_ctx), refcnt_(0), released_resources_(false) {
   TQueryOptions& query_options = query_ctx_.client_request.query_options;
   // max_errors does not indicate how many errors in total have been recorded, but rather
   // how many are distinct. It is defined as the sum of the number of generic errors and
@@ -50,6 +50,28 @@ QueryState::QueryState(const TQueryCtx& query_ctx)
   if (query_options.batch_size <= 0) {
     query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
   }
+  InitMemTrackers(pool);
+}
+
+void QueryState::ReleaseResources() {
+  // Avoid dangling reference from the parent of 'query_mem_tracker_'.
+  query_mem_tracker_->UnregisterFromParent();
+  released_resources_ = true;
+}
+
+QueryState::~QueryState() {
+  DCHECK(released_resources_);
+}
+
+void QueryState::InitMemTrackers(const std::string& pool) {
+  int64_t bytes_limit = -1;
+  if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
+    bytes_limit = query_options().mem_limit;
+    VLOG_QUERY << "Using query memory limit from query options: "
+               << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
+  }
+  query_mem_tracker_ =
+      MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_);
 }
 
 void QueryState::RegisterFInstance(FragmentInstanceState* fis) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index fe9cca2..685e82a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -15,22 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_RUNTIME_QUERY_STATE_H
 #define IMPALA_RUNTIME_QUERY_STATE_H
 
-#include <boost/thread/mutex.hpp>
+#include <memory>
 #include <unordered_map>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
 #include "util/uid-util.h"
-#include "common/atomic.h"
 
 namespace impala {
 
 class FragmentInstanceState;
+class MemTracker;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -61,6 +63,8 @@ class QueryState {
   /// }
   class ScopedRef {
    public:
+    /// Looks up the query state with GetQueryState(). The query state is non-NULL if
+    /// the query was already registered.
     ScopedRef(const TUniqueId& query_id);
     ~ScopedRef();
 
@@ -85,6 +89,12 @@ class QueryState {
 
   const TUniqueId& query_id() const { return query_ctx_.query_id; }
 
+  const TQueryOptions& query_options() const {
+    return query_ctx_.client_request.query_options;
+  }
+
+  MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
+
   /// Registers a new FInstanceState.
   void RegisterFInstance(FragmentInstanceState* fis);
 
@@ -92,9 +102,14 @@ class QueryState {
   /// been registered. The returned FIS is valid for the duration of the QueryState.
   FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
 
+  /// Called once the query is complete to release any resources.
+  /// Must be called before destroying the QueryState.
+  void ReleaseResources();
+
+  ~QueryState();
+
  private:
   friend class QueryExecMgr;
-  friend class TestEnv;
 
   static const int DEFAULT_BATCH_SIZE = 1024;
 
@@ -103,15 +118,24 @@ class QueryState {
   ObjectPool obj_pool_;
   AtomicInt32 refcnt_;
 
-  boost::mutex fis_map_lock_;  // protects fis_map_
+  /// True if and only if ReleaseResources() has been called.
+  bool released_resources_;
+
+  boost::mutex fis_map_lock_; // protects fis_map_
 
   /// map from instance id to its state (owned by obj_pool_)
   std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
 
+  /// The top-level MemTracker for this query (owned by obj_pool_).
+  MemTracker* query_mem_tracker_;
+
   /// Create QueryState w/ copy of query_ctx and refcnt of 0.
-  QueryState(const TQueryCtx& query_ctx);
-};
+  /// The query is associated with the resource pool named 'pool'
+  QueryState(const TQueryCtx& query_ctx, const std::string& pool);
 
+  /// Called from constructor to initialize MemTrackers.
+  void InitMemTrackers(const std::string& pool);
+};
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5be4f1c..de0fc97 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -71,53 +71,42 @@ static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
 
 namespace impala {
 
-RuntimeState::RuntimeState(
-    QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
   : desc_tbl_(nullptr),
     obj_pool_(new ObjectPool()),
     query_state_(query_state),
     fragment_ctx_(&fragment_ctx),
     instance_ctx_(&instance_ctx),
-    now_(new TimestampValue(
-        query_state->query_ctx().now_string.c_str(),
+    now_(new TimestampValue(query_state->query_ctx().now_string.c_str(),
         query_state->query_ctx().now_string.size())),
     exec_env_(exec_env),
     profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
+    query_mem_tracker_(query_state_->query_mem_tracker()),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
 }
 
-RuntimeState::RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env)
+RuntimeState::RuntimeState(
+    const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool)
   : obj_pool_(new ObjectPool()),
     query_state_(nullptr),
     fragment_ctx_(nullptr),
     instance_ctx_(nullptr),
     local_query_ctx_(query_ctx),
     now_(new TimestampValue(query_ctx.now_string.c_str(), query_ctx.now_string.size())),
-    exec_env_(exec_env == nullptr ? ExecEnv::GetInstance() : exec_env),
+    exec_env_(exec_env),
     profile_(obj_pool_.get(), "<unnamed>"),
+    query_mem_tracker_(MemTracker::CreateQueryMemTracker(
+        query_id(), query_options(), request_pool, obj_pool_.get())),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
 }
 
 RuntimeState::~RuntimeState() {
-  block_mgr_.reset();
-
-  // Release codegen memory before tearing down trackers.
-  codegen_.reset();
-
-  // query_mem_tracker_ must be valid as long as instance_mem_tracker_ is so
-  // delete instance_mem_tracker_ first.
-  // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded.
-  // Break the link between the instance_mem_tracker and its parent (query_mem_tracker_)
-  // before the instance_mem_tracker_ and its children are destroyed.
-  // May be NULL if InitMemTrackers() is not called, for example from tests.
-  if (instance_mem_tracker_ != NULL) instance_mem_tracker_->UnregisterFromParent();
-  instance_mem_tracker_.reset();
-  query_mem_tracker_.reset();
+  DCHECK(instance_mem_tracker_ == nullptr) << "Must call ReleaseResources()";
 }
 
 void RuntimeState::Init() {
@@ -133,18 +122,9 @@ void RuntimeState::Init() {
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
   total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime");
   total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
-}
 
-void RuntimeState::InitMemTrackers(const string* pool_name, int64_t query_bytes_limit) {
-  MemTracker* query_parent_tracker = exec_env_->process_mem_tracker();
-  if (pool_name != NULL) {
-    query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name,
-        query_parent_tracker);
-  }
-  query_mem_tracker_ =
-      MemTracker::GetQueryMemTracker(query_id(), query_bytes_limit, query_parent_tracker);
   instance_mem_tracker_.reset(new MemTracker(
-      runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_.get()));
+      runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_));
 }
 
 void RuntimeState::InitFilterBank() {
@@ -236,10 +216,10 @@ Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
   return Status::OK();
 }
 
-void RuntimeState::LogMemLimitExceeded(const MemTracker* tracker,
-    int64_t failed_allocation_size) {
+void RuntimeState::LogMemLimitExceeded(
+    const MemTracker* tracker, int64_t failed_allocation_size) {
   DCHECK_GE(failed_allocation_size, 0);
-  DCHECK(query_mem_tracker_.get() != NULL);
+  DCHECK(query_mem_tracker_ != NULL);
   stringstream ss;
   ss << "Memory Limit Exceeded by fragment: " << fragment_instance_id() << endl;
   if (failed_allocation_size != 0) {
@@ -281,7 +261,10 @@ Status RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
 }
 
 Status RuntimeState::CheckQueryState() {
-  if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded())) return SetMemLimitExceeded();
+  if (instance_mem_tracker_ != nullptr
+      && UNLIKELY(instance_mem_tracker_->AnyLimitExceeded())) {
+    return SetMemLimitExceeded();
+  }
   return GetQueryStatus();
 }
 
@@ -305,6 +288,20 @@ void RuntimeState::ReleaseResources() {
   if (resource_pool_ != nullptr) {
     exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
   }
+  block_mgr_.reset(); // Release any block mgr memory, if this is the last reference.
+  codegen_.reset(); // Release any memory associated with codegen.
+
+  // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so
+  // delete 'instance_mem_tracker_' first.
+  // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so
+  // break the link between 'instance_mem_tracker_' and its parent before
+  // 'instance_mem_tracker_' and its children are destroyed.
+  instance_mem_tracker_->UnregisterFromParent();
+  instance_mem_tracker_.reset();
+
+  // If this RuntimeState owns 'query_mem_tracker_' it must deregister it.
+  if (query_state_ == nullptr) query_mem_tracker_->UnregisterFromParent();
+  query_mem_tracker_ = nullptr;
 }
 
 const std::string& RuntimeState::GetEffectiveUser() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index b56097a..14f9d38 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -81,20 +81,13 @@ class RuntimeState {
       const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env);
 
   /// RuntimeState for executing expr in fe-support.
-  RuntimeState(const TQueryCtx& query_ctx, ExecEnv* exec_env = nullptr);
+  RuntimeState(
+      const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool);
 
   /// Empty d'tor to avoid issues with scoped_ptr.
   ~RuntimeState();
 
-  /// Set up five-level hierarchy of mem trackers: process, pool, query, fragment
-  /// instance. The instance tracker is tied to our profile. Specific parts of the
-  /// fragment (i.e. exec nodes, sinks, data stream senders, etc) will add a fifth level
-  /// when they are initialized. This function also initializes a user function mem
-  /// tracker (in the fifth level). If 'request_pool' is null, no request pool mem
-  /// tracker is set up, i.e. query pools will have the process mem pool as the parent.
-  void InitMemTrackers(const std::string* request_pool, int64_t query_bytes_limit);
-
-  /// Initializes the runtime filter bank. Must be called after InitMemTrackers().
+  /// Initializes the runtime filter bank.
   void InitFilterBank();
 
   /// Gets/Creates the query wide block mgr.
@@ -133,7 +126,7 @@ class RuntimeState {
   CatalogServiceClientCache* catalogd_client_cache();
   DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
-  MemTracker* query_mem_tracker() { return query_mem_tracker_.get(); }
+  MemTracker* query_mem_tracker() { return query_mem_tracker_; }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
   FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
@@ -299,7 +292,8 @@ class RuntimeState {
 
   /// Returns a non-OK status if query execution should stop (e.g., the query was
   /// cancelled or a mem limit was exceeded). Exec nodes should check this periodically so
-  /// execution doesn't continue if the query terminates abnormally.
+  /// execution doesn't continue if the query terminates abnormally. This can be called
+  /// after ReleaseResources().
   Status CheckQueryState();
 
   /// Create a codegen object accessible via codegen() if it doesn't exist already.
@@ -383,11 +377,11 @@ class RuntimeState {
   /// Total CPU utilization for all threads in this plan fragment.
   RuntimeProfile::ThreadCounters* total_thread_statistics_;
 
-  /// MemTracker that is shared by all fragment instances running on this host.
-  /// The query mem tracker must be released after the instance_mem_tracker_.
-  std::shared_ptr<MemTracker> query_mem_tracker_;
+  /// Reference to the query MemTracker, owned by 'query_state_' if that is non-NULL
+  /// or stored in 'obj_pool_' otherwise.
+  MemTracker* query_mem_tracker_;
 
-  /// Memory usage of this fragment instance
+  /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'.
   boost::scoped_ptr<MemTracker> instance_mem_tracker_;
 
   /// if true, execution should stop with a CANCELLED status

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index a1ac3c4..4f9e8f5 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "runtime/test-env.h"
+
+#include "runtime/query-exec-mgr.h"
 #include "util/disk-info.h"
 #include "util/impalad-metrics.h"
 
@@ -61,16 +63,20 @@ void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_de
 
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
-  TearDownRuntimeStates();
+  TearDownQueries();
   exec_env_.reset();
   io_mgr_tracker_.reset();
   tmp_file_mgr_.reset();
   metrics_.reset();
 }
 
-void TestEnv::TearDownRuntimeStates() {
-  for (auto& runtime_state : runtime_states_) runtime_state.second->ReleaseResources();
+void TestEnv::TearDownQueries() {
+  for (RuntimeState* runtime_state : runtime_states_) runtime_state->ReleaseResources();
   runtime_states_.clear();
+  for (QueryState* query_state : query_states_) {
+    exec_env_->query_exec_mgr()->ReleaseQueryState(query_state);
+  }
+  query_states_.clear();
 }
 
 int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
@@ -81,34 +87,35 @@ int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
 
 int64_t TestEnv::TotalQueryMemoryConsumption() {
   int64_t total = 0;
-  for (const auto& runtime_state : runtime_states_) {
-    total += runtime_state.second->query_mem_tracker()->consumption();
+  for (QueryState* query_state : query_states_) {
+    total += query_state->query_mem_tracker()->consumption();
   }
   return total;
 }
 
 Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
     const TQueryOptions* query_options, RuntimeState** runtime_state) {
-  // Enforce invariant that each query ID can be registered at most once.
-  if (runtime_states_.find(query_id) != runtime_states_.end()) {
-    return Status(Substitute("Duplicate query id found: $0", query_id));
-  }
-
   TQueryCtx query_ctx;
   if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
-  *runtime_state = new RuntimeState(query_ctx, exec_env_.get());
-  (*runtime_state)->InitMemTrackers(nullptr, -1);
+
+  // CreateQueryState() enforces the invariant that 'query_id' must be unique.
+  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, "test-pool");
+  query_states_.push_back(qs);
+  FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
+      qs, TPlanFragmentCtx(), TPlanFragmentInstanceCtx(), TDescriptorTable()));
+  RuntimeState* rs = qs->obj_pool()->Add(
+      new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
+  runtime_states_.push_back(rs);
 
   shared_ptr<BufferedBlockMgr> mgr;
-  RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
-      (*runtime_state)->query_mem_tracker(), (*runtime_state)->runtime_profile(),
-      tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
-  (*runtime_state)->set_block_mgr(mgr);
+  RETURN_IF_ERROR(BufferedBlockMgr::Create(rs, qs->query_mem_tracker(),
+      rs->runtime_profile(), tmp_file_mgr_.get(),
+      CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
+  rs->set_block_mgr(mgr);
 
-  runtime_states_[query_id] = shared_ptr<RuntimeState>(*runtime_state);
+  if (runtime_state != nullptr) *runtime_state = rs;
   return Status::OK();
 }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index e648977..3f2eaec 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,17 +18,19 @@
 #ifndef IMPALA_RUNTIME_TEST_ENV
 #define IMPALA_RUNTIME_TEST_ENV
 
-#include "runtime/disk-io-mgr.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/disk-io-mgr.h"
 #include "runtime/exec-env.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/runtime-state.h"
 #include "runtime/query-state.h"
+#include "runtime/runtime-state.h"
 
 namespace impala {
 
 /// Helper testing class that creates an environment with a buffered-block-mgr similar
-/// to the one Impala's runtime is using.
+/// to the one Impala's runtime is using. Only one TestEnv can be active at a time,
+/// because it replaces the global ExecEnv singleton.
 class TestEnv {
  public:
   TestEnv();
@@ -38,14 +40,17 @@ class TestEnv {
   /// query states have been created.
   void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
 
-  /// Create a RuntimeState for a query with a new block manager and the given query
-  /// options. The RuntimeState is owned by the TestEnv. Returns an error if
+  /// Create a QueryState and a RuntimeState for a query with a new block manager and
+  /// the given query options. The states are owned by the TestEnv. Returns an error if
   /// CreateQueryState() has been called with the same query ID already.
+  /// If non-null, 'runtime_state' are set to the newly created RuntimeState. The
+  /// QueryState can be obtained via 'runtime_state'.
   Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
       const TQueryOptions* query_options, RuntimeState** runtime_state);
 
-  /// Destroy all RuntimeStates and block managers created by this TestEnv.
-  void TearDownRuntimeStates();
+  /// Destroy all query states and associated RuntimeStates, BufferedBlockMgrs,
+  /// etc, that were created since the last TearDownQueries() call.
+  void TearDownQueries();
 
   /// Calculate memory limit accounting for overflow and negative values.
   /// If max_buffers is -1, no memory limit will apply.
@@ -70,11 +75,13 @@ class TestEnv {
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
 
-  /// Per-query states with associated block managers. Key is the integer query ID passed
-  /// to CreatePerQueryState().
-  std::unordered_map<int64_t, std::shared_ptr<RuntimeState>> runtime_states_;
-};
+  /// Per-query states. TestEnv holds 1 refcount per QueryState in this map.
+  std::vector<QueryState*> query_states_;
 
+  /// One runtime state per query with an associated block manager. Each is owned
+  /// by one of the 'query_states_'.
+  std::vector<RuntimeState*> runtime_states_;
+};
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 791c99c..d34eb42 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -407,7 +407,7 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
     }
   }
   file_group.Close();
-  test_env_->TearDownRuntimeStates();
+  test_env_->TearDownQueries();
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index a5d8457..cfea03c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -701,7 +701,8 @@ void AdmissionController::UpdateClusterAggregates() {
 void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   // May be NULL if no queries have ever executed in this pool on this node but another
   // node sent stats for this pool.
-  const MemTracker* tracker = MemTracker::GetRequestPoolMemTracker(name_, NULL);
+  const MemTracker* tracker =
+      ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false);
 
   const int64_t current_reserved =
       tracker == NULL ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 0cc9761..f02483b 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -96,21 +96,16 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   // Allow logging of at least one error, so we can detect and convert it into a
   // Java exception.
   query_ctx.client_request.query_options.max_errors = 1;
-  RuntimeState state(query_ctx);
+
+  // Track memory against a dummy "fe-eval-exprs" resource pool - we don't
+  // know what resource pool the query has been assigned to yet.
+  RuntimeState state(query_ctx, ExecEnv::GetInstance(), "fe-eval-exprs");
   // Make sure to close the runtime state no matter how this scope is exited.
   const auto close_runtime_state =
       MakeScopeExitTrigger([&state]() { state.ReleaseResources(); });
 
-  THROW_IF_ERROR_RET(jni_frame.push(env), env, JniUtil::internal_exc_class(),
-      result_bytes);
-  // Exprs can allocate memory so we need to set up the mem trackers before
-  // preparing/running the exprs.
-  int64_t mem_limit = -1;
-  if (query_ctx.client_request.query_options.__isset.mem_limit
-      && query_ctx.client_request.query_options.mem_limit > 0) {
-    mem_limit = query_ctx.client_request.query_options.mem_limit;
-  }
-  state.InitMemTrackers(NULL, mem_limit);
+  THROW_IF_ERROR_RET(
+      jni_frame.push(env), env, JniUtil::internal_exc_class(), result_bytes);
 
   // Prepare() the exprs. Always Close() the exprs even in case of errors.
   vector<ExprContext*> expr_ctxs;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 2fb52e7..7483dd3 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -24,8 +24,9 @@
 
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
-#include "runtime/mem-tracker.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "service/impala-server.h"
 #include "service/query-exec-state.h"
 #include "thrift/protocol/TDebugProtocol.h"
@@ -250,20 +251,11 @@ void ImpalaHttpHandler::QueryMemoryHandler(const Webserver::ArgumentMap& args,
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
-  shared_ptr<ImpalaServer::QueryExecState> exec_state =
-      server_->GetQueryExecState(unique_id, true);
+  QueryState::ScopedRef qs(unique_id);
   string mem_usage_text;
-  // Search the in-flight queries, since only in-flight queries have a MemTracker
-  if (exec_state != NULL) {
-    lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
-    // Only queries with coordinator have mem_tracker
-    if (exec_state->coord() == NULL) {
-      mem_usage_text =
-          "The query does not have memory tracking information available.";
-    } else {
-      MemTracker* query_mem_tracker = exec_state->coord()->query_mem_tracker();
-      mem_usage_text = query_mem_tracker->LogUsage();
-    }
+  // Only in-flight queries have a MemTracker to get usage from.
+  if (qs.get() != nullptr) {
+    mem_usage_text = qs->query_mem_tracker()->LogUsage();
   } else {
     mem_usage_text =
         "The query is finished, current memory consumption is not available.";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/85edc15f/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 4a9d8f6..fb23e21 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -551,6 +551,10 @@ void ImpalaServer::QueryExecState::Done() {
   summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
   query_events_->MarkEvent("Unregister query");
 
+  // Update result set cache metrics, and update mem limit accounting before tearing
+  // down the coordinator.
+  ClearResultCache();
+
   if (coord_.get() != NULL) {
     // Release any reserved resources.
     Status status = exec_env_->scheduler()->Release(schedule_.get());
@@ -560,9 +564,6 @@ void ImpalaServer::QueryExecState::Done() {
     }
     coord_->TearDown();
   }
-
-  // Update result set cache metrics, and update mem limit accounting.
-  ClearResultCache();
 }
 
 Status ImpalaServer::QueryExecState::Exec(const TMetadataOpRequest& exec_request) {
@@ -1056,5 +1057,4 @@ void ImpalaServer::QueryExecState::ClearResultCache() {
   }
   result_cache_.reset(NULL);
 }
-
 }


Mime
View raw message