impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [10/50] [abbrv] incubator-impala git commit: IMPALA-1169: Admission control info on the queries debug webpage
Date Thu, 17 Nov 2016 16:09:21 GMT
IMPALA-1169: Admission control info on the queries debug webpage

This patch adds a new event, 'Queued', to the query event log to
indicate when a query is queued by the admission controller. This
means that queries on the '/queries' page that are currently
queued will display this as their 'Last Event', making it possible
to see which queries are currently queued.

It also adds a column to show the resource pool associated with
the queries, and it updates the wording of the first event that
gets marked for each query from 'Start execution' to 'Query
submitted', since this is before planning and admission control
and therefore execution hasn't actually started yet.

Change-Id: I504e3c829a14318721e3a42de6281bcc578f7283
Reviewed-on: http://gerrit.cloudera.org:8080/4756
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3be4b3ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3be4b3ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3be4b3ef

Branch: refs/heads/hadoop-next
Commit: 3be4b3efd032a6723042aa2e9b087ff03b021872
Parents: 9bc90fa
Author: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Authored: Mon Oct 17 15:01:51 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Mon Nov 7 23:26:02 2016 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc       |  2 +
 be/src/service/impala-http-handler.cc           |  3 ++
 be/src/service/impala-server.cc                 |  3 +-
 be/src/service/impala-server.h                  |  4 ++
 be/src/service/query-exec-state.h               |  7 ++++
 tests/common/impala_service.py                  |  4 ++
 .../custom_cluster/test_admission_controller.py | 40 ++++++++++++++++++++
 www/queries.tmpl                                |  6 +++
 8 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 8151798..a5d8457 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -98,6 +98,7 @@ const string POOL_MAX_QUEUED_METRIC_KEY_FORMAT =
 
 // Profile query events
 const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission";
+const string QUERY_EVENT_QUEUED = "Queued";
 const string QUERY_EVENT_COMPLETED_ADMISSION = "Completed admission";
 
 // Profile info strings
@@ -469,6 +470,7 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
       PROFILE_INFO_VAL_QUEUED);
   schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_QUEUE_DETAIL,
       not_admitted_reason);
+  schedule->query_events()->MarkEvent(QUERY_EVENT_QUEUED);
 
   int64_t queue_wait_timeout_ms = FLAGS_queue_wait_timeout_ms;
   if (pool_cfg.__isset.queue_timeout_ms) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 834fd42..b675f3a 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -352,6 +352,9 @@ void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& r
   }
   Value val_waiting_time(waiting_time_str.c_str(), document->GetAllocator());
   value->AddMember("waiting_time", val_waiting_time, document->GetAllocator());
+
+  Value resource_pool(record.request_pool.c_str(), document->GetAllocator());
+  value->AddMember("resource_pool", resource_pool, document->GetAllocator());
 }
 
 void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 024cd24..730dfbf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -777,7 +777,7 @@ Status ImpalaServer::ExecuteInternal(
   exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
       this, session_state));
 
-  (*exec_state)->query_events()->MarkEvent("Start execution");
+  (*exec_state)->query_events()->MarkEvent("Query submitted");
 
   TExecRequest result;
   {
@@ -1604,6 +1604,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
   }
   all_rows_returned = exec_state.eos();
   last_active_time = exec_state.last_active();
