Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 89679200B45 for ; Thu, 30 Jun 2016 22:49:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 88123160A52; Thu, 30 Jun 2016 20:49:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 41926160A63 for ; Thu, 30 Jun 2016 22:49:47 +0200 (CEST) Received: (qmail 69728 invoked by uid 500); 30 Jun 2016 20:49:46 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 69718 invoked by uid 99); 30 Jun 2016 20:49:46 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jun 2016 20:49:46 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 0AF6A1814E0 for ; Thu, 30 Jun 2016 20:49:46 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 4VfQmdM-8BrZ for ; Thu, 30 Jun 2016 20:49:42 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id EBD045FCF7 for ; Thu, 30 Jun 2016 20:49:40 +0000 (UTC) Received: (qmail 68833 invoked by uid 99); 30 Jun 2016 20:49:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Jun 2016 20:49:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F274FE00DB; Thu, 30 Jun 2016 20:49:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: hbdeshmukh@apache.org To: commits@quickstep.incubator.apache.org Date: Thu, 30 Jun 2016 20:49:40 -0000 Message-Id: <2cdbca0bf2444d1ba1d83e4fdb522088@git.apache.org> In-Reply-To: <6aef4e212d1d4672ae9c54a0656a20b1@git.apache.org> References: <6aef4e212d1d4672ae9c54a0656a20b1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/18] incubator-quickstep git commit: Created PriorityPolicyEnforcer class. archived-at: Thu, 30 Jun 2016 20:49:48 -0000 Created PriorityPolicyEnforcer class. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/862fd216 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/862fd216 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/862fd216 Branch: refs/heads/scheduler++ Commit: 862fd21628acb0fad59387769382d79d2ad9d253 Parents: c1a44e2 Author: Harshad Deshmukh Authored: Tue Jun 28 09:49:29 2016 -0500 Committer: Harshad Deshmukh Committed: Thu Jun 30 15:49:05 2016 -0500 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 19 +- query_execution/Foreman.cpp | 2 +- query_execution/Foreman.hpp | 4 +- query_execution/PolicyEnforcer.cpp | 2 - query_execution/PriorityPolicyEnforcer.cpp | 222 ++++++++++++++++++++++++ query_execution/PriorityPolicyEnforcer.hpp | 222 ++++++++++++++++++++++++ 6 files changed, 465 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 4639617..104f9da 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -37,6 +37,7 @@ add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp) add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp) add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp) add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp) +add_library(quickstep_queryexecution_PriorityPolicyEnforcer PriorityPolicyEnforcer.cpp PriorityPolicyEnforcer.hpp) add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) add_library(quickstep_queryexecution_QueryContext_proto @@ -80,7 +81,7 @@ target_link_libraries(quickstep_queryexecution_Foreman glog quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_ForemanLite - quickstep_queryexecution_PolicyEnforcer + quickstep_queryexecution_PriorityPolicyEnforcer quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_WorkerDirectory @@ -108,6 +109,21 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer glog quickstep_queryexecution_ExecutionStats quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_ProbabilityStore + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryManager + quickstep_queryexecution_WorkerDirectory + quickstep_queryexecution_WorkerMessage + quickstep_queryoptimizer_QueryHandle + quickstep_relationaloperators_WorkOrder + quickstep_utility_Macros + tmb) +target_link_libraries(quickstep_queryexecution_PriorityPolicyEnforcer + ${GFLAGS_LIB_NAME} + glog + quickstep_queryexecution_ExecutionStats + quickstep_catalog_CatalogTypedefs quickstep_queryexecution_Learner quickstep_queryexecution_ProbabilityStore quickstep_queryexecution_QueryExecutionMessages_proto @@ -225,6 +241,7 @@ target_link_libraries(quickstep_queryexecution quickstep_queryexecution_ForemanLite quickstep_queryexecution_Learner quickstep_queryexecution_PolicyEnforcer + quickstep_queryexecution_PriorityPolicyEnforcer quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryContext_proto quickstep_queryexecution_QueryExecutionMessages_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/Foreman.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp index f9f2e7a..0898ac1 100644 --- a/query_execution/Foreman.cpp +++ b/query_execution/Foreman.cpp @@ -87,7 +87,7 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id, bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); } - policy_enforcer_.reset(new PolicyEnforcer( + policy_enforcer_.reset(new PriorityPolicyEnforcer( foreman_client_id_, num_numa_nodes, catalog_database_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/Foreman.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp index 7be57e7..c38a3e6 100644 --- a/query_execution/Foreman.hpp +++ b/query_execution/Foreman.hpp @@ -24,7 +24,7 @@ #include #include "query_execution/ForemanLite.hpp" -#include "query_execution/PolicyEnforcer.hpp" +#include "query_execution/PriorityPolicyEnforcer.hpp" #include "utility/Macros.hpp" #include "tmb/id_typedefs.h" @@ -128,7 +128,7 @@ class Foreman final : public ForemanLite { CatalogDatabaseLite *catalog_database_; StorageManager *storage_manager_; - std::unique_ptr policy_enforcer_; + std::unique_ptr policy_enforcer_; DISALLOW_COPY_AND_ASSIGN(Foreman); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/PolicyEnforcer.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp index ff734ca..db7206b 100644 --- a/query_execution/PolicyEnforcer.cpp +++ b/query_execution/PolicyEnforcer.cpp @@ -25,7 +25,6 @@ #include #include "catalog/CatalogTypedefs.hpp" -#include "query_execution/Learner.hpp" #include "query_execution/ProbabilityStore.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryManager.hpp" @@ -43,7 +42,6 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" " the workers."); bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) { - Learner learner; if (admitted_queries_.size() < kMaxConcurrentQueries) { // Ok to admit the query. const std::size_t query_id = query_handle->query_id(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/PriorityPolicyEnforcer.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PriorityPolicyEnforcer.cpp b/query_execution/PriorityPolicyEnforcer.cpp new file mode 100644 index 0000000..44ccb0a --- /dev/null +++ b/query_execution/PriorityPolicyEnforcer.cpp @@ -0,0 +1,222 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/PriorityPolicyEnforcer.hpp" + +#include +#include +#include +#include +#include +#include + +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/Learner.hpp" +#include "query_execution/ProbabilityStore.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryManager.hpp" +#include "query_execution/WorkerDirectory.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "relational_operators/WorkOrder.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" + +namespace quickstep { + +DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" + " can be allocated in a single round of dispatch of messages to" + " the workers."); + +bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) { + Learner learner; + if (admitted_queries_.size() < kMaxConcurrentQueries) { + // Ok to admit the query. + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) == admitted_queries_.end()) { + // Query with the same ID not present, ok to admit. + admitted_queries_[query_id].reset( + new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle, + catalog_database_, storage_manager_, bus_)); + LOG(INFO) << "Admitted query with ID: " << query_handle->query_id(); + learner_->addQuery(*query_handle); + return true; + } else { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + } else { + // This query will have to wait. + waiting_queries_.push(query_handle); + return false; + } +} + +void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { + // TODO(harshad) : Provide processXMessage() public functions in + // QueryManager, so that we need to extract message from the + // TaggedMessage only once. + std::size_t query_id; + switch (tagged_message.message_type()) { + case kWorkOrderCompleteMessage: { + serialization::NormalWorkOrderCompletionMessage proto; + // Note: This proto message contains the time it took to execute the + // WorkOrder. It can be accessed in this scope. + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + worker_directory_->decrementNumQueuedWorkOrders( + proto.worker_thread_index()); + learner_->addCompletionFeedback(proto); + if (profile_individual_workorders_) { + recordTimeForWorkOrder(proto); + } + break; + } + case kRebuildWorkOrderCompleteMessage: { + serialization::RebuildWorkOrderCompletionMessage proto; + // Note: This proto message contains the time it took to execute the + // rebuild WorkOrder. It can be accessed in this scope. + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + worker_directory_->decrementNumQueuedWorkOrders( + proto.worker_thread_index()); + break; + } + case kCatalogRelationNewBlockMessage: { + serialization::CatalogRelationNewBlockMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kDataPipelineMessage: { + serialization::DataPipelineMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kWorkOrdersAvailableMessage: { + serialization::WorkOrdersAvailableMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kWorkOrderFeedbackMessage: { + WorkOrder::FeedbackMessage msg( + const_cast(tagged_message.message()), + tagged_message.message_bytes()); + query_id = msg.header().query_id; + break; + } + default: + LOG(FATAL) << "Unknown message type found in PriorityPolicyEnforcer"; + } + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + const QueryManager::QueryStatusCode return_code = + admitted_queries_[query_id]->processMessage(tagged_message); + if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) { + removeQuery(query_id); + if (!waiting_queries_.empty()) { + // Admit the earliest waiting query. + QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } + } +} + +void PriorityPolicyEnforcer::getWorkerMessages( + std::vector> *worker_messages) { + // Iterate over admitted queries until either there are no more + // messages available, or the maximum number of messages have + // been collected. + DCHECK(worker_messages->empty()); + // TODO(harshad) - Make this function generic enough so that it + // works well when multiple queries are getting executed. + std::size_t per_query_share = 0; + if (!admitted_queries_.empty()) { + per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); + } else { + LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + return; + } + DCHECK_GT(per_query_share, 0u); + std::vector finished_queries_ids; + + for (const auto &admitted_query_info : admitted_queries_) { + QueryManager *curr_query_manager = admitted_query_info.second.get(); + DCHECK(curr_query_manager != nullptr); + std::size_t messages_collected_curr_query = 0; + while (messages_collected_curr_query < per_query_share) { + WorkerMessage *next_worker_message = + curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID); + if (next_worker_message != nullptr) { + ++messages_collected_curr_query; + worker_messages->push_back(std::unique_ptr(next_worker_message)); + } else { + // No more work ordes from the current query at this time. + // Check if the query's execution is over. + if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + // If the query has been executed, remove it. + finished_queries_ids.push_back(admitted_query_info.first); + } + break; + } + } + } + for (const std::size_t finished_qid : finished_queries_ids) { + removeQuery(finished_qid); + } +} + +void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) { + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) { + LOG(WARNING) << "Removing query with ID " << query_id + << " that hasn't finished its execution"; + } + admitted_queries_.erase(query_id); + learner_->removeQuery(query_id); +} + +bool PriorityPolicyEnforcer::admitQueries( + const std::vector &query_handles) { + for (QueryHandle *curr_query : query_handles) { + if (!admitQuery(curr_query)) { + return false; + } + } + return true; +} + +void PriorityPolicyEnforcer::recordTimeForWorkOrder( + const serialization::NormalWorkOrderCompletionMessage &proto) { + const std::size_t query_id = proto.query_id(); + if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) { + workorder_time_recorder_[query_id]; + } + workorder_time_recorder_[query_id].emplace_back( + proto.worker_thread_index(), + proto.operator_index(), + proto.execution_time_in_microseconds()); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/862fd216/query_execution/PriorityPolicyEnforcer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PriorityPolicyEnforcer.hpp b/query_execution/PriorityPolicyEnforcer.hpp new file mode 100644 index 0000000..94cbe38 --- /dev/null +++ b/query_execution/PriorityPolicyEnforcer.hpp @@ -0,0 +1,222 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "query_execution/Learner.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryManager.hpp" +#include "query_execution/WorkerMessage.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +namespace quickstep { + +class CatalogDatabaseLite; +class QueryHandle; +class StorageManager; +class WorkerDirectory; + +/** + * @brief A class that ensures that a high level policy is maintained + * in sharing resources among concurrent queries. + **/ +class PriorityPolicyEnforcer { + public: + /** + * @brief Constructor. + * + * @param foreman_client_id The TMB client ID of the Foreman. + * @param num_numa_nodes Number of NUMA nodes used by the system. + * @param catalog_database The CatalogDatabase used. + * @param storage_manager The StorageManager used. + * @param bus The TMB. + **/ + PriorityPolicyEnforcer(const tmb::client_id foreman_client_id, + const std::size_t num_numa_nodes, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + WorkerDirectory *worker_directory, + tmb::MessageBus *bus, + const bool profile_individual_workorders = false) + : foreman_client_id_(foreman_client_id), + num_numa_nodes_(num_numa_nodes), + catalog_database_(catalog_database), + storage_manager_(storage_manager), + worker_directory_(worker_directory), + bus_(bus), + profile_individual_workorders_(profile_individual_workorders) { + learner_.reset(new Learner()); + } + + /** + * @brief Destructor. + **/ + ~PriorityPolicyEnforcer() { + if (hasQueries()) { + LOG(WARNING) << "Destructing PriorityPolicyEnforcer with some unfinished or " + "waiting queries"; + } + } + + /** + * @brief Admit a query to the system. + * + * @param query_handle The QueryHandle for the new query. + * + * @return Whether the query was admitted to the system. + **/ + bool admitQuery(QueryHandle *query_handle); + + /** + * @brief Admit multiple queries in the system. + * + * @note In the current simple implementation, we only allow one active + * query in the system. Other queries will have to wait. + * + * @param query_handles A vector of QueryHandles for the queries to be + * admitted. + * + * @return True if all the queries were admitted, false if at least one query + * was not admitted. + **/ + bool admitQueries(const std::vector &query_handles); + + /** + * @brief Remove a given query that is under execution. + * + * @note This function is made public so that it is possible for a query + * to be killed. Otherwise, it should only be used privately by the + * class. + * + * TODO(harshad) - Extend this function to support removal of waiting queries. + * + * @param query_id The ID of the query to be removed. + **/ + void removeQuery(const std::size_t query_id); + + /** + * @brief Get worker messages to be dispatched. These worker messages come + * from the active queries. + * + * @param worker_messages The worker messages to be dispatched. + **/ + void getWorkerMessages( + std::vector> *worker_messages); + + /** + * @brief Process a message sent to the Foreman, which gets passed on to the + * policy enforcer. + * + * @param message The message. + **/ + void processMessage(const TaggedMessage &tagged_message); + + /** + * @brief Check if there are any queries to be executed. + * + * @return True if there is at least one active or waiting query, false if + * the policy enforcer doesn't have any query. + **/ + inline bool hasQueries() const { + return !(admitted_queries_.empty() && waiting_queries_.empty()); + } + + /** + * @brief Get the profiling results for individual work order execution for a + * given query. + * + * @note This function should only be called if profiling individual work + * orders option is enabled. + * + * @param query_id The ID of the query for which the profiling results are + * requested. + * + * @return A vector of tuples, each being a single profiling entry. + **/ + inline const std::vector>& + getProfilingResults(const std::size_t query_id) const { + DCHECK(profile_individual_workorders_); + DCHECK(workorder_time_recorder_.find(query_id) != + workorder_time_recorder_.end()); + return workorder_time_recorder_.at(query_id); + } + + private: + static constexpr std::size_t kMaxConcurrentQueries = 2; + + /** + * @brief Record the execution time for a finished WorkOrder. + * + * TODO(harshad) - Extend the functionality to rebuild work orders. + * + * @param proto The completion message proto sent after the WorkOrder + * execution. + **/ + void recordTimeForWorkOrder( + const serialization::NormalWorkOrderCompletionMessage &proto); + + const tmb::client_id foreman_client_id_; + const std::size_t num_numa_nodes_; + + CatalogDatabaseLite *catalog_database_; + StorageManager *storage_manager_; + WorkerDirectory *worker_directory_; + + tmb::MessageBus *bus_; + const bool profile_individual_workorders_; + + // Key = query ID, value = QueryManager* for the key query. + std::unordered_map> admitted_queries_; + + // The queries which haven't been admitted yet. + std::queue waiting_queries_; + + // Key = Query ID. + // Value = A tuple indicating a record of executing a work order. + // Within a tuple ... + // 1st element: Logical worker ID. + // 2nd element: Operator ID. + // 3rd element: Time in microseconds to execute the work order. + std::unordered_map< + std::size_t, + std::vector>> + workorder_time_recorder_; + + std::unique_ptr learner_; + + DISALLOW_COPY_AND_ASSIGN(PriorityPolicyEnforcer); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_PRIORITY_POLICY_ENFORCER_HPP_