quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hbdeshm...@apache.org
Subject incubator-quickstep git commit: Foreman controls injection of high priority query.
Date Sat, 16 Jul 2016 18:05:09 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/hpf-policy [created] fa625082c


Foreman controls injection of high priority query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/fa625082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/fa625082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/fa625082

Branch: refs/heads/hpf-policy
Commit: fa625082c7643d0b985cbcdabec41b25e3db1f36
Parents: eea5603
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Sat Jul 16 13:04:32 2016 -0500
Committer: Harshad Deshmukh <hbdeshmukh@apache.org>
Committed: Sat Jul 16 13:04:32 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt |  1 +
 query_execution/Foreman.cpp    | 48 +++++++++++++++++++++++++++++++++++--
 query_execution/Foreman.hpp    | 12 ++++++++++
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fa625082/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 9ab86b2..1ccb26d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -79,6 +79,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
 target_link_libraries(quickstep_queryexecution_Foreman
                       ${GFLAGS_LIB_NAME}
                       glog
+                      quickstep_cli_InputParserUtil
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
                       quickstep_queryexecution_PriorityPolicyEnforcer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fa625082/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 40d6e5c..a7027d9 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include "cli/InputParserUtil.hpp"
 #include "query_execution/AdmitRequestMessage.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -49,6 +50,8 @@ namespace quickstep {
 DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number "
               "of pending work orders for the worker. This information is used "
               "by the Foreman to assign work orders to worker threads");
+DEFINE_string(high_priority_queries_entry_points, "", "A comma separated list of entry points for high priority queries, each of which is defined in terms of milliseconds since the beginning of workload execution");
+DEFINE_uint64(num_high_priority_queries, 1, "Number of high priority queries to be admitted to the system");
 
 Foreman::Foreman(const tmb::client_id main_thread_client_id,
                  WorkerDirectory *worker_directory,
@@ -62,7 +65,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       main_thread_client_id_(main_thread_client_id),
       worker_directory_(DCHECK_NOTNULL(worker_directory)),
       catalog_database_(DCHECK_NOTNULL(catalog_database)),
-      storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+      storage_manager_(DCHECK_NOTNULL(storage_manager)),
+      start_time_(std::chrono::steady_clock::now()),
+      high_priority_queries_injection_points_(InputParserUtil::ParseWorkerAffinities(FLAGS_num_high_priority_queries, FLAGS_high_priority_queries_entry_points))
+      {
   const std::vector<QueryExecutionMessageType> sender_message_types{
       kPoisonMessage,
       kRebuildWorkOrderMessage,
@@ -95,6 +101,10 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       worker_directory_,
       bus_,
       profile_individual_workorders));
+
+  CHECK(FLAGS_num_high_priority_queries == high_priority_queries_injection_points_.size()) << "Number of high priority queries should be same as number of entry points";
+
+  high_priority_queries_admitted_.resize(FLAGS_num_high_priority_queries, false);
 }
 
 void Foreman::run() {
@@ -125,6 +135,15 @@ void Foreman::run() {
         const AdmitRequestMessage *msg =
             static_cast<const AdmitRequestMessage *>(tagged_message.message());
         const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
+        vector<QueryHandle*> reduced_query_handles_list;
+        CHECK(query_handles.size() > FLAGS_num_high_priority_queries) << "Number of high priority queries should be less than total number of queries";
+        for (std::size_t i = 0; i < query_handles.size(); ++i) {
+          if (i < query_handles.size() - FLAGS_num_high_priority_queries) {
+            reduced_query_handles_list.push_back(query_handles[i]);
+          } else {
+            high_priority_query_handles_.push(query_handles[i]);
+          }
+        }
 
         DCHECK(!query_handles.empty());
         bool all_queries_admitted = true;
@@ -132,7 +151,7 @@ void Foreman::run() {
           all_queries_admitted =
               policy_enforcer_->admitQuery(query_handles.front());
         } else {
-          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+          all_queries_admitted = policy_enforcer_->admitQueries(reduced_query_handles_list);
         }
         if (!all_queries_admitted) {
           LOG(WARNING) << "The scheduler could not admit all the queries";
@@ -157,6 +176,8 @@ void Foreman::run() {
       dispatchWorkerMessages(new_messages);
     }
 
+    checkAndAdmitHighPriorityQueries();
+
     // We check again, as some queries may produce zero work orders and finish
     // their execution.
     if (!policy_enforcer_->hasQueries()) {
@@ -252,4 +273,27 @@ void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
   }
 }
 
+bool Foreman::checkAndAdmitHighPriorityQueries() {
+  for (std::size_t i = 0; i < high_priority_queries_admitted_.size(); ++i) {
+    if (!high_priority_queries_admitted_[i]) {
+      // Check the timestamp.
+      auto time_in_millis = std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - start_time_).count();
+      if (time_in_millis > high_priority_queries_injection_points_[i]) {
+        // Admit the query.
+        QueryHandle *next_query_handle = high_priority_query_handles_.front();
+        high_priority_query_handles_.pop();
+        if (!policy_enforcer_->admitQuery(next_query_handle)) {
+          LOG(INFO) << "Could not admit query with ID: " << next_query_handle->query_id();
+        }
+        high_priority_queries_admitted_[i] = true;
+        return true;
+      } else {
+        // Wait for some more time to admit this query.
+        return false;
+      }
+    }
+  }
+  return false;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fa625082/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index c38a3e6..d2db48a 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -18,6 +18,7 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 
+#include <chrono>
 #include <cstddef>
 #include <cstdio>
 #include <memory>
@@ -121,6 +122,8 @@ class Foreman final : public ForemanLite {
    **/
   bool canCollectNewMessages(const tmb::message_type_id message_type);
 
+  bool checkAndAdmitHighPriorityQueries();
+
   const tmb::client_id main_thread_client_id_;
 
   WorkerDirectory *worker_directory_;
@@ -130,6 +133,15 @@ class Foreman final : public ForemanLite {
 
   std::unique_ptr<PriorityPolicyEnforcer> policy_enforcer_;
 
+  // Start time for Foreman.
+  const std::chrono::steady_clock::time_point start_time_;
+  // Whether high priority queries have been admitted to the system.
+  std::vector<bool> high_priority_queries_admitted_;
+  // Defined in terms of number of milliseconds.
+  const std::vector<int> high_priority_queries_injection_points_;
+  std::queue<QueryHandle*> high_priority_query_handles_;
+
+
   DISALLOW_COPY_AND_ASSIGN(Foreman);
 };
 


Mime
View raw message