From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Date: Thu, 04 Aug 2016 19:03:54 -0000
Subject: [1/4] incubator-quickstep git commit: Added the distributed execution engine and tests. [Forced Update!]
archived-at: Thu, 04 Aug 2016 19:04:13 -0000

Repository: incubator-quickstep
Updated Branches:
  refs/heads/new-distributed-exe-test 01c48c15b -> 738ffe9a7 (forced update)


Added the distributed execution engine and tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/220fa06f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/220fa06f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/220fa06f

Branch: refs/heads/new-distributed-exe-test
Commit: 220fa06fff04ffffe14bf24ee090c5bea0b5f55d
Parents: aaecc76
Author: Zuyu Zhang
Authored: Fri Jul 22 11:31:33 2016 -0700
Committer: Zuyu Zhang
Committed: Fri Jul 29 15:59:10 2016 -0700

----------------------------------------------------------------------
 query_execution/AdmitRequestMessage.hpp | 7 +
 query_execution/CMakeLists.txt | 50 ++-
 query_execution/ForemanDistributed.cpp | 333 +++++++++++++++++++
 query_execution/ForemanDistributed.hpp | 130 ++++++++
 query_execution/PolicyEnforcerBase.cpp | 3 +
 query_execution/PolicyEnforcerBase.hpp | 7 +
 query_execution/PolicyEnforcerDistributed.cpp | 253 ++++++++++++++
 query_execution/PolicyEnforcerDistributed.hpp | 112 +++++++
 query_execution/QueryExecutionMessages.proto | 16 +-
 query_execution/QueryExecutionTypedefs.hpp | 4 +
 query_execution/QueryManagerBase.cpp | 3 +-
 query_execution/QueryManagerBase.hpp | 26 +-
 query_execution/QueryManagerDistributed.cpp | 41 ++-
 query_execution/QueryManagerDistributed.hpp | 10 +-
 query_execution/Shiftboss.cpp | 89 +++--
 query_optimizer/CMakeLists.txt | 4 +
 query_optimizer/QueryHandle.hpp | 26 ++
 query_optimizer/tests/CMakeLists.txt | 41 +++
 .../tests/DistributedExecutionGeneratorTest.cpp | 57 ++++
 .../DistributedExecutionGeneratorTestRunner.cpp | 122 +++++++
 .../DistributedExecutionGeneratorTestRunner.hpp | 146 ++++++++
 .../tests/execution_generator/CMakeLists.txt | 68 +++-
 third_party/tmb/include/tmb/tagged_message.h | 9 +
 23 files changed, 1511 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
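Several functions in the diff below repeat one pattern:

- ForemanDistributed::sendWorkOrderMessage
- processShiftbossRegisterationMessage
- processSaveQueryResultResponseMessage
- PolicyEnforcerDistributed::initiateQueryInShiftboss

Each one sizes the protobuf with ByteSize(), serializes it into a malloc'd buffer, wraps the bytes in a TaggedMessage, and then frees the buffer. Because the buffer is freed right after construction, the TaggedMessage must keep its own copy of the payload. The sketch below factors that pattern into one helper, assuming a std::vector scratch buffer is acceptable in place of malloc/free. FakeProto, FakeTaggedMessage and MakeTaggedMessage are hypothetical stand-ins so the example compiles without protobuf or TMB. They are not Quickstep or TMB APIs.

```cpp
#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for tmb::TaggedMessage. As the diff's usage implies,
// it copies the payload, so the caller may release its buffer afterwards.
class FakeTaggedMessage {
 public:
  FakeTaggedMessage(const void *bytes, std::size_t length, int message_type)
      : payload_(static_cast<const char *>(bytes),
                 static_cast<const char *>(bytes) + length),
        message_type_(message_type) {}

  const char *message() const { return payload_.data(); }
  std::size_t message_bytes() const { return payload_.size(); }
  int message_type() const { return message_type_; }

 private:
  std::vector<char> payload_;
  int message_type_;
};

// Hypothetical stand-in for a generated protobuf message, exposing only the
// two calls the diff relies on: ByteSize() and SerializeToArray().
struct FakeProto {
  std::string text;

  int ByteSize() const { return static_cast<int>(text.size()); }

  bool SerializeToArray(void *data, int size) const {
    if (size < ByteSize()) return false;
    if (!text.empty()) std::memcpy(data, text.data(), text.size());
    return true;
  }
};

// Serializes any proto-like object into a tagged message. The std::vector
// owns the scratch buffer, so there is no manual free() and nothing leaks if
// an exception is thrown.
template <typename Proto>
FakeTaggedMessage MakeTaggedMessage(const Proto &proto, int message_type) {
  const int length = proto.ByteSize();
  std::vector<char> buffer(static_cast<std::size_t>(length));
  if (!proto.SerializeToArray(buffer.data(), length)) {
    throw std::runtime_error("protobuf serialization failed");
  }
  return FakeTaggedMessage(buffer.data(), buffer.size(), message_type);
}
```

Zero-length payloads also work. That case matters for a message sent with no fields set, such as the ShiftbossRegistrationResponseMessage built in the diff.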
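PolicyEnforcerDistributed::getWorkOrderMessages splits the per-round budget evenly across admitted queries using integer division. The budget is the new max_msgs_per_dispatch_round flag, default 20. It then pulls messages from each query until that query's share is used up or it has nothing schedulable. The sketch below models that loop, with plain queues of integers standing in for WorkOrderMessages. QueryId, WorkOrderQueues and CollectRound are hypothetical names. Treating an empty queue as a finished query is a simplification of the hasQueryExecutionFinished() check.

```cpp
#include <cstddef>
#include <deque>
#include <map>
#include <vector>

// Hypothetical model: each admitted query owns a queue of pending work order
// IDs. The real code asks QueryManagerDistributed for the next
// serialization::WorkOrderMessage instead.
using QueryId = std::size_t;
using WorkOrderQueues = std::map<QueryId, std::deque<int>>;

// Collects at most max_per_round / queues->size() work orders per query,
// mirroring per_query_share in the diff. Queries whose queue drains during
// this round are appended to *finished and removed from *queues.
std::vector<int> CollectRound(WorkOrderQueues *queues,
                              std::size_t max_per_round,
                              std::vector<QueryId> *finished) {
  std::vector<int> collected;
  if (queues->empty()) return collected;

  const std::size_t per_query_share = max_per_round / queues->size();
  for (auto &entry : *queues) {
    std::size_t taken = 0;
    while (taken < per_query_share) {
      if (entry.second.empty()) {
        // Nothing left to schedule: treat the query as finished.
        finished->push_back(entry.first);
        break;
      }
      collected.push_back(entry.second.front());
      entry.second.pop_front();
      ++taken;
    }
  }
  // Remove finished queries after iterating, as the diff does.
  for (QueryId qid : *finished) queues->erase(qid);
  return collected;
}
```

Because the share uses integer division, admitting more queries than the per-round budget gives every query a share of zero. In the diff, only DCHECK_GT(per_query_share, 0u) guards against that, and DCHECKs are compiled out of release builds.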
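PolicyEnforcerDistributed::admitQuery follows a simple admission rule. If fewer than PolicyEnforcerBase::kMaxConcurrentQueries queries are admitted, the new query runs; otherwise it is pushed onto waiting_queries_. When a query finishes, the earliest waiting query is admitted. The following is a minimal sketch of that policy, assuming queries can be identified by a std::size_t ID. AdmissionController and its methods are hypothetical. The sketch omits the Shiftboss handshake (initiateQueryInShiftboss) that the real code performs before constructing a QueryManagerDistributed.

```cpp
#include <cstddef>
#include <queue>
#include <set>

// Hypothetical admission controller modelled on PolicyEnforcerDistributed.
class AdmissionController {
 public:
  explicit AdmissionController(std::size_t max_concurrent)
      : max_concurrent_(max_concurrent) {}

  // Returns true if the query starts running now. When the limit is reached,
  // the query waits in FIFO order. A duplicate ID is rejected, as the diff
  // does with LOG(ERROR).
  bool admit(std::size_t query_id) {
    if (running_.size() >= max_concurrent_) {
      waiting_.push(query_id);
      return false;
    }
    return running_.insert(query_id).second;
  }

  // Removes a finished query and admits the earliest waiting one, if any.
  void complete(std::size_t query_id) {
    running_.erase(query_id);
    if (!waiting_.empty()) {
      const std::size_t next = waiting_.front();
      waiting_.pop();
      admit(next);
    }
  }

  bool isRunning(std::size_t query_id) const { return running_.count(query_id) != 0; }
  std::size_t numWaiting() const { return waiting_.size(); }

 private:
  const std::size_t max_concurrent_;
  std::set<std::size_t> running_;
  std::queue<std::size_t> waiting_;
};
```

The completion paths in the diff do not all behave the same way:

- processInitiateRebuildResponseMessage and PolicyEnforcerBase::processMessage both pop waiting_queries_ after removeQuery().
- The completion loop at the end of getWorkOrderMessages calls removeQuery() without popping it.

It may be worth confirming whether waiting queries can be left unadmitted on that path.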
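After each handled message, ForemanDistributed::canCollectNewMessages decides whether to pull more work orders from the policy enforcer. It skips collection after kCatalogRelationNewBlockMessage or kWorkOrderFeedbackMessage, and while Shiftboss 0 has reached its work order capacity. The sketch below reproduces that predicate against a toy capacity tracker. MessageType, CapacityTracker and its methods are hypothetical. The diff only shows these ShiftbossDirectory calls:

- addShiftboss
- incrementNumQueuedWorkOrders
- addNumQueuedWorkOrders
- hasReachedCapacity

The decrement on completion in the sketch is therefore an assumption.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical message types; only the two excluded ones matter here.
enum class MessageType {
  kCatalogRelationNewBlock,
  kWorkOrderFeedback,
  kWorkOrderComplete,
  kDataPipeline
};

// Hypothetical per-Shiftboss capacity bookkeeping.
class CapacityTracker {
 public:
  // Registers a Shiftboss and returns its logical index.
  std::size_t addShiftboss(std::size_t capacity) {
    capacities_.push_back(capacity);
    queued_.push_back(0);
    return capacities_.size() - 1;
  }

  void addQueued(std::size_t index, std::size_t n) { queued_[index] += n; }

  // Assumed decrement when work orders complete; clamps at zero.
  void completeWorkOrders(std::size_t index, std::size_t n) {
    queued_[index] = n >= queued_[index] ? 0 : queued_[index] - n;
  }

  bool hasReachedCapacity(std::size_t index) const {
    return queued_[index] >= capacities_[index];
  }

 private:
  std::vector<std::size_t> capacities_;
  std::vector<std::size_t> queued_;
};

// Mirrors ForemanDistributed::canCollectNewMessages. Like the diff, it only
// consults Shiftboss 0, which run() guarantees has registered.
bool CanCollectNewMessages(MessageType type, const CapacityTracker &tracker) {
  const bool eligible_type = type != MessageType::kCatalogRelationNewBlock &&
                             type != MessageType::kWorkOrderFeedback;
  return eligible_type && !tracker.hasReachedCapacity(0);
}
```

Capacity is checked only once, before collection. dispatchWorkOrderMessages then sends every collected message to Shiftboss 0 and increments its queued count without re-checking. A single round can therefore push the count past the advertised capacity by nearly a full round's budget.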
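ForemanDistributed::printWorkOrderProfilingResults writes one CSV row per work order with fprintf and the "%lu" conversion, even though query_id and worker_id are std::size_t. That only matches on platforms where size_t is unsigned long, such as LP64 Linux. "%zu" is the portable conversion for size_t. The sketch below produces the same output using %zu. It assumes the profiling tuple holds three std::size_t fields in the order (worker ID, operator ID, time), which is the order the diff reads them with std::get. The actual element types are not visible in this archive. WorkOrderTiming, FormatProfilingCsv and PrintProfilingCsv are hypothetical names.

```cpp
#include <cstddef>
#include <cstdio>
#include <string>
#include <tuple>
#include <vector>

// Assumed layout: (worker ID, operator ID, time in microseconds).
using WorkOrderTiming = std::tuple<std::size_t, std::size_t, std::size_t>;

// Builds the CSV text with the same header and column order as the diff.
std::string FormatProfilingCsv(std::size_t query_id,
                               const std::vector<WorkOrderTiming> &timings) {
  std::string csv = "Query ID,Worker ID,Operator ID,Time (microseconds)\n";
  char row[96];  // Four 20-digit values, three commas, newline and NUL fit.
  for (const WorkOrderTiming &t : timings) {
    // %zu is the portable printf conversion for std::size_t.
    std::snprintf(row, sizeof(row), "%zu,%zu,%zu,%zu\n", query_id,
                  std::get<0>(t), std::get<1>(t), std::get<2>(t));
    csv += row;
  }
  return csv;
}

// Writes the CSV to a stream, as printWorkOrderProfilingResults does.
void PrintProfilingCsv(std::size_t query_id,
                       const std::vector<WorkOrderTiming> &timings,
                       std::FILE *out) {
  std::fputs(FormatProfilingCsv(query_id, timings).c_str(), out);
}
```

Building the string separately from writing it keeps the formatting testable without a FILE*.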
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/AdmitRequestMessage.hpp ---------------------------------------------------------------------- diff --git a/query_execution/AdmitRequestMessage.hpp b/query_execution/AdmitRequestMessage.hpp index e33b354..75c5ff6 100644 --- a/query_execution/AdmitRequestMessage.hpp +++ b/query_execution/AdmitRequestMessage.hpp @@ -60,6 +60,13 @@ class AdmitRequestMessage { return query_handles_; } + /** + * @brief Get the mutable query handles from this message. + **/ + std::vector* getQueryHandlesMutable() { + return &query_handles_; + } + private: std::vector query_handles_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 8bf1ab1..cfb72d7 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -33,8 +33,14 @@ if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) endif() add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) +if (ENABLE_DISTRIBUTED) + add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp) +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp) +if (ENABLE_DISTRIBUTED) + add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp) +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) 
add_library(quickstep_queryexecution_QueryContext_proto @@ -83,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase quickstep_threading_Thread quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution_ForemanDistributed + glog + quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_AdmitRequestMessage + quickstep_queryexecution_ForemanBase + quickstep_queryexecution_PolicyEnforcerDistributed + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_ShiftbossDirectory + quickstep_queryoptimizer_QueryHandle + quickstep_threading_ThreadUtil + quickstep_utility_EqualsAnyConstant + quickstep_utility_Macros + tmb + ${GFLAGS_LIB_NAME}) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_ForemanSingleNode glog quickstep_queryexecution_AdmitRequestMessage @@ -110,6 +136,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase quickstep_storage_StorageBlockInfo quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed + glog + quickstep_catalog_CatalogRelation + quickstep_catalog_Catalog_proto + quickstep_queryexecution_PolicyEnforcerBase + quickstep_queryexecution_QueryContext_proto + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionState + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_QueryManagerBase + quickstep_queryexecution_QueryManagerDistributed + quickstep_queryexecution_ShiftbossDirectory + quickstep_queryoptimizer_QueryHandle + quickstep_storage_StorageBlockInfo + quickstep_utility_Macros + tmb + ${GFLAGS_LIB_NAME}) +endif(ENABLE_DISTRIBUTED) 
target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode glog quickstep_catalog_CatalogTypedefs @@ -294,10 +340,12 @@ target_link_libraries(quickstep_queryexecution if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution quickstep_queryexecution_BlockLocator + quickstep_queryexecution_ForemanDistributed + quickstep_queryexecution_PolicyEnforcerDistributed quickstep_queryexecution_QueryManagerDistributed quickstep_queryexecution_Shiftboss quickstep_queryexecution_ShiftbossDirectory) -endif() +endif(ENABLE_DISTRIBUTED) # Tests: if (ENABLE_DISTRIBUTED) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp new file mode 100644 index 0000000..1c0fba8 --- /dev/null +++ b/query_execution/ForemanDistributed.cpp @@ -0,0 +1,333 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#include "query_execution/ForemanDistributed.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/AdmitRequestMessage.hpp" +#include "query_execution/PolicyEnforcerDistributed.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "threading/ThreadUtil.hpp" +#include "utility/EqualsAnyConstant.hpp" + +#include "glog/logging.h" + +#include "tmb/address.h" +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/message_style.h" +#include "tmb/tagged_message.h" + +using std::move; +using std::size_t; +using std::unique_ptr; +using std::vector; + +using tmb::AnnotatedMessage; +using tmb::TaggedMessage; + +namespace quickstep { + +namespace S = serialization; + +ForemanDistributed::ForemanDistributed( + tmb::MessageBus *bus, + CatalogDatabaseLite *catalog_database, + const int cpu_id, + const bool profile_individual_workorders) + : ForemanBase(bus, cpu_id), + catalog_database_(DCHECK_NOTNULL(catalog_database)) { + const std::vector sender_message_types{ + kShiftbossRegistrationResponseMessage, + kQueryInitiateMessage, + kWorkOrderMessage, + kInitiateRebuildMessage, + kSaveQueryResultMessage, + kQueryExecutionSuccessMessage, + kPoisonMessage}; + + for (const auto message_type : sender_message_types) { + bus_->RegisterClientAsSender(foreman_client_id_, message_type); + } + + const std::vector receiver_message_types{ + kShiftbossRegistrationMessage, + kAdmitRequestMessage, + kQueryInitiateResponseMessage, + kCatalogRelationNewBlockMessage, + kDataPipelineMessage, + kInitiateRebuildResponseMessage, + kWorkOrderCompleteMessage, + kRebuildWorkOrderCompleteMessage, + 
kWorkOrderFeedbackMessage, + kWorkOrdersAvailableMessage, + kSaveQueryResultResponseMessage, + kPoisonMessage}; + + for (const auto message_type : receiver_message_types) { + bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); + } + + policy_enforcer_.reset(new PolicyEnforcerDistributed( + foreman_client_id_, + catalog_database_, + &shiftboss_directory_, + bus_, + profile_individual_workorders)); +} + +void ForemanDistributed::run() { + if (cpu_id_ >= 0) { + // We can pin the foreman thread to a CPU if specified. + ThreadUtil::BindToCPU(cpu_id_); + } + + // Ensure that at least one Shiftboss to register. + if (shiftboss_directory_.empty()) { + const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type()); + LOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type() + << "' message from client " << annotated_message.sender; + + S::ShiftbossRegistrationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity()); + DCHECK_EQ(1u, shiftboss_directory_.size()); + } + + // Event loop + for (;;) { + // Receive() causes this thread to sleep until next message is received. 
+ const AnnotatedMessage annotated_message = + bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + const tmb::message_type_id message_type = tagged_message.message_type(); + LOG(INFO) << "ForemanDistributed received typed '" << message_type + << "' message from client " << annotated_message.sender; + switch (message_type) { + case kShiftbossRegistrationMessage: { + S::ShiftbossRegistrationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processShiftbossRegisterationMessage(annotated_message.sender, proto.work_order_capacity()); + break; + } + case kAdmitRequestMessage: { + AdmitRequestMessage *request_message = + const_cast( + static_cast(tagged_message.message())); + + vector *query_handles = request_message->getQueryHandlesMutable(); + DCHECK(!query_handles->empty()); + + for (QueryHandle *handle : *query_handles) { + handle->setClientId(annotated_message.sender); + } + + bool all_queries_admitted = true; + if (query_handles->size() == 1u) { + all_queries_admitted = + policy_enforcer_->admitQuery(query_handles->front()); + } else { + all_queries_admitted = policy_enforcer_->admitQueries(*query_handles); + } + if (!all_queries_admitted) { + LOG(WARNING) << "The scheduler could not admit all the queries"; + // TODO(harshad) - Inform the main thread about the failure. + } + break; + } + case kQueryInitiateResponseMessage: { + // TODO(zuyu): check the query id. + break; + } + case kCatalogRelationNewBlockMessage: // Fall through + case kDataPipelineMessage: + case kRebuildWorkOrderCompleteMessage: + case kWorkOrderCompleteMessage: + case kWorkOrderFeedbackMessage: + case kWorkOrdersAvailableMessage: { + policy_enforcer_->processMessage(tagged_message); + break; + } + case kInitiateRebuildResponseMessage: { + // A unique case in the distributed version. 
+ policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message); + break; + } + case kSaveQueryResultResponseMessage: { + S::SaveQueryResultResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id()); + break; + } + case kPoisonMessage: { + if (policy_enforcer_->hasQueries()) { + LOG(WARNING) << "Foreman thread exiting while some queries are " + "under execution or waiting to be admitted"; + } + + // Shutdown all Shiftbosses. + tmb::Address shiftboss_addresses; + for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) { + shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i)); + } + + tmb::MessageStyle broadcast_style; + broadcast_style.Broadcast(true); + + TaggedMessage poison_message(kPoisonMessage); + + const tmb::MessageBus::SendStatus send_status = + bus_->Send(foreman_client_id_, + shiftboss_addresses, + broadcast_style, + move(poison_message)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); + return; + } + default: + LOG(FATAL) << "Unknown message type to Foreman"; + } + + if (canCollectNewMessages(message_type)) { + vector> new_messages; + policy_enforcer_->getWorkOrderMessages(&new_messages); + dispatchWorkOrderMessages(new_messages); + } + } +} + +bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) { + return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, + kCatalogRelationNewBlockMessage, + kWorkOrderFeedbackMessage) && + // TODO(zuyu): Multiple Shiftbosses support. + !shiftboss_directory_.hasReachedCapacity(0); +} + +void ForemanDistributed::dispatchWorkOrderMessages(const vector> &messages) { + for (const auto &message : messages) { + DCHECK(message != nullptr); + // TODO(zuyu): Multiple Shiftbosses support. 
+ sendWorkOrderMessage(0, *message); + shiftboss_directory_.incrementNumQueuedWorkOrders(0); + } +} + +void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index, + const S::WorkOrderMessage &proto) { + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kWorkOrderMessage); + free(proto_bytes); + + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_.getClientId(shiftboss_index), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_.getClientId(shiftboss_index); +} + +void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id, + std::FILE *out) const { + const std::vector< + std::tuple> + &recorded_times = policy_enforcer_->getProfilingResults(query_id); + fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out); + for (const auto &workorder_entry : recorded_times) { + // Note: Index of the "worker thread index" in the tuple is 0. + const std::size_t worker_id = std::get<0>(workorder_entry); + fprintf(out, + "%lu,%lu,%lu,%lu\n", + query_id, + worker_id, + std::get<1>(workorder_entry), // Operator ID. + std::get<2>(workorder_entry)); // Time. 
+ } +} + +void ForemanDistributed::processShiftbossRegisterationMessage(const client_id shiftboss_client_id, + const std::size_t work_order_capacity) { + shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity); + + S::ShiftbossRegistrationResponseMessage proto; + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kShiftbossRegistrationResponseMessage); + free(proto_bytes); + + LOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '" + << kShiftbossRegistrationResponseMessage + << "') to Shiftboss with TMB client id " << shiftboss_client_id; + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_client_id, + move(message)); +} + +void ForemanDistributed::processSaveQueryResultResponseMessage(const tmb::client_id cli_id, + const relation_id result_relation_id) { + S::QueryExecutionSuccessMessage proto; + proto.mutable_result_relation()->MergeFrom( + static_cast(catalog_database_)->getRelationById(result_relation_id)->getProto()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kQueryExecutionSuccessMessage); + free(proto_bytes); + + // Notify the CLI regarding the query result. 
+ LOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage + << "') to CLI with TMB client id " << cli_id; + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + cli_id, + move(message)); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp new file mode 100644 index 0000000..8a4a97c --- /dev/null +++ b/query_execution/ForemanDistributed.hpp @@ -0,0 +1,130 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ + +#include +#include +#include +#include + +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/ForemanBase.hpp" +#include "query_execution/PolicyEnforcerDistributed.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "utility/Macros.hpp" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class CatalogDatabaseLite; + +namespace serialization { class WorkOrderMessage; } + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief The Foreman receives queries from the main thread, messages from the + * policy enforcer and dispatches the work to Shiftbosses. It also + * receives work completion messages from Shiftbosses. + **/ +class ForemanDistributed final : public ForemanBase { + public: + /** + * @brief Constructor. + * + * @param bus A pointer to the TMB. + * @param catalog_database The catalog database where this query is executed. + * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. + * @param profile_individual_workorders Whether every workorder's execution + * be profiled or not. + * + * @note If cpu_id is not specified, Foreman thread can be possibly moved + * around on different CPUs by the OS. + **/ + ForemanDistributed(tmb::MessageBus *bus, + CatalogDatabaseLite *catalog_database, + const int cpu_id = -1, + const bool profile_individual_workorders = false); + + ~ForemanDistributed() override {} + + /** + * @brief Print the results of profiling individual work orders for a given + * query. + * + * TODO(harshad) - Add the name of the operator to the output. + * + * @param query_id The ID of the query for which the results are to be printed. + * @param out The file stream. 
+ **/ + void printWorkOrderProfilingResults(const std::size_t query_id, + std::FILE *out) const; + + protected: + void run() override; + + private: + /** + * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the + * worker threads. + * + * @param messages The messages to be dispatched. + **/ + void dispatchWorkOrderMessages( + const std::vector> &messages); + + /** + * @brief Send the given message to the specified worker. + * + * @param worker_index The logical index of the recipient worker in + * ShiftbossDirectory. + * @param proto The WorkOrderMessage to be sent. + **/ + void sendWorkOrderMessage(const std::size_t worker_index, + const serialization::WorkOrderMessage &proto); + + void processShiftbossRegisterationMessage(const tmb::client_id shiftboss_client_id, + const std::size_t work_order_capacity); + + void processSaveQueryResultResponseMessage(const tmb::client_id cli_id, + const relation_id result_relation_id); + + /** + * @brief Check if we can collect new messages from the PolicyEnforcer. + * + * @param message_type The type of the last received message. 
+ **/ + bool canCollectNewMessages(const tmb::message_type_id message_type); + + ShiftbossDirectory shiftboss_directory_; + + CatalogDatabaseLite *catalog_database_; + + std::unique_ptr policy_enforcer_; + + DISALLOW_COPY_AND_ASSIGN(ForemanDistributed); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index d16a502..a28fa3b 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -131,8 +131,11 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { default: LOG(FATAL) << "Unknown message type found in PolicyEnforcer"; } + if (admitted_queries_[query_id]->queryStatus(op_index) == QueryManagerBase::QueryStatusCode::kQueryExecuted) { + onQueryCompletion(admitted_queries_[query_id]->query_handle()); + removeQuery(query_id); if (!waiting_queries_.empty()) { // Admit the earliest waiting query. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index 0482ebc..1de0677 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -148,6 +148,13 @@ class PolicyEnforcerBase { void recordTimeForWorkOrder( const serialization::NormalWorkOrderCompletionMessage &proto); + /** + * @brief Add custom actions upon the completion of a query. + * + * @param query_handle The query handle. 
+ **/ + virtual void onQueryCompletion(QueryHandle *query_handle) {} + CatalogDatabaseLite *catalog_database_; const bool profile_individual_workorders_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp new file mode 100644 index 0000000..59df3de --- /dev/null +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -0,0 +1,253 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#include "query_execution/PolicyEnforcerDistributed.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "catalog/Catalog.pb.h" +#include "catalog/CatalogRelation.hpp" +#include "query_execution/QueryContext.pb.h" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/QueryManagerBase.hpp" +#include "query_execution/QueryManagerDistributed.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "storage/StorageBlockInfo.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::free; +using std::malloc; +using std::move; +using std::size_t; +using std::unique_ptr; +using std::vector; + +using tmb::TaggedMessage; + +namespace quickstep { + +namespace S = serialization; + +DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" + " can be allocated in a single round of dispatch of messages to" + " the workers."); + +void PolicyEnforcerDistributed::getWorkOrderMessages( + vector> *work_order_messages) { + // Iterate over admitted queries until either there are no more + // messages available, or the maximum number of messages have + // been collected. + DCHECK(work_order_messages->empty()); + // TODO(harshad) - Make this function generic enough so that it + // works well when multiple queries are getting executed. 
+ if (admitted_queries_.empty()) { + LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + return; + } + + const std::size_t per_query_share = + FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); + DCHECK_GT(per_query_share, 0u); + + vector finished_queries_ids; + + for (const auto &admitted_query_info : admitted_queries_) { + QueryManagerBase *curr_query_manager = admitted_query_info.second.get(); + DCHECK(curr_query_manager != nullptr); + std::size_t messages_collected_curr_query = 0; + while (messages_collected_curr_query < per_query_share) { + S::WorkOrderMessage *next_work_order_message = + static_cast(curr_query_manager)->getNextWorkOrderMessage(0); + if (next_work_order_message != nullptr) { + ++messages_collected_curr_query; + work_order_messages->push_back(unique_ptr(next_work_order_message)); + } else { + // No more work ordes from the current query at this time. + // Check if the query's execution is over. + if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + // If the query has been executed, remove it. + finished_queries_ids.push_back(admitted_query_info.first); + } + break; + } + } + } + for (const std::size_t finished_qid : finished_queries_ids) { + onQueryCompletion(admitted_queries_[finished_qid]->query_handle()); + removeQuery(finished_qid); + } +} + +bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) { + if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { + // Ok to admit the query. + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) == admitted_queries_.end()) { + // NOTE(zuyu): Should call before constructing a 'QueryManager'. + // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' + // initializes. + initiateQueryInShiftboss(query_handle); + + // Query with the same ID not present, ok to admit. 
+ admitted_queries_[query_id].reset( + new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); + return true; + } else { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + } else { + // This query will have to wait. + waiting_queries_.push(query_handle); + return false; + } +} + +void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) { + S::InitiateRebuildResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const std::size_t query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + QueryManagerBase *query_manager = admitted_queries_[query_id].get(); + + const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders(); + query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders); + shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders); + + if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + onQueryCompletion(query_manager->query_handle()); + + removeQuery(query_id); + if (!waiting_queries_.empty()) { + // Admit the earliest waiting query. 
+ QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } + } +} + +void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) { + S::QueryInitiateMessage proto; + proto.set_query_id(query_handle->query_id()); + proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto()); + proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kQueryInitiateMessage); + free(proto_bytes); + + LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage + << "') to Shiftboss 0"; + + // TODO(zuyu): Multiple Shiftbosses support. + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); + + // Wait for the QueryInitiateResponseMessage from Shiftboss.
+ const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type()); + LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type() + << "' message from client " << annotated_message.sender; + + S::QueryInitiateResponseMessage proto_response; + CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); +} + +void PolicyEnforcerDistributed::onQueryCompletion(QueryHandle *query_handle) { + const CatalogRelation *query_result = query_handle->getQueryResultRelation(); + if (query_result == nullptr) { + // TODO(zuyu): notify Shiftboss to remove QueryContext. + TaggedMessage message(kQueryExecutionSuccessMessage); + + const tmb::client_id cli_id = query_handle->getClientId(); + + // Notify the CLI regarding the query execution result. + LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage + << "') to CLI with TMB client id " << cli_id; + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + cli_id, + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to CLI with TMB client ID " << cli_id; + return; + } + + // SaveQueryResultMessage implies QueryContext cleanup in Shiftboss.
+ S::SaveQueryResultMessage proto; + proto.set_query_id(query_handle->query_id()); + proto.set_relation_id(query_result->getID()); + + const vector blocks(query_result->getBlocksSnapshot()); + for (const block_id block : blocks) { + proto.add_blocks(block); + } + + proto.set_cli_id(query_handle->getClientId()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast(proto_bytes), + proto_length, + kSaveQueryResultMessage); + free(proto_bytes); + + LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage + << "') to Shiftboss 0"; + // TODO(zuyu): Support multiple shiftbosses. + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp new file mode 100644 index 0000000..8b07748 --- /dev/null +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ + +#include +#include +#include + +#include "query_execution/PolicyEnforcerBase.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "utility/Macros.hpp" + +#include "tmb/id_typedefs.h" + +namespace tmb { +class MessageBus; +class TaggedMessage; +} + +namespace quickstep { + +class CatalogDatabaseLite; +class QueryHandle; + +namespace serialization { class WorkOrderMessage; } + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A class that ensures that a high level policy is maintained + * in sharing resources among concurrent queries. + **/ +class PolicyEnforcerDistributed final : public PolicyEnforcerBase { + public: + /** + * @brief Constructor. + * + * @param foreman_client_id The TMB client ID of the Foreman. + * @param catalog_database The CatalogDatabase used. + * @param bus The TMB. + **/ + PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, + CatalogDatabaseLite *catalog_database, + ShiftbossDirectory *shiftboss_directory, + tmb::MessageBus *bus, + const bool profile_individual_workorders = false) + : PolicyEnforcerBase(catalog_database, profile_individual_workorders), + foreman_client_id_(foreman_client_id), + shiftboss_directory_(shiftboss_directory), + bus_(bus) {} + + /** + * @brief Destructor. 
+ **/ + ~PolicyEnforcerDistributed() override {} + + bool admitQuery(QueryHandle *query_handle) override; + + /** + * @brief Get work order messages to be dispatched. These messages come from + * the active queries. + * + * @param work_order_messages The work order messages to be dispatched. + **/ + void getWorkOrderMessages( + std::vector> *work_order_messages); + + /** + * @brief Process the initiate rebuild work order response message. + * + * @param tagged_message The message. + **/ + void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); + + private: + void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override { + shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index); + } + + void onQueryCompletion(QueryHandle *query_handle) override; + + void initiateQueryInShiftboss(QueryHandle *query_handle); + + const tmb::client_id foreman_client_id_; + + ShiftbossDirectory *shiftboss_directory_; + + tmb::MessageBus *bus_; + + DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 308d736..99de75c 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -111,15 +111,27 @@ message InitiateRebuildResponseMessage { required uint64 query_id = 1; required uint64 operator_index = 2; required uint64 num_rebuild_work_orders = 3; + required uint64 shiftboss_index = 4; } message SaveQueryResultMessage { - required int32 relation_id = 1; - repeated fixed64 blocks = 2 [packed=true]; + required uint64 query_id = 1; + required int32 relation_id = 2; + repeated 
fixed64 blocks = 3 [packed=true]; + + // Defined in "tmb/id_typedefs.h". + required uint32 cli_id = 4; } message SaveQueryResultResponseMessage { required int32 relation_id = 1; + + // Defined in "tmb/id_typedefs.h". + required uint32 cli_id = 2; +} + +message QueryExecutionSuccessMessage { + optional CatalogRelationSchema result_relation = 1; } // BlockLocator related messages. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index b67209f..0d43237 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -84,6 +84,10 @@ enum QueryExecutionMessageType : message_type_id { kSaveQueryResultMessage, // From Foreman to Shiftboss. kSaveQueryResultResponseMessage, // From Shiftboss to Foreman. + // From Foreman to CLI. + kQueryExecutionSuccessMessage, + kQueryExecutionErrorMessage, + // BlockLocator related messages, sorted in a life cycle of StorageManager // with a unique block domain. kBlockDomainRegistrationMessage, // From Worker to BlockLocator. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp index d2a3341..4ee51c3 100644 --- a/query_execution/QueryManagerBase.cpp +++ b/query_execution/QueryManagerBase.cpp @@ -35,7 +35,8 @@ using std::pair; namespace quickstep { QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) - : query_id_(DCHECK_NOTNULL(query_handle)->query_id()), + : query_handle_(query_handle), + query_id_(DCHECK_NOTNULL(query_handle)->query_id()), query_dag_(DCHECK_NOTNULL( DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())), num_operators_in_dag_(query_dag_->size()), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index 6edfd5c..3338478 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -24,6 +24,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryExecutionState.hpp" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. #include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" @@ -79,6 +80,13 @@ class QueryManagerBase { } /** + * @brief Get the query handle. + **/ + inline QueryHandle* query_handle() const { + return query_handle_; + } + + /** * @brief Process the received WorkOrder complete message. 
* * @param op_index The index of the specified operator node in the query DAG @@ -128,6 +136,20 @@ class QueryManagerBase { void processFeedbackMessage(const dag_node_index op_index, const WorkOrder::FeedbackMessage &message); +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Process the initiate rebuild work order response message. + * + * @param shiftboss_index The Shiftboss index for the rebuild work orders. + * @param op_index The index of the specified operator node in the query DAG + * for initiating the rebuild work order. + * @param num_rebuild_work_orders The number of the rebuild work orders + * generated for the operator indexed by 'op_index'. + **/ + virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index, + const std::size_t num_rebuild_work_orders) {} +#endif // QUICKSTEP_DISTRIBUTED + /** * @brief Get the query status after processing an incoming message. * @@ -250,9 +272,11 @@ class QueryManagerBase { return query_exec_state_->hasRebuildInitiated(index); } + QueryHandle *query_handle_; // Owned by the optimizer. + const std::size_t query_id_; - DAG *query_dag_; + DAG *query_dag_; // Owned by 'query_handle_'. const dag_node_index num_operators_in_dag_; // For all nodes, store their receiving dependents. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index e906fa5..bed3e45 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -32,6 +32,7 @@ #include "glog/logging.h" #include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" using std::free; using std::malloc; @@ -42,11 +43,11 @@ using std::unique_ptr; namespace quickstep { QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle, - ShiftbossDirectory *shiftbosses, + ShiftbossDirectory *shiftboss_directory, const tmb::client_id foreman_client_id, tmb::MessageBus *bus) : QueryManagerBase(query_handle), - shiftbosses_(shiftbosses), + shiftboss_directory_(shiftboss_directory), foreman_client_id_(foreman_client_id), bus_(bus), normal_workorder_protos_container_( @@ -119,6 +120,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) return generated_new_workorder_protos; } +void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index, + const std::size_t num_rebuild_work_orders) { + // TODO(zuyu): Multiple workers support. + query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true); + + if (num_rebuild_work_orders != 0u) { + // Wait for the rebuild work orders to finish.
+ return; + } + + markOperatorFinished(op_index); + + for (const std::pair &dependent_link : + query_dag_->getDependents(op_index)) { + const dag_node_index dependent_op_index = dependent_link.first; + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + processOperator(dependent_op_index, true); + } + } +} + bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { DCHECK(checkRebuildRequired(index)); DCHECK(!checkRebuildInitiated(index)); @@ -127,6 +149,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId); serialization::InitiateRebuildMessage proto; + proto.set_query_id(query_id_); proto.set_operator_index(index); proto.set_insert_destination_index(op.getInsertDestinationID()); proto.set_relation_id(op.getOutputRelationID()); @@ -140,13 +163,17 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { kInitiateRebuildMessage); free(proto_bytes); - LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage + LOG(INFO) << "QueryManagerDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage << "') to Shiftboss"; // TODO(zuyu): Multiple workers support. - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftbosses_->getClientId(0), - move(tagged_msg)); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(tagged_msg)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); // The negative value indicates that the number of rebuild work orders is to be // determined. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index 8641c22..9a3f44b 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -15,6 +15,7 @@ #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ +#include #include #include "query_execution/QueryExecutionState.hpp" @@ -47,12 +48,12 @@ class QueryManagerDistributed final : public QueryManagerBase { * @brief Constructor. * * @param query_handle The QueryHandle object for this query. - * @param shiftbosses The ShiftbossDirectory to use. + * @param shiftboss_directory The ShiftbossDirectory to use. * @param foreman_client_id The TMB client ID of the foreman thread. * @param bus The TMB used for communication. **/ QueryManagerDistributed(QueryHandle *query_handle, - ShiftbossDirectory *shiftbosses, + ShiftbossDirectory *shiftboss_directory, const tmb::client_id foreman_client_id, tmb::MessageBus *bus); @@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase { bool fetchNormalWorkOrders(const dag_node_index index) override; + void processInitiateRebuildResponseMessage(const dag_node_index op_index, + const std::size_t num_rebuild_work_orders) override; + /** * @brief Get the next normal workorder to be executed, wrapped in a * WorkOrderMessage proto.
@@ -88,7 +92,7 @@ class QueryManagerDistributed final : public QueryManagerBase { (query_exec_state_->getNumRebuildWorkOrders(index) == 0); } - ShiftbossDirectory *shiftbosses_; + ShiftbossDirectory *shiftboss_directory_; const tmb::client_id foreman_client_id_; tmb::MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index 7f655c6..925dc1f 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -113,10 +113,14 @@ void Shiftboss::run() { << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage << "') from Foreman to worker " << worker_index; - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + workers_->getClientID(worker_index), + move(worker_tagged_message)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ + << " to Worker with TMB client ID " << workers_->getClientID(worker_index); break; } case kInitiateRebuildMessage: { @@ -143,10 +147,14 @@ void Shiftboss::run() { << "' message from worker (client " << annotated_message.sender << ") to Foreman"; DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(annotated_message.tagged_message)); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + foreman_client_id_, + move(annotated_message.tagged_message)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Shiftboss with TMB 
client ID " << shiftboss_client_id_ + << " to Foreman with TMB client ID " << foreman_client_id_; break; } case kSaveQueryResultMessage: { @@ -167,8 +175,11 @@ void Shiftboss::run() { } } + query_contexts_.erase(proto.query_id()); + serialization::SaveQueryResultResponseMessage proto_response; proto_response.set_relation_id(proto.relation_id()); + proto_response.set_cli_id(proto.cli_id()); const size_t proto_response_length = proto_response.ByteSize(); char *proto_response_bytes = static_cast(malloc(proto_response_length)); @@ -182,10 +193,14 @@ void Shiftboss::run() { LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage << "') to Foreman"; - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + foreman_client_id_, + move(message_response)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ + << " to Foreman with TMB client ID " << foreman_client_id_; break; } case kPoisonMessage: { @@ -196,7 +211,7 @@ void Shiftboss::run() { tmb::MessageStyle broadcast_style; broadcast_style.Broadcast(true); - tmb::MessageBus::SendStatus send_status = + const tmb::MessageBus::SendStatus send_status = bus_->Send(shiftboss_client_id_, worker_addresses_, broadcast_style, @@ -249,7 +264,7 @@ void Shiftboss::registerWithForeman() { kShiftbossRegistrationMessage); free(proto_bytes); - tmb::MessageBus::SendStatus send_status = + const tmb::MessageBus::SendStatus send_status = bus_->Send(shiftboss_client_id_, all_addresses, style, move(message)); DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); } @@ -268,10 +283,6 @@ void Shiftboss::processQueryInitiateMessage( bus_)); query_contexts_.emplace(query_id, 
move(query_context)); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage - << "') to Foreman"; - serialization::QueryInitiateResponseMessage proto; proto.set_query_id(query_id); @@ -284,10 +295,18 @@ void Shiftboss::processQueryInitiateMessage( kQueryInitiateResponseMessage); free(proto_bytes); - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); + LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage + << "') to Foreman"; + + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + foreman_client_id_, + move(message_response)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ + << " to Foreman with TMB client ID " << foreman_client_id_; } void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, @@ -311,6 +330,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, proto.set_query_id(query_id); proto.set_operator_index(op_index); proto.set_num_rebuild_work_orders(partially_filled_block_refs.size()); + // TODO(zuyu): Multiple Shiftboss support. 
+ proto.set_shiftboss_index(0); const size_t proto_length = proto.ByteSize(); char *proto_bytes = static_cast(malloc(proto_length)); @@ -321,10 +342,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kInitiateRebuildResponseMessage); free(proto_bytes); - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + foreman_client_id_, + move(message_response)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ + << " to Foreman with TMB client ID " << foreman_client_id_; for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) { // NOTE(zuyu): Worker releases the memory after the execution of @@ -349,10 +374,14 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage << "') to worker " << worker_index; - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + workers_->getClientID(worker_index), + move(worker_tagged_message)); + DCHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ + << " to Worker with TMB client ID " << workers_->getClientID(worker_index); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index a56b714..b6b97a0 100644 --- a/query_optimizer/CMakeLists.txt +++ 
b/query_optimizer/CMakeLists.txt @@ -212,6 +212,10 @@ target_link_libraries(quickstep_queryoptimizer_QueryHandle quickstep_queryexecution_QueryContext_proto quickstep_queryoptimizer_QueryPlan quickstep_utility_Macros) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryoptimizer_QueryHandle + tmb) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryoptimizer_QueryPlan quickstep_relationaloperators_RelationalOperator quickstep_utility_DAG http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/QueryHandle.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp index 5f3649a..bbf1918 100644 --- a/query_optimizer/QueryHandle.hpp +++ b/query_optimizer/QueryHandle.hpp @@ -24,9 +24,14 @@ #include "catalog/Catalog.pb.h" #include "query_execution/QueryContext.pb.h" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. #include "query_optimizer/QueryPlan.hpp" #include "utility/Macros.hpp" +#ifdef QUICKSTEP_DISTRIBUTED +#include "tmb/id_typedefs.h" +#endif // QUICKSTEP_DISTRIBUTED + namespace quickstep { class CatalogRelation; @@ -119,6 +124,22 @@ class QueryHandle { query_result_relation_ = relation; } +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Get the client id. + */ + tmb::client_id getClientId() const { + return cli_id_; + } + + /** + * @brief Set the client id. + */ + void setClientId(const tmb::client_id cli_id) { + cli_id_ = cli_id; + } +#endif // QUICKSTEP_DISTRIBUTED + private: const std::size_t query_id_; const std::uint64_t query_priority_; @@ -134,6 +155,11 @@ class QueryHandle { // and deleted by the Cli shell. const CatalogRelation *query_result_relation_; +#ifdef QUICKSTEP_DISTRIBUTED + // The client id of the CLI which sends the query. 
+ tmb::client_id cli_id_; +#endif // QUICKSTEP_DISTRIBUTED + DISALLOW_COPY_AND_ASSIGN(QueryHandle); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt index 9cad47f..6522117 100644 --- a/query_optimizer/tests/CMakeLists.txt +++ b/query_optimizer/tests/CMakeLists.txt @@ -78,6 +78,14 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest + DistributedExecutionGeneratorTest.cpp + DistributedExecutionGeneratorTestRunner.cpp + DistributedExecutionGeneratorTestRunner.hpp + "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp" + "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp") +endif(ENABLE_DISTRIBUTED) add_executable(quickstep_queryoptimizer_tests_ExecutionGeneratorTest ExecutionGeneratorTest.cpp ExecutionGeneratorTestRunner.cpp @@ -107,6 +115,39 @@ add_executable(quickstep_queryoptimizer_tests_OptimizerTextTest "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp" "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp") +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest + glog + gtest + gtest_main + quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogTypedefs + quickstep_cli_DropRelation + quickstep_cli_PrintToScreen + quickstep_parser_ParseStatement + quickstep_parser_SqlParserWrapper + quickstep_queryexecution_ForemanDistributed + quickstep_queryexecution_QueryContext + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_Shiftboss + quickstep_queryexecution_Worker + quickstep_queryexecution_WorkerDirectory + 
quickstep_queryexecution_WorkerMessage + quickstep_queryoptimizer_ExecutionGenerator + quickstep_queryoptimizer_LogicalGenerator + quickstep_queryoptimizer_OptimizerContext + quickstep_queryoptimizer_PhysicalGenerator + quickstep_queryoptimizer_QueryHandle + quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_tests_TestDatabaseLoader + quickstep_utility_Macros + quickstep_utility_MemStream + quickstep_utility_SqlError + quickstep_utility_TextBasedTestDriver + tmb + ${LIBS}) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest ${GFLAGS_LIB_NAME} glog http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp new file mode 100644 index 0000000..fc0c67d --- /dev/null +++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp @@ -0,0 +1,57 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#include +#include + +#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp" +#include "utility/textbased_test/TextBasedTestDriver.hpp" +#include "utility/textbased_test/TextBasedTest.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" +#include "gtest/gtest.h" + +using quickstep::TextBasedTest; + +QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST); + +int main(int argc, char** argv) { + google::InitGoogleLogging(argv[0]); + // Honor FLAGS_buffer_pool_slots in StorageManager. + gflags::ParseCommandLineFlags(&argc, &argv, true); + + if (argc < 4) { + LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1 + << " are provided"; + } + + std::ifstream input_file(argv[1]); + CHECK(input_file.is_open()) << argv[1]; + std::unique_ptr + test_runner( + new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3])); + test_driver.reset( + new quickstep::TextBasedTestDriver(&input_file, test_runner.get())); + test_driver->registerOption( + quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption); + + ::testing::InitGoogleTest(&argc, argv); + const int success = RUN_ALL_TESTS(); + if (success != 0) { + test_driver->writeActualOutputToFile(argv[2]); + } + + return success; +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp new file mode 100644 index 0000000..ffed4f0 --- /dev/null +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -0,0 +1,122 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"
+
+#include <cstdio>
+#include <set>
+#include <string>
+
+#include "cli/DropRelation.hpp"
+#include "cli/PrintToScreen.hpp"
+#include "parser/ParseStatement.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_optimizer/ExecutionGenerator.hpp"
+#include "query_optimizer/LogicalGenerator.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/PhysicalGenerator.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "utility/MemStream.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+
+class CatalogRelation;
+
+namespace optimizer {
+
+const char *DistributedExecutionGeneratorTestRunner::kResetOption =
+    "reset_before_execution";
+
+void DistributedExecutionGeneratorTestRunner::runTestCase(
+    const std::string &input, const std::set<std::string> &options,
+    std::string *output) {
+  // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator.
+
+  VLOG(4) << "Test SQL(s): " << input;
+
+  if (options.find(kResetOption) != options.end()) {
+    test_database_loader_.clear();
+    test_database_loader_.createTestRelation(false /* allow_vchar */);
+    test_database_loader_.loadTestRelation();
+  }
+
+  MemStream output_stream;
+  sql_parser_.feedNextBuffer(new std::string(input));
+
+  while (true) {
+    ParseResult result = sql_parser_.getNextStatement();
+
+    OptimizerContext optimizer_context(query_id_++,
+                                       test_database_loader_.catalog_database(),
+                                       test_database_loader_.storage_manager());
+
+    if (result.condition != ParseResult::kSuccess) {
+      if (result.condition == ParseResult::kError) {
+        *output = result.error_message;
+      }
+      break;
+    }
+
+    std::printf("%s\n", result.parsed_statement->toString().c_str());
+    try {
+      QueryHandle query_handle(optimizer_context.query_id());
+      LogicalGenerator logical_generator(&optimizer_context);
+      PhysicalGenerator physical_generator;
+      ExecutionGenerator execution_generator(&optimizer_context,
+                                             &query_handle);
+
+      const physical::PhysicalPtr physical_plan =
+          physical_generator.generatePlan(
+              logical_generator.generatePlan(*result.parsed_statement));
+      execution_generator.generatePlan(physical_plan);
+
+      QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+          cli_id_,
+          foreman_->getBusClientID(),
+          &query_handle,
+          &bus_);
+
+      const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true);
+      DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type());
+
+      const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
+      if (query_result_relation) {
+        PrintToScreen::PrintRelation(*query_result_relation,
+                                     test_database_loader_.storage_manager(),
+                                     output_stream.file());
+        DropRelation::Drop(*query_result_relation,
+                           test_database_loader_.catalog_database(),
+                           test_database_loader_.storage_manager());
+      }
+    } catch (const SqlError &error) {
+      *output = error.formatMessage(input);
+      break;
+    }
+  }
+
+  if (output->empty()) {
+    *output = output_stream.str();
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
new file mode 100644
index 0000000..cd59596
--- /dev/null
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -0,0 +1,146 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_
+
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "parser/SqlParserWrapper.hpp"
+#include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/Shiftboss.hpp"
+#include "query_execution/Worker.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "utility/Macros.hpp"
+#include "utility/textbased_test/TextBasedTestDriver.hpp"
+#include "utility/textbased_test/TextBasedTestRunner.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+namespace quickstep {
+namespace optimizer {
+
+/**
+ * @brief TextBasedTestRunner for testing the ExecutionGenerator in the
+ *        distributed version.
+ */
+class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
+ public:
+  /**
+   * @brief If this option is enabled, recreate the entire database and
+   *        repopulate the data before every test.
+   */
+  static const char *kResetOption;
+
+  /**
+   * @brief Constructor.
+   */
+  explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path)
+      : query_id_(0),
+        test_database_loader_(storage_path) {
+    test_database_loader_.createTestRelation(false /* allow_vchar */);
+    test_database_loader_.loadTestRelation();
+
+    bus_.Initialize();
+
+    // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
+    // could receive a registration message from the latter.
+    foreman_.reset(new ForemanDistributed(&bus_, test_database_loader_.catalog_database()));
+
+    worker_.reset(new Worker(0 /* worker_thread_index */, &bus_));
+
+    std::vector<tmb::client_id> worker_client_ids;
+    worker_client_ids.push_back(worker_->getBusClientID());
+
+    // We don't use the NUMA aware version of worker code.
+    const std::vector<int> numa_nodes(worker_client_ids.size(), kAnyNUMANodeID);
+
+    workers_.reset(
+        new WorkerDirectory(worker_client_ids.size(), worker_client_ids, numa_nodes));
+
+    shiftboss_.reset(new Shiftboss(&bus_, test_database_loader_.storage_manager(), workers_.get()));
+
+    cli_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
+    bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);
+
+    foreman_->start();
+
+    shiftboss_->start();
+    worker_->start();
+  }
+
+  ~DistributedExecutionGeneratorTestRunner() {
+    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
+    tmb::TaggedMessage poison_tagged_message(poison_message.get(),
+                                             sizeof(*poison_message),
+                                             quickstep::kPoisonMessage);
+
+    const tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(
+            &bus_,
+            cli_id_,
+            foreman_->getBusClientID(),
+            std::move(poison_tagged_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+
+    worker_->join();
+    shiftboss_->join();
+
+    foreman_->join();
+  }
+
+  void runTestCase(const std::string &input,
+                   const std::set<std::string> &options,
+                   std::string *output) override;
+
+ private:
+  std::size_t query_id_;
+
+  SqlParserWrapper sql_parser_;
+  TestDatabaseLoader test_database_loader_;
+
+  MessageBusImpl bus_;
+
+  tmb::client_id cli_id_;
+
+  std::unique_ptr<ForemanDistributed> foreman_;
+
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<WorkerDirectory> workers_;
+
+  std::unique_ptr<Shiftboss> shiftboss_;
+
+  DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_TESTS_DISTRIBUTED_EXECUTION_GENERATOR_TEST_RUNNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 56bae16..cd0e626 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -13,6 +13,61 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_create
+         "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Create.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_delete
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Delete.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Delete.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_distinct
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Distinct.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Distinct.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_drop
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Drop.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Drop.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_index
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Index.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Index.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_insert
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Insert.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Insert.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_join
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Join.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Join.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_select
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Select.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Select.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_stringpatternmatching
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/StringPatternMatching.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_tablegenerator
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/TableGenerator.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/TableGenerator.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator/")
+add_test(quickstep_queryoptimizer_tests_distributed_executiongenerator_update
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/Update.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/Update.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate/")
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -74,6 +129,17 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedCreate)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDelete)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDistinct)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedDrop)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedIndex)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedInsert)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedJoin)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedSelect)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedStringPatternMatching)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedTableGenerator)
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistributedUpdate)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Drop)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Index)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Insert)
@@ -81,4 +147,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
-file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
\ No newline at end of file
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/220fa06f/third_party/tmb/include/tmb/tagged_message.h
----------------------------------------------------------------------
diff --git a/third_party/tmb/include/tmb/tagged_message.h b/third_party/tmb/include/tmb/tagged_message.h
index 49dcee7..75b980e 100644
--- a/third_party/tmb/include/tmb/tagged_message.h
+++ b/third_party/tmb/include/tmb/tagged_message.h
@@ -63,6 +63,15 @@ class TaggedMessage {
   }
 
   /**
+   * @brief Constructor which creates an empty, typed message.
+   **/
+  explicit TaggedMessage(const message_type_id message_type)
+      : payload_inline_(true),
+        message_type_(message_type) {
+    payload_.in_line.size = 0;
+  }
+
+  /**
    * @brief Constructor.
    *
    * @param msg A pointer to the message contents in memory, which will be