quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hbdeshm...@apache.org
Subject [8/8] incubator-quickstep git commit: Check memory availability before admitting a query.
Date Sat, 16 Jul 2016 22:17:09 GMT
Check memory availability before admitting a query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/21be79ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/21be79ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/21be79ba

Branch: refs/heads/memory-estimate
Commit: 21be79bafcf51f0302ea2ab672898c9028392a69
Parents: 8895a7d
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Wed Jul 13 23:19:58 2016 -0500
Committer: Harshad Deshmukh <hbdeshmukh@apache.org>
Committed: Sat Jul 16 16:32:21 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  1 +
 query_execution/PriorityPolicyEnforcer.cpp | 51 +++++++++++++++++++------
 query_execution/PriorityPolicyEnforcer.hpp |  4 ++
 3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21be79ba/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 9ab86b2..11e0e1d 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_WorkerMessage
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_storage_StorageManager
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21be79ba/query_execution/PriorityPolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp
index 30aa39e..2496b74 100644
--- a/query_execution/PriorityPolicyEnforcer.cpp
+++ b/query_execution/PriorityPolicyEnforcer.cpp
@@ -17,6 +17,7 @@
 
 #include "query_execution/PriorityPolicyEnforcer.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <queue>
@@ -32,6 +33,7 @@
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageManager.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -59,12 +61,13 @@ PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_clie
       storage_manager_(storage_manager),
       worker_directory_(worker_directory),
       bus_(bus),
-      profile_individual_workorders_(profile_individual_workorders) {
+      profile_individual_workorders_(profile_individual_workorders),
+      committed_memory_(0) {
   learner_.reset(new Learner());
 }
 
 bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
-  if (admitted_queries_.size() < kMaxConcurrentQueries) {
+  if (admissionMemoryCheck(query_handle)) { //admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();
     if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
@@ -72,13 +75,14 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
       admitted_queries_[query_id].reset(
           new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                            catalog_database_, storage_manager_, bus_));
-      DLOG(INFO) << "Admitted query with ID: " << query_handle->query_id()
-                 << " priority: " << query_handle->query_priority();
+      std::cout << "Admitted query with ID: " << query_handle->query_id()
+                 << " priority: " << query_handle->query_priority() << "\n";
       priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
       learner_->addQuery(*query_handle);
       query_handle->setAdmissionTime();
       query_id_to_handle_[query_handle->query_id()] = query_handle;
       LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
+      LOG(INFO) << "Query " << query_handle->query_id() << " memory admissible? " << admissionMemoryCheck(query_handle);
       return true;
     } else {
       LOG(ERROR) << "Query with the same ID " << query_id << " exists";
@@ -86,9 +90,12 @@ bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
     }
   } else {
     // This query will have to wait.
-    DLOG(INFO) << "Query " << query_handle->query_id() << " waitlisted";
-    query_id_to_handle_[query_handle->query_id()] = query_handle;
-    waiting_queries_.push(query_handle);
+    std::cout << "Query " << query_handle->query_id() << " waitlisted\n";
+    if (query_id_to_handle_.find(query_handle->query_id()) == query_id_to_handle_.end()) {
+      // This query was not waitlisted earlier.
+      query_id_to_handle_[query_handle->query_id()] = query_handle;
+      waiting_queries_.push(query_handle);
+    }
     return false;
   }
 }
@@ -172,8 +179,11 @@ void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.
       QueryHandle *new_query = waiting_queries_.front();
-      waiting_queries_.pop();
-      admitQuery(new_query);
+      // waiting_queries_.pop();
+      if (admitQuery(new_query)) {
+        std::cout << "Removing Q " << new_query->query_id() << " from waitlist\n";
+        waiting_queries_.pop();
+      }
     }
   } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted) {
     learner_->removeOperator(query_id, operator_id);
@@ -276,6 +286,8 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
     priority_query_ids_.erase(query_priority_unsigned);
   }
   query_id_to_handle_[query_id]->setCompletionTime();
+  const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
+  committed_memory_ -= estimated_memory_bytes;
   // Remove the query from the learner.
   learner_->removeQuery(query_id);
   // TODO(harshad) - Admit waiting queries, if any.
@@ -284,12 +296,13 @@ void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
 
 bool PriorityPolicyEnforcer::admitQueries(
     const std::vector<QueryHandle*> &query_handles) {
+  bool result = true;
   for (QueryHandle *curr_query : query_handles) {
     if (!admitQuery(curr_query)) {
-      return false;
+      result = false;
     }
   }
-  return true;
+  return result;
 }
 
 void PriorityPolicyEnforcer::recordTimeForWorkOrder(
@@ -363,4 +376,20 @@ WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
   return nullptr;
 }
 
+bool PriorityPolicyEnforcer::admissionMemoryCheck(const QueryHandle *query_handle) {
+  if (admitted_queries_.empty()) {
+    // No query running in the system, let the query in.
+    return true;
+  }
+  const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
+  const std::size_t estimated_slots = StorageManager::SlotsNeededForBytes(estimated_memory_requirement_bytes);
+  const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize());
+  const std::size_t committed_slots = StorageManager::SlotsNeededForBytes(committed_memory_);
+  if (std::max(committed_slots, current_slots) + estimated_slots < storage_manager_->getMaxBufferPoolSlots()) {
+    committed_memory_ += estimated_memory_requirement_bytes;
+    return true;
+  }
+  return false;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/21be79ba/query_execution/PriorityPolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp
index 702b976..9d727ed 100644
--- a/query_execution/PriorityPolicyEnforcer.hpp
+++ b/query_execution/PriorityPolicyEnforcer.hpp
@@ -191,6 +191,8 @@ class PriorityPolicyEnforcer {
 
   void getWorkerMessagesHPF(std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
 
+  bool admissionMemoryCheck(const QueryHandle *query_handle);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -228,6 +230,8 @@ class PriorityPolicyEnforcer {
 
   std::unique_ptr<Learner> learner_;
 
+  std::size_t committed_memory_;
+
   DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer);
 };
 


Mime
View raw message