From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Subject: incubator-quickstep git commit: Added ForemanDistributed. [Forced Update!]
Date: Mon, 15 Aug 2016 20:48:56 +0000 (UTC)

Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-foreman 626f726ae -> 203d3ea66 (forced update)


Added ForemanDistributed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/203d3ea6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/203d3ea6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/203d3ea6

Branch: refs/heads/dist-foreman
Commit: 203d3ea66e4c1f72f7edc858b5b243ae9db33eba
Parents: 1325a6a
Author: Zuyu Zhang
Authored: Sat Aug 13 23:37:59 2016 -0700
Committer: Zuyu Zhang
Committed: Mon Aug 15 13:48:32 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt         |  24 ++
 query_execution/ForemanDistributed.cpp | 335 ++++++++++++++++++++++++++++
 query_execution/ForemanDistributed.hpp | 130 +++++++++++
 3 files changed, 489 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/203d3ea6/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 4033594..1b27194 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
 if (ENABLE_DISTRIBUTED)
@@ -86,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_ForemanDistributed
+                        glog
+                        quickstep_catalog_CatalogDatabase
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_catalog_Catalog_proto
+                        quickstep_queryexecution_AdmitRequestMessage
+                        quickstep_queryexecution_ForemanBase
+                        quickstep_queryexecution_PolicyEnforcerDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_EqualsAnyConstant
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_ForemanSingleNode
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
@@ -316,6 +339,7 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_ForemanDistributed
                         quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/203d3ea6/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
new file mode 100644
index 0000000..29f5b9b
--- /dev/null
+++ b/query_execution/ForemanDistributed.cpp
@@ -0,0 +1,335 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/ForemanDistributed.hpp"
+
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/AdmitRequestMessage.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "threading/ThreadUtil.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::AnnotatedMessage;
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+using tmb::client_id;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+class QueryHandle;
+
+ForemanDistributed::ForemanDistributed(
+    MessageBus *bus,
+    CatalogDatabaseLite *catalog_database,
+    const int cpu_id,
+    const bool profile_individual_workorders)
+    : ForemanBase(bus, cpu_id),
+      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
+  const std::vector<QueryExecutionMessageType> sender_message_types{
+      kShiftbossRegistrationResponseMessage,
+      kQueryInitiateMessage,
+      kWorkOrderMessage,
+      kInitiateRebuildMessage,
+      kQueryTeardownMessage,
+      kSaveQueryResultMessage,
+      kQueryExecutionSuccessMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : sender_message_types) {
+    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
+  }
+
+  const std::vector<QueryExecutionMessageType> receiver_message_types{
+      kShiftbossRegistrationMessage,
+      kAdmitRequestMessage,
+      kQueryInitiateResponseMessage,
+      kCatalogRelationNewBlockMessage,
+      kDataPipelineMessage,
+      kInitiateRebuildResponseMessage,
+      kWorkOrderCompleteMessage,
+      kRebuildWorkOrderCompleteMessage,
+      kWorkOrderFeedbackMessage,
+      kSaveQueryResultResponseMessage,
+      kPoisonMessage};
+
+  for (const auto message_type : receiver_message_types) {
+    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
+  }
+
+  policy_enforcer_.reset(new PolicyEnforcerDistributed(
+      foreman_client_id_,
+      catalog_database_,
+      &shiftboss_directory_,
+      bus_,
+      profile_individual_workorders));
+}
+
+void ForemanDistributed::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the foreman thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  // Wait until at least one Shiftboss has registered.
+  if (shiftboss_directory_.empty()) {
+    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
+    DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type()
+               << "' message from client " << annotated_message.sender;
+
+    S::ShiftbossRegistrationMessage proto;
+    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
+    DCHECK_EQ(1u, shiftboss_directory_.size());
+  }
+
+  // Event loop
+  for (;;) {
+    // Receive() causes this thread to sleep until the next message is received.
+    const AnnotatedMessage annotated_message =
+        bus_->Receive(foreman_client_id_, 0, true);
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    const tmb::message_type_id message_type = tagged_message.message_type();
+    DLOG(INFO) << "ForemanDistributed received typed '" << message_type
+               << "' message from client " << annotated_message.sender;
+    switch (message_type) {
+      case kShiftbossRegistrationMessage: {
+        S::ShiftbossRegistrationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
+        break;
+      }
+      case kAdmitRequestMessage: {
+        const AdmitRequestMessage *request_message =
+            static_cast<const AdmitRequestMessage*>(tagged_message.message());
+
+        const vector<QueryHandle*> &query_handles = request_message->getQueryHandles();
+        DCHECK(!query_handles.empty());
+
+        bool all_queries_admitted = true;
+        if (query_handles.size() == 1u) {
+          all_queries_admitted =
+              policy_enforcer_->admitQuery(query_handles.front());
+        } else {
+          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+        }
+        if (!all_queries_admitted) {
+          LOG(WARNING) << "The scheduler could not admit all the queries";
+          // TODO(harshad) - Inform the main thread about the failure.
+        }
+        break;
+      }
+      case kQueryInitiateResponseMessage: {
+        // TODO(zuyu): check the query id.
+        break;
+      }
+      case kCatalogRelationNewBlockMessage:  // Fall through
+      case kDataPipelineMessage:
+      case kRebuildWorkOrderCompleteMessage:
+      case kWorkOrderCompleteMessage:
+      case kWorkOrderFeedbackMessage: {
+        policy_enforcer_->processMessage(tagged_message);
+        break;
+      }
+      case kInitiateRebuildResponseMessage: {
+        // A unique case in the distributed version.
+        policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message);
+        break;
+      }
+      case kSaveQueryResultResponseMessage: {
+        S::SaveQueryResultResponseMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id());
+        break;
+      }
+      case kPoisonMessage: {
+        if (policy_enforcer_->hasQueries()) {
+          LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
+                          "under execution or waiting to be admitted";
+        }
+
+        // Shut down all Shiftbosses.
+        tmb::Address shiftboss_addresses;
+        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
+          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
+        }
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        TaggedMessage poison_message(kPoisonMessage);
+
+        const MessageBus::SendStatus send_status =
+            bus_->Send(foreman_client_id_,
+                       shiftboss_addresses,
+                       broadcast_style,
+                       move(poison_message));
+        DCHECK(send_status == MessageBus::SendStatus::kOK);
+        return;
+      }
+      default:
+        LOG(FATAL) << "Unknown message type to ForemanDistributed";
+    }
+
+    if (canCollectNewMessages(message_type)) {
+      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
+      policy_enforcer_->getWorkOrderProtoMessages(&new_messages);
+      dispatchWorkOrderMessages(new_messages);
+    }
+  }
+}
+
+bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
+  return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
+                                        kCatalogRelationNewBlockMessage,
+                                        kWorkOrderFeedbackMessage) &&
+         // TODO(zuyu): Multiple Shiftbosses support.
+         !shiftboss_directory_.hasReachedCapacity(0);
+}
+
+void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
+  for (const auto &message : messages) {
+    DCHECK(message != nullptr);
+    // TODO(zuyu): Multiple Shiftbosses support.
+    sendWorkOrderMessage(0, *message);
+    shiftboss_directory_.incrementNumQueuedWorkOrders(0);
+  }
+}
+
+void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
+                                              const S::WorkOrderMessage &proto) {
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kWorkOrderMessage);
+  free(proto_bytes);
+
+  const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
+  DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage
+             << "') to Shiftboss with TMB client ID " << shiftboss_client_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
+                                                        std::FILE *out) const {
+  const std::vector<WorkOrderTimeEntry> &recorded_times =
+      policy_enforcer_->getProfilingResults(query_id);
+  fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
+  for (const auto &workorder_entry : recorded_times) {
+    const std::size_t worker_id = workorder_entry.worker_id;
+    fprintf(out,
+            "%lu,%lu,%lu,%lu\n",
+            query_id,
+            worker_id,
+            workorder_entry.operator_id,  // Operator ID.
+            workorder_entry.end_time - workorder_entry.start_time);  // Time.
+  }
+}
+
+void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id,
+                                                             const std::size_t work_order_capacity) {
+  S::ShiftbossRegistrationResponseMessage proto;
+  proto.set_shiftboss_index(shiftboss_directory_.size());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kShiftbossRegistrationResponseMessage);
+  free(proto_bytes);
+
+  shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
+
+  DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '"
+             << kShiftbossRegistrationResponseMessage
+             << "') to Shiftboss with TMB client id " << shiftboss_client_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_client_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id,
+                                                               const relation_id result_relation_id) {
+  S::QueryExecutionSuccessMessage proto;
+  proto.mutable_result_relation()->MergeFrom(
+      static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryExecutionSuccessMessage);
+  free(proto_bytes);
+
+  // Notify the CLI regarding the query result.
+  DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '"
+             << kQueryExecutionSuccessMessage
+             << "') to CLI with TMB client id " << cli_id;
+  const MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         cli_id,
+                                         move(message));
+  CHECK(send_status == MessageBus::SendStatus::kOK);
+}
+
+}  // namespace quickstep


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/203d3ea6/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
new file mode 100644
index 0000000..f9a326a
--- /dev/null
+++ b/query_execution/ForemanDistributed.hpp
@@ -0,0 +1,130 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <cstdio>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ForemanBase.hpp"
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Foreman receives queries from the main thread, messages from the
+ *        policy enforcer and dispatches the work to Shiftbosses. It also
+ *        receives work completion messages from Shiftbosses.
+ **/
+class ForemanDistributed final : public ForemanBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param catalog_database The catalog database where this query is executed.
+   * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
+   * @param profile_individual_workorders Whether every WorkOrder's execution
+   *        should be profiled.
+   *
+   * @note If cpu_id is not specified, the OS may move the Foreman thread
+   *       between CPUs.
+   **/
+  ForemanDistributed(tmb::MessageBus *bus,
+                     CatalogDatabaseLite *catalog_database,
+                     const int cpu_id = -1,
+                     const bool profile_individual_workorders = false);
+
+  ~ForemanDistributed() override {}
+
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
+ protected:
+  void run() override;
+
+ private:
+  /**
+   * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages, to
+   *        the Shiftbosses.
+   *
+   * @param messages The messages to be dispatched.
+   **/
+  void dispatchWorkOrderMessages(
+      const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
+
+  /**
+   * @brief Send the given message to the specified Shiftboss.
+   *
+   * @param worker_index The logical index of the recipient Shiftboss in
+   *        ShiftbossDirectory.
+   * @param proto The WorkOrderMessage to be sent.
+   **/
+  void sendWorkOrderMessage(const std::size_t worker_index,
+                            const serialization::WorkOrderMessage &proto);
+
+  void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id,
+                                           const std::size_t work_order_capacity);
+
+  void processSaveQueryResultResponseMessage(const tmb::client_id cli_id,
+                                             const relation_id result_relation_id);
+
+  /**
+   * @brief Check if we can collect new messages from the PolicyEnforcer.
+   *
+   * @param message_type The type of the last received message.
+   **/
+  bool canCollectNewMessages(const tmb::message_type_id message_type);
+
+  ShiftbossDirectory shiftboss_directory_;
+
+  CatalogDatabaseLite *catalog_database_;
+
+  std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_;
+
+  DISALLOW_COPY_AND_ASSIGN(ForemanDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_
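For readers following the dispatch path in ForemanDistributed.cpp (canCollectNewMessages() gates a call to getWorkOrderProtoMessages(), then dispatchWorkOrderMessages() sends each message to Shiftboss 0 and bumps its queued count), here is a minimal, self-contained C++ model of that bookkeeping. It is a sketch under stated assumptions, not Quickstep code: ToyShiftbossDirectory, MessageType and collectAndDispatch are hypothetical stand-ins, TMB and protobuf are replaced by plain counters, hasReachedCapacity() is assumed to mean "queued >= registered work_order_capacity", and the decrement on WorkOrder completion is assumed to happen elsewhere, since this commit only shows the increment.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical subset of the TMB message types handled by ForemanDistributed.
enum class MessageType {
  kCatalogRelationNewBlock,
  kDataPipeline,
  kWorkOrderComplete,
  kWorkOrderFeedback,
};

// Hypothetical stand-in for ShiftbossDirectory: per-Shiftboss capacity and
// number of queued WorkOrders, indexed in registration order.
class ToyShiftbossDirectory {
 public:
  // Like processShiftbossRegistrationMessage(): the new Shiftboss's index is
  // the directory size before insertion.
  std::size_t addShiftboss(std::size_t work_order_capacity) {
    capacities_.push_back(work_order_capacity);
    queued_.push_back(0);
    return capacities_.size() - 1;
  }

  // ASSUMPTION: "reached capacity" means queued >= registered capacity.
  bool hasReachedCapacity(std::size_t index) const {
    assert(index < queued_.size());
    return queued_[index] >= capacities_[index];
  }

  void incrementNumQueuedWorkOrders(std::size_t index) {
    assert(index < queued_.size());
    ++queued_[index];
  }

  // ASSUMPTION: called when a WorkOrder completes; not shown in this commit.
  void decrementNumQueuedWorkOrders(std::size_t index) {
    assert(index < queued_.size() && queued_[index] > 0);
    --queued_[index];
  }

  std::size_t numQueuedWorkOrders(std::size_t index) const {
    assert(index < queued_.size());
    return queued_[index];
  }

 private:
  std::vector<std::size_t> capacities_;
  std::vector<std::size_t> queued_;
};

// Mirrors ForemanDistributed::canCollectNewMessages(): new-block and feedback
// messages never trigger collection, and only Shiftboss 0 is consulted.
// Precondition (ensured by run() before its event loop): one Shiftboss exists.
bool canCollectNewMessages(MessageType type, const ToyShiftbossDirectory &dir) {
  return type != MessageType::kCatalogRelationNewBlock &&
         type != MessageType::kWorkOrderFeedback &&
         !dir.hasReachedCapacity(0);
}

// Hypothetical helper: `num_ready` is how many WorkOrders the policy enforcer
// would hand back. As in dispatchWorkOrderMessages(), every collected message
// goes to Shiftboss 0 and bumps its queued count, with no per-message check.
// Returns the number of WorkOrders dispatched.
std::size_t collectAndDispatch(MessageType type, std::size_t num_ready,
                               ToyShiftbossDirectory *dir) {
  if (!canCollectNewMessages(type, *dir)) {
    return 0;  // Nothing is collected in this model.
  }
  for (std::size_t i = 0; i < num_ready; ++i) {
    dir->incrementNumQueuedWorkOrders(0);
  }
  return num_ready;
}
```

In this model, as on the Foreman side of the diff, capacity is checked once before collection rather than per message: with a capacity of 2, a DataPipeline message that yields three WorkOrders sends all three, and later messages collect nothing until completions bring the queued count back below capacity. Whether PolicyEnforcerDistributed itself limits how many WorkOrders it returns is not visible in this commit.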