+  request_pool = exec_state.request_pool();
 }
 
 bool ImpalaServer::QueryStateRecordLessThan::operator() (

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 836e619..9d14ecd 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -491,6 +491,10 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
     // The most recent time this query was actively being processed, in Unix milliseconds.
     int64_t last_active_time;
 
+    /// Request pool to which the request was submitted for admission, or an empty string
+    /// if this request doesn't have a pool.
+    std::string request_pool;
+
     /// Initialise from an exec_state. If copy_profile is true, print the query
     /// profile to a string and copy that into this.profile (which is expensive),
     /// otherwise leave this.profile empty.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
index 26dcf4d..55e3ffa 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -154,6 +154,13 @@ class ImpalaServer::QueryExecState {
   bool eos() const { return eos_; }
   Coordinator* coord() const { return coord_.get(); }
   QuerySchedule* schedule() { return schedule_.get(); }
+
+  /// Resource pool associated with this query, or an empty string if the schedule has not
+  /// been created and had the pool set yet, or this StmtType doesn't go through admission
+  /// control.
+  std::string request_pool() const {
+    return schedule_ == nullptr ? "" : schedule_->request_pool();
+  }
   int num_rows_fetched() const { return num_rows_fetched_; }
   void set_fetched_rows() { fetched_rows_ = true; }
   bool fetched_rows() const { return fetched_rows_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index fe141bf..4e9c815 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -53,6 +53,10 @@ class BaseImpalaService(object):
   def read_debug_webpage(self, page_name, timeout=10, interval=1):
     return self.open_debug_webpage(page_name, timeout=timeout, interval=interval).read()
 
+  def get_debug_webpage_json(self, page_name):
+    """Returns the json for the given Impala debug webpage, eg. '/queries'"""
+    return json.loads(self.read_debug_webpage(page_name + "?json"))
+
   def get_metric_value(self, metric_name, default_value=None):
     """Returns the value of the the given metric name from the Impala debug webpage"""
     metrics = json.loads(self.read_debug_webpage('jsonmetrics?json'))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 136436b..3992e3a 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -17,6 +17,7 @@
 
 # Tests admission control
 
+import itertools
 import logging
 import os
 import pytest
@@ -599,6 +600,33 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         if client is not None:
           client.close()
 
+  def _check_queries_page_resource_pools(self):
+    """Checks that all queries in the '/queries' webpage json have the correct resource
+    pool (this is called after all queries have been admitted, queued, or rejected, so
+    they should already have the pool set), or no pool for queries that don't go through
+    admission control."""
+    for impalad in self.impalads:
+      queries_json = impalad.service.get_debug_webpage_json('/queries')
+      for query in itertools.chain(queries_json['in_flight_queries'], \
+          queries_json['completed_queries']):
+        if query['stmt_type'] == 'QUERY' or query['stmt_type'] == 'DML':
+          assert query['last_event'] != 'Registered' and \
+              query['last_event'] != 'Planning finished'
+          assert query['resource_pool'] == self.pool_name
+        else:
+          assert query['resource_pool'] == ''
+
+  def _get_queries_page_num_queued(self):
+    """Returns the number of queries currently in the 'queued' state from the '/queries'
+    webpage json"""
+    num_queued = 0
+    for impalad in self.impalads:
+      queries_json = impalad.service.get_debug_webpage_json('/queries')
+      for query in queries_json['in_flight_queries']:
+        if query['last_event'] == 'Queued':
+          num_queued += 1
+    return num_queued
+
   def run_admission_test(self, vector, additional_query_options):
     LOG.debug("Starting test case with parameters: %s", vector)
     self.impalads = self.cluster.impalads
@@ -644,6 +672,13 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         num_queries - metric_deltas['admitted'] - metric_deltas['queued']
     initial_metric_deltas = metric_deltas
 
+    # Like above, check that the count from the queries webpage json is reasonable.
+    queries_page_num_queued = self._get_queries_page_num_queued()
+    assert queries_page_num_queued >=\
+        min(num_queries - metric_deltas['admitted'], MAX_NUM_QUEUED_QUERIES)
+    assert queries_page_num_queued <= MAX_NUM_QUEUED_QUERIES * len(self.impalads)
+    self._check_queries_page_resource_pools()
+
     while len(self.executing_threads) > 0:
       curr_metrics = self.get_admission_metrics();
       log_metrics("Main loop, curr_metrics: ", curr_metrics);
@@ -685,6 +720,11 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       assert metric_deltas['queued'] == MAX_NUM_QUEUED_QUERIES
       assert metric_deltas['rejected'] == num_queries - expected_admitted
 
+    # All queries should be completed by now.
+    queries_page_num_queued = self._get_queries_page_num_queued()
+    assert queries_page_num_queued == 0
+    self._check_queries_page_resource_pools()
+
     for thread in self.all_threads:
       if thread.error is not None:
         raise thread.error

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3be4b3ef/www/queries.tmpl
----------------------------------------------------------------------
diff --git a/www/queries.tmpl b/www/queries.tmpl
index cacd147..2f26b92 100644
--- a/www/queries.tmpl
+++ b/www/queries.tmpl
@@ -36,6 +36,7 @@ archived in memory. The size of that archive is controlled with the
     <th>State</th>
     <th>Last Event</th>
     <th># rows fetched</th>
+    <th>Resource Pool</th>
     <th>Details</th>
     <th>Action</th>
   </tr>
@@ -53,6 +54,7 @@ archived in memory. The size of that archive is controlled with the
     <td><samp>{{state}}</samp></td>
     <td><samp>{{last_event}}</samp></td>
     <td>{{rows_fetched}}</td>
+    <td>{{resource_pool}}</td>
     <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     <td><a href='/cancel_query?query_id={{query_id}}'>Cancel</a></td>
   </tr>
@@ -78,6 +80,7 @@ archived in memory. The size of that archive is controlled with the
     <th>State</th>
     <th>Last Event</th>
     <th># rows fetched</th>
+    <th>Resource Pool</th>
     <th>Details</th>
     <th>Action</th>
   </tr>
@@ -96,6 +99,7 @@ archived in memory. The size of that archive is controlled with the
     <td><samp>{{state}}</samp></td>
     <td><samp>{{last_event}}</samp></td>
     <td>{{rows_fetched}}</td>
+    <td>{{resource_pool}}</td>
     <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     <td><a href='/cancel_query?query_id={{query_id}}'>Close</a></td>
   </tr>
@@ -117,6 +121,7 @@ archived in memory. The size of that archive is controlled with the
     <th>Scan Progress</th>
     <th>State</th>
     <th># rows fetched</th>
+    <th>Resource Pool</th>
     <th>Details</th>
   </tr>
 {{#completed_queries}}
@@ -131,6 +136,7 @@ archived in memory. The size of that archive is controlled with the
     <td>{{progress}}</td>
     <td><samp>{{state}}</samp></td>
     <td>{{rows_fetched}}</td>
+    <td>{{resource_pool}}</td>
     <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
   </tr>
 {{/completed_queries}}


Mime
View raw message