From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Subject: incubator-quickstep git commit: Added PolicyEnforcer implementation for the distributed version.
Date: Thu, 4 Aug 2016 18:46:15 +0000 (UTC)

Repository: incubator-quickstep
Updated Branches: refs/heads/dist-policy-enforcer [created] 0fcc96d77

Added PolicyEnforcer implementation for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0fcc96d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0fcc96d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0fcc96d7 Branch: refs/heads/dist-policy-enforcer Commit: 0fcc96d777cbc4d29c24f9cd2c05423764f82874 Parents: 991f7a4 Author: Zuyu Zhang Authored: Thu Aug 4 11:45:51 2016 -0700 Committer: Zuyu Zhang Committed: Thu Aug 4 11:45:51 2016 -0700 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 24 ++ query_execution/PolicyEnforcerBase.cpp | 2 + query_execution/PolicyEnforcerBase.hpp | 7 + query_execution/PolicyEnforcerDistributed.cpp | 279 +++++++++++++++++++++ query_execution/PolicyEnforcerDistributed.hpp | 113 +++++++++ query_execution/QueryExecutionMessages.proto | 20 +- query_execution/QueryExecutionTypedefs.hpp | 5 + query_execution/QueryManagerBase.cpp | 3 +- query_execution/QueryManagerBase.hpp | 11 +- 9 files changed, 458 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 8bf1ab1..75be19e 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -35,6 +35,9 @@ endif() add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp) +if (ENABLE_DISTRIBUTED) + add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp 
PolicyEnforcerDistributed.hpp) +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) add_library(quickstep_queryexecution_QueryContext_proto @@ -110,6 +113,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase quickstep_storage_StorageBlockInfo quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed + glog + quickstep_catalog_CatalogRelation + quickstep_catalog_Catalog_proto + quickstep_queryexecution_PolicyEnforcerBase + quickstep_queryexecution_QueryContext_proto + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionState + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_QueryManagerBase + quickstep_queryexecution_QueryManagerDistributed + quickstep_queryexecution_ShiftbossDirectory + quickstep_queryoptimizer_QueryHandle + quickstep_storage_StorageBlockInfo + quickstep_utility_Macros + tmb + ${GFLAGS_LIB_NAME}) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode glog quickstep_catalog_CatalogTypedefs @@ -294,6 +317,7 @@ target_link_libraries(quickstep_queryexecution if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution quickstep_queryexecution_BlockLocator + quickstep_queryexecution_PolicyEnforcerDistributed quickstep_queryexecution_QueryManagerDistributed quickstep_queryexecution_Shiftboss quickstep_queryexecution_ShiftbossDirectory) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index 
3371d6d..d50a173 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -134,6 +134,8 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { } if (admitted_queries_[query_id]->queryStatus(op_index) == QueryManagerBase::QueryStatusCode::kQueryExecuted) { + onQueryCompletion(admitted_queries_[query_id].get()); + removeQuery(query_id); if (!waiting_queries_.empty()) { // Admit the earliest waiting query. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index 15bc118..4293f0f 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -138,6 +138,13 @@ class PolicyEnforcerBase { static constexpr std::size_t kMaxConcurrentQueries = 1; /** + * @brief Add custom actions upon the completion of a query. + * + * @param query_manager The query manager. + **/ + virtual void onQueryCompletion(QueryManagerBase *query_manager) {} + + /** * @brief Record the execution time for a finished WorkOrder. * * TODO(harshad) - Extend the functionality to rebuild work orders. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp new file mode 100644 index 0000000..8ed4bc9 --- /dev/null +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -0,0 +1,279 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/PolicyEnforcerDistributed.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "catalog/Catalog.pb.h" +#include "catalog/CatalogRelation.hpp" +#include "query_execution/QueryContext.pb.h" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/QueryManagerBase.hpp" +#include "query_execution/QueryManagerDistributed.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "storage/StorageBlockInfo.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::free; +using std::malloc; +using std::move; +using std::size_t; +using std::unique_ptr; +using std::vector; + +using tmb::TaggedMessage; + +namespace quickstep { + +namespace S = serialization; + +DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" + " can be allocated in a single round of dispatch of messages to" + " the workers."); + +void PolicyEnforcerDistributed::getWorkOrderMessages( + vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) { + // Iterate over admitted queries until either there are no more + // messages available, or the maximum number of messages has + // been collected. 
+ DCHECK(work_order_messages->empty()); + // TODO(harshad) - Make this function generic enough so that it + // works well when multiple queries are getting executed. + if (admitted_queries_.empty()) { + LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + return; + } + + const std::size_t per_query_share = + FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); + DCHECK_GT(per_query_share, 0u); + + vector<std::size_t> finished_queries_ids; + + for (const auto &admitted_query_info : admitted_queries_) { + QueryManagerBase *curr_query_manager = admitted_query_info.second.get(); + DCHECK(curr_query_manager != nullptr); + std::size_t messages_collected_curr_query = 0; + while (messages_collected_curr_query < per_query_share) { + S::WorkOrderMessage *next_work_order_message = + static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0); + if (next_work_order_message != nullptr) { + ++messages_collected_curr_query; + work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message)); + } else { + // No more work orders from the current query at this time. + // Check if the query's execution is over. + if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + // If the query has been executed, remove it. + finished_queries_ids.push_back(admitted_query_info.first); + } + break; + } + } + } + for (const std::size_t finished_qid : finished_queries_ids) { + onQueryCompletion(admitted_queries_[finished_qid].get()); + removeQuery(finished_qid); + } +} + +bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) { + if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { + // Ok to admit the query. + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) == admitted_queries_.end()) { + // NOTE(zuyu): Should call before constructing a 'QueryManager'. + // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' + // initializes. 
+ initiateQueryInShiftboss(query_handle); + + // Query with the same ID not present, ok to admit. + admitted_queries_[query_id].reset( + new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); + return true; + } else { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + } else { + // This query will have to wait. + waiting_queries_.push(query_handle); + return false; + } +} + +void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) { + S::InitiateRebuildResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const std::size_t query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); + + const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders(); + query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders); + shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders); + + if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + onQueryCompletion(query_manager); + + removeQuery(query_id); + if (!waiting_queries_.empty()) { + // Admit the earliest waiting query. 
+ QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } + } +} + +void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) { + S::QueryInitiateMessage proto; + proto.set_query_id(query_handle->query_id()); + proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto()); + proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kQueryInitiateMessage); + free(proto_bytes); + + LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage + << "') to Shiftboss 0"; + + // TODO(zuyu): Multiple Shiftbosses support. + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); + + // Wait for QueryInitiateResponseMessage from Shiftboss. 
+ const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type()); + LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type() + << "' message from client " << annotated_message.sender; + + S::QueryInitiateResponseMessage proto_response; + CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); +} + +void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) { + const QueryHandle *query_handle = query_manager->query_handle(); + + const CatalogRelation *query_result = query_handle->getQueryResultRelation(); + const tmb::client_id cli_id = query_handle->getClientId(); + const std::size_t query_id = query_handle->query_id(); + + if (query_result == nullptr) { + // Clean up query execution states, i.e., QueryContext, in Shiftboss. + serialization::QueryTeardownMessage proto; + proto.set_query_id(query_id); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kQueryTeardownMessage); + + // TODO(zuyu): Support multiple shiftbosses. + LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage + << "') to Shiftboss 0"; + tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss"; + + TaggedMessage cli_message(kQueryExecutionSuccessMessage); + + // Notify the CLI that the query executed successfully. 
+ LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage + << "') to CLI with TMB client id " << cli_id; + send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + cli_id, + move(cli_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to CLI with TMB client ID " << cli_id; + return; + } + + // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss. + S::SaveQueryResultMessage proto; + proto.set_query_id(query_id); + proto.set_relation_id(query_result->getID()); + + const vector<block_id> blocks(query_result->getBlocksSnapshot()); + for (const block_id block : blocks) { + proto.add_blocks(block); + } + + proto.set_cli_id(cli_id); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kSaveQueryResultMessage); + free(proto_bytes); + + LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage + << "') to Shiftboss 0"; + // TODO(zuyu): Support multiple shiftbosses. 
+ const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp new file mode 100644 index 0000000..807036c --- /dev/null +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -0,0 +1,113 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ + +#include +#include +#include + +#include "query_execution/PolicyEnforcerBase.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "utility/Macros.hpp" + +#include "tmb/id_typedefs.h" + +namespace tmb { +class MessageBus; +class TaggedMessage; +} + +namespace quickstep { + +class CatalogDatabaseLite; +class QueryHandle; +class QueryManagerBase; + +namespace serialization { class WorkOrderMessage; } + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A class that ensures that a high level policy is maintained + * in sharing resources among concurrent queries. + **/ +class PolicyEnforcerDistributed final : public PolicyEnforcerBase { + public: + /** + * @brief Constructor. + * + * @param foreman_client_id The TMB client ID of the Foreman. + * @param catalog_database The CatalogDatabase used. + * @param bus The TMB. + **/ + PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, + CatalogDatabaseLite *catalog_database, + ShiftbossDirectory *shiftboss_directory, + tmb::MessageBus *bus, + const bool profile_individual_workorders = false) + : PolicyEnforcerBase(catalog_database, profile_individual_workorders), + foreman_client_id_(foreman_client_id), + shiftboss_directory_(shiftboss_directory), + bus_(bus) {} + + /** + * @brief Destructor. + **/ + ~PolicyEnforcerDistributed() override {} + + bool admitQuery(QueryHandle *query_handle) override; + + /** + * @brief Get work order messages to be dispatched. These messages come from + * the active queries. + * + * @param work_order_messages The work order messages to be dispatched. + **/ + void getWorkOrderMessages( + std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages); + + /** + * @brief Process the initiate rebuild work order response message. + * + * @param tagged_message The message. 
+ **/ + void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); + + private: + void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override { + shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index); + } + + void onQueryCompletion(QueryManagerBase *query_manager) override; + + void initiateQueryInShiftboss(QueryHandle *query_handle); + + const tmb::client_id foreman_client_id_; + + ShiftbossDirectory *shiftboss_directory_; + + tmb::MessageBus *bus_; + + DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 4922042..bc8ebcf 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -38,8 +38,8 @@ message NormalWorkOrderCompletionMessage { required uint64 operator_index = 1; required uint64 worker_thread_index = 2; required uint64 query_id = 3; - - // Epoch time in microseconds. + + // Epoch time in microseconds. optional uint64 execution_start_time = 4; optional uint64 execution_end_time = 5; } @@ -116,13 +116,25 @@ message InitiateRebuildResponseMessage { required uint64 shiftboss_index = 4; } +message QueryTeardownMessage { + required uint64 query_id = 1; +} + message SaveQueryResultMessage { - required int32 relation_id = 1; - repeated fixed64 blocks = 2 [packed=true]; + required uint64 query_id = 1; + required int32 relation_id = 2; + repeated fixed64 blocks = 3 [packed=true]; + + required uint32 cli_id = 4; // tmb::client_id. 
} message SaveQueryResultResponseMessage { required int32 relation_id = 1; + required uint32 cli_id = 2; // tmb::client_id. +} + +message QueryExecutionSuccessMessage { + optional CatalogRelationSchema result_relation = 1; } // BlockLocator related messages. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 4bbab59..b535d3d 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -84,9 +84,14 @@ enum QueryExecutionMessageType : message_type_id { kInitiateRebuildMessage, // From Foreman to Shiftboss. kInitiateRebuildResponseMessage, // From Shiftboss to Foreman. + kQueryTeardownMessage, // From Foreman to Shiftboss. + kSaveQueryResultMessage, // From Foreman to Shiftboss. kSaveQueryResultResponseMessage, // From Shiftboss to Foreman. + // From Foreman to CLI. + kQueryExecutionSuccessMessage, + // BlockLocator related messages, sorted in a life cycle of StorageManager // with a unique block domain. kBlockDomainRegistrationMessage, // From Worker to BlockLocator. 
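The admission and completion flow that this commit wires together (admit while fewer than kMaxConcurrentQueries queries are running, otherwise queue; on completion call the onQueryCompletion hook, remove the query, then admit the earliest waiting one) can be sketched without the TMB or protobuf dependencies. This is a minimal illustration only: `AdmissionSketch`, `RecordingEnforcer`, `admit`, `complete`, and `isAdmitted` are hypothetical names that do not exist in Quickstep, and query IDs stand in for QueryHandle and QueryManager objects.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the base policy enforcer (not Quickstep code).
class AdmissionSketch {
 public:
  virtual ~AdmissionSketch() {}

  // Admit the query if a slot is free; otherwise park it in FIFO order.
  bool admit(const std::size_t query_id) {
    if (admitted_.size() < kMaxConcurrentQueries) {
      if (admitted_.count(query_id) != 0) {
        return false;  // A query with the same ID is already running.
      }
      admitted_.insert(query_id);
      return true;
    }
    waiting_.push(query_id);  // This query will have to wait.
    return false;
  }

  // Completion path: run the hook, remove the query, admit the next waiter.
  void complete(const std::size_t query_id) {
    assert(admitted_.count(query_id) != 0);
    onQueryCompletion(query_id);
    admitted_.erase(query_id);
    if (!waiting_.empty()) {
      const std::size_t next_query = waiting_.front();
      waiting_.pop();
      admit(next_query);
    }
  }

  bool isAdmitted(const std::size_t query_id) const {
    return admitted_.count(query_id) != 0;
  }

 protected:
  // Default hook does nothing, like the empty virtual added to the base class.
  virtual void onQueryCompletion(const std::size_t /* query_id */) {}

 private:
  static constexpr std::size_t kMaxConcurrentQueries = 1;

  std::unordered_set<std::size_t> admitted_;
  std::queue<std::size_t> waiting_;
};

// A subclass customizes completion; here it only records the finished IDs.
class RecordingEnforcer final : public AdmissionSketch {
 public:
  const std::vector<std::size_t>& completed() const { return completed_; }

 private:
  void onQueryCompletion(const std::size_t query_id) override {
    completed_.push_back(query_id);
  }

  std::vector<std::size_t> completed_;
};
```

In the commit itself the distributed override uses this hook to send QueryTeardownMessage or SaveQueryResultMessage to the Shiftboss; the sketch replaces that with a simple record so the control flow can be followed in isolation.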
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp index d2a3341..5b94ee8 100644 --- a/query_execution/QueryManagerBase.cpp +++ b/query_execution/QueryManagerBase.cpp @@ -35,7 +35,8 @@ using std::pair; namespace quickstep { QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) - : query_id_(DCHECK_NOTNULL(query_handle)->query_id()), + : query_handle_(DCHECK_NOTNULL(query_handle)), + query_id_(query_handle->query_id()), query_dag_(DCHECK_NOTNULL( DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())), num_operators_in_dag_(query_dag_->size()), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0fcc96d7/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index 6edfd5c..54cd3d1 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -72,6 +72,13 @@ class QueryManagerBase { virtual ~QueryManagerBase() {} /** + * @brief Get the query handle. + **/ + const QueryHandle* query_handle() const { + return query_handle_; + } + + /** * @brief Get the QueryExecutionState for this query. **/ inline const QueryExecutionState& getQueryExecutionState() const { @@ -250,9 +257,11 @@ class QueryManagerBase { return query_exec_state_->hasRebuildInitiated(index); } + const QueryHandle *query_handle_; + const std::size_t query_id_; - DAG *query_dag_; + DAG *query_dag_; // Owned by 'query_handle_'. const dag_node_index num_operators_in_dag_; // For all nodes, store their receiving dependents.
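The per-round budgeting in getWorkOrderMessages (divide max_msgs_per_dispatch_round evenly among the admitted queries, then pull at most that many messages from each) can be illustrated with a small standalone sketch. The names `Backlog` and `collectRound` are hypothetical, and integer message IDs stand in for WorkOrderMessage protos; the sketch assumes nothing about Quickstep beyond the arithmetic shown in the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <vector>

// Hypothetical backlog: query ID -> pending message IDs (not Quickstep types).
using Backlog = std::map<std::size_t, std::deque<int>>;

// Collect one dispatch round, giving each query an equal integer share.
std::vector<int> collectRound(Backlog *backlog,
                              const std::size_t max_msgs_per_round) {
  assert(backlog != nullptr);
  std::vector<int> collected;
  if (backlog->empty()) {
    return collected;  // No query is running, so nothing to dispatch.
  }

  // Integer division: with more queries than the budget, the share is zero.
  const std::size_t per_query_share = max_msgs_per_round / backlog->size();

  for (auto &entry : *backlog) {
    std::deque<int> &pending = entry.second;
    std::size_t taken = 0;
    while (taken < per_query_share && !pending.empty()) {
      collected.push_back(pending.front());
      pending.pop_front();
      ++taken;
    }
  }
  return collected;
}
```

With a budget of 4 and two queries, each query may contribute at most 2 messages per round, so a query with a longer backlog keeps its remainder for the next round. The original guards the zero-share case with DCHECK_GT(per_query_share, 0u); the sketch simply returns an empty round in that situation.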