From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Wed, 27 Jul 2016 22:58:19 -0000
Message-Id: <629c15d6be654ec68d8da7b91893ea83@git.apache.org>
Subject: [2/3] incubator-quickstep git commit: Introduced Shiftboss for the distributed version.

Introduced Shiftboss for the distributed version.
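
Shiftboss is the relay between Foreman and the Workers it manages. As a reading aid for `Shiftboss::run()` in the diff that follows, here is a minimal, hypothetical C++ sketch of how each message type that loop switches on is disposed of. It is not Quickstep code: `MessageType`, `Route`, and `RouteFor` are illustrative names, the enum values are arbitrary, and only the message types handled by the new `run()` loop are modeled.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical subset of QueryExecutionMessageType: only the types that
// Shiftboss::run() switches on. Values are illustrative, not Quickstep's.
enum class MessageType : std::uint8_t {
  kShiftbossRegistrationResponseMessage,
  kQueryInitiateMessage,
  kInitiateRebuildMessage,
  kQueryResultRelationMessage,
  kWorkOrderMessage,
  kWorkOrderCompleteMessage,
  kRebuildWorkOrderCompleteMessage,
  kDataPipelineMessage,
  kWorkOrdersAvailableMessage,
  kWorkOrderFeedbackMessage,
  kPoisonMessage,
};

// How the Shiftboss disposes of a received message.
enum class Route {
  kHandleLocally,     // Consumed by Shiftboss; may ack Foreman or emit rebuild WorkOrders.
  kForwardToWorker,   // Reconstructed from its proto and sent to one schedulable Worker.
  kForwardToForeman,  // Relayed unchanged from a Worker to Foreman.
  kBroadcastAndStop,  // Broadcast to every Worker, then leave the run loop.
};

inline Route RouteFor(MessageType type) {
  switch (type) {
    case MessageType::kWorkOrderMessage:
      return Route::kForwardToWorker;
    case MessageType::kWorkOrderCompleteMessage:
    case MessageType::kRebuildWorkOrderCompleteMessage:
    case MessageType::kDataPipelineMessage:
    case MessageType::kWorkOrdersAvailableMessage:
    case MessageType::kWorkOrderFeedbackMessage:
      return Route::kForwardToForeman;
    case MessageType::kPoisonMessage:
      return Route::kBroadcastAndStop;
    case MessageType::kShiftbossRegistrationResponseMessage:
    case MessageType::kQueryInitiateMessage:
    case MessageType::kInitiateRebuildMessage:
    case MessageType::kQueryResultRelationMessage:
      return Route::kHandleLocally;
  }
  // Unreachable for valid enumerators; the real run() loop LOG(FATAL)s on
  // unknown message types instead.
  assert(false && "unknown message type");
  return Route::kHandleLocally;
}
```

In the real loop, `kPoisonMessage` is the only case that returns from `run()`; every other handled case breaks back to the next blocking `Receive()`.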
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7415ee87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7415ee87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7415ee87

Branch: refs/heads/policy-enforcer-dist
Commit: 7415ee87fa088054e09b2a9efb88389c43351b12
Parents: 9f9e3b7
Author: Zuyu Zhang
Authored: Fri Jul 22 13:29:03 2016 -0700
Committer: Zuyu Zhang
Committed: Wed Jul 27 15:06:13 2016 -0700

----------------------------------------------------------------------
 catalog/CatalogDatabaseCache.hpp             |   5 +
 query_execution/CMakeLists.txt               |  24 ++
 query_execution/QueryExecutionMessages.proto |  30 ++
 query_execution/QueryExecutionTypedefs.hpp   |   8 +
 query_execution/Shiftboss.cpp                | 360 ++++++++++++++++++++++
 query_execution/Shiftboss.hpp                | 241 +++++++++++++++
 storage/StorageManager.hpp                   |   1 +
 7 files changed, 669 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/catalog/CatalogDatabaseCache.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogDatabaseCache.hpp b/catalog/CatalogDatabaseCache.hpp
index 77afe2a..b3e73a6 100644
--- a/catalog/CatalogDatabaseCache.hpp
+++ b/catalog/CatalogDatabaseCache.hpp
@@ -54,6 +54,11 @@ namespace serialization { class CatalogDatabase; }
 class CatalogDatabaseCache : public CatalogDatabaseLite {
  public:
   /**
+   * @brief Constructor.
+   **/
+  CatalogDatabaseCache() {}
+
+  /**
    * @brief Constructor. Reconstruct a database cache from its serialized
    * Protocol Buffer form.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index f582ba5..8bf1ab1 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -52,6 +52,7 @@ if (ENABLE_DISTRIBUTED)
 endif()
 add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp)
 if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp)
   add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp)
 endif()
 add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
@@ -157,6 +158,8 @@ target_link_libraries(quickstep_queryexecution_QueryContext_proto
                       quickstep_utility_SortConfiguration_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_catalog_Catalog_proto
+                      quickstep_queryexecution_QueryContext_proto
                       quickstep_relationaloperators_WorkOrder_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_queryexecution_QueryExecutionState
@@ -214,6 +217,26 @@ target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
                       quickstep_utility_Macros
                       tmb)
 if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_Shiftboss
+                        glog
+                        quickstep_catalog_CatalogDatabaseCache
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_QueryContext
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_WorkerDirectory
+                        quickstep_queryexecution_WorkerMessage
+                        quickstep_relationaloperators_RebuildWorkOrder
+                        quickstep_relationaloperators_WorkOrderFactory
+                        quickstep_storage_InsertDestination
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_threading_ThreadUtil
+                        quickstep_utility_Macros
+                        tmb)
   target_link_libraries(quickstep_queryexecution_ShiftbossDirectory
                         quickstep_utility_Macros
                         tmb)
@@ -272,6 +295,7 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)
 endif()
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index fa20993..591ca6c 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -16,6 +16,8 @@ syntax = "proto2";
 
 package quickstep.serialization;
 
+import "catalog/Catalog.proto";
+import "query_execution/QueryContext.proto";
 import "relational_operators/WorkOrder.proto";
 
 // Used for any messages that do not carry payloads.
@@ -73,6 +75,25 @@ message WorkOrdersAvailableMessage {
 }
 
 // Distributed version related messages.
+message ShiftbossRegistrationMessage {
+  // The total Work Order processing capacity in Shiftboss, which equals the
+  // sum of the capacity of each worker managed by Shiftboss.
+  required uint64 work_order_capacity = 1;
+}
+
+message ShiftbossRegistrationResponseMessage {
+}
+
+message QueryInitiateMessage {
+  required uint64 query_id = 1;
+  required CatalogDatabase catalog_database_cache = 2;
+  required QueryContext query_context = 3;
+}
+
+message QueryInitiateResponseMessage {
+  required uint64 query_id = 1;
+}
+
 message WorkOrderMessage {
   required uint64 query_id = 1;
   required uint64 operator_index = 2;
@@ -92,6 +113,15 @@ message InitiateRebuildResponseMessage {
   required uint64 num_rebuild_work_orders = 3;
 }
 
+message QueryResultRelationMessage {
+  required int32 relation_id = 1;
+  repeated fixed64 blocks = 2 [packed=true];
+}
+
+message QueryResultRelationResponseMessage {
+  required int32 relation_id = 1;
+}
+
 // BlockLocator related messages.
 message BlockDomainRegistrationMessage {
   // Format IP:Port, i.e., "0.0.0.0:0".

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 61e76d7..d73d4ee 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -73,9 +73,17 @@ enum QueryExecutionMessageType : message_type_id {
   kPoisonMessage,  // From the main thread to Foreman and Workers.
 
 #ifdef QUICKSTEP_DISTRIBUTED
+  kShiftbossRegistrationMessage,  // From Shiftboss to Foreman.
+  kShiftbossRegistrationResponseMessage,  // From Foreman to Shiftboss.
+  kQueryInitiateMessage,  // From Foreman to Shiftboss.
+  kQueryInitiateResponseMessage,  // From Shiftboss to Foreman.
+
   kInitiateRebuildMessage,  // From Foreman to Shiftboss.
   kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryResultRelationMessage,  // From Foreman to Shiftboss.
+  kQueryResultRelationResponseMessage,  // From Shiftboss to Foreman.
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
new file mode 100644
index 0000000..af56306
--- /dev/null
+++ b/query_execution/Shiftboss.cpp
@@ -0,0 +1,360 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/Shiftboss.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkerMessage.hpp"
+#include "relational_operators/RebuildWorkOrder.hpp"
+#include "relational_operators/WorkOrderFactory.hpp"
+#include "storage/InsertDestination.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "threading/ThreadUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/message_style.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class WorkOrder;
+
+void Shiftboss::run() {
+  if (cpu_id_ >= 0) {
+    // We can pin the Shiftboss thread to a CPU if specified.
+    ThreadUtil::BindToCPU(cpu_id_);
+  }
+
+  for (;;) {
+    // Receive() is a blocking call, causing this thread to sleep until the
+    // next message is received.
+    AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
+    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+              << "') received the typed '" << annotated_message.tagged_message.message_type()
+              << "' message from client " << annotated_message.sender;
+    switch (annotated_message.tagged_message.message_type()) {
+      case kShiftbossRegistrationResponseMessage: {
+        foreman_client_id_ = annotated_message.sender;
+        break;
+      }
+      case kQueryInitiateMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryInitiateMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
+        break;
+      }
+      case kWorkOrderMessage: {
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::WorkOrderMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        const std::size_t query_id = proto.query_id();
+        DCHECK_EQ(1u, query_contexts_.count(query_id));
+
+        WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
+                                                                       &database_cache_,
+                                                                       query_contexts_[query_id].get(),
+                                                                       storage_manager_,
+                                                                       shiftboss_client_id_,
+                                                                       bus_);
+
+        unique_ptr<WorkerMessage> worker_message(
+            WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
+
+        TaggedMessage worker_tagged_message(worker_message.get(),
+                                            sizeof(*worker_message),
+                                            kWorkOrderMessage);
+
+        const size_t worker_index = getSchedulableWorker();
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
+                  << "') from Foreman to worker " << worker_index;
+
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           workers_->getClientID(worker_index),
+                                           move(worker_tagged_message));
+        break;
+      }
+      case kInitiateRebuildMessage: {
+        // Construct rebuild work orders, and send back their number to
+        // 'ForemanDistributed'.
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::InitiateRebuildMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        processInitiateRebuildMessage(proto.query_id(),
+                                      proto.operator_index(),
+                                      proto.insert_destination_index(),
+                                      proto.relation_id());
+        break;
+      }
+      case kWorkOrderCompleteMessage:  // Fall through.
+      case kRebuildWorkOrderCompleteMessage:
+      case kDataPipelineMessage:
+      case kWorkOrdersAvailableMessage:
+      case kWorkOrderFeedbackMessage: {
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded typed '" << annotated_message.tagged_message.message_type()
+                  << "' message from worker (client " << annotated_message.sender << ") to Foreman";
+
+        DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           foreman_client_id_,
+                                           move(annotated_message.tagged_message));
+        break;
+      }
+      case kQueryResultRelationMessage: {
+        // TODO(zuyu): Rename to kSaveQueryResultMessage.
+        const TaggedMessage &tagged_message = annotated_message.tagged_message;
+
+        serialization::QueryResultRelationMessage proto;
+        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+        for (int i = 0; i < proto.blocks_size(); ++i) {
+          const block_id block = proto.blocks(i);
+          storage_manager_->saveBlockOrBlob(block);
+          if (storage_manager_->blockOrBlobIsLoaded(block)) {
+            // NOTE(zuyu): eviction is required to avoid accesses to the query
+            // result relation schema in CatalogDatabaseCache, for all query
+            // optimizer execution generator unit tests and the single-process
+            // Quickstep CLI.
+            storage_manager_->evictBlockOrBlob(block);
+          }
+        }
+
+        serialization::QueryResultRelationResponseMessage ack_proto;
+        ack_proto.set_relation_id(proto.relation_id());
+
+        const size_t ack_proto_length = ack_proto.ByteSize();
+        char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length));
+        CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length));
+
+        TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes),
+                                  ack_proto_length,
+                                  kQueryResultRelationResponseMessage);
+        free(ack_proto_bytes);
+
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage
+                  << "') to Foreman";
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           shiftboss_client_id_,
+                                           foreman_client_id_,
+                                           move(ack_message));
+        break;
+      }
+      case kPoisonMessage: {
+        LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+                  << "') forwarded PoisonMessage (typed '" << kPoisonMessage
+                  << "') from Foreman to all workers";
+
+        tmb::MessageStyle broadcast_style;
+        broadcast_style.Broadcast(true);
+
+        tmb::MessageBus::SendStatus send_status =
+            bus_->Send(shiftboss_client_id_,
+                       worker_addresses_,
+                       broadcast_style,
+                       move(annotated_message.tagged_message));
+        DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+        return;
+      }
+      default: {
+        LOG(FATAL) << "Unknown TMB message type";
+      }
+    }
+  }
+}
+
+size_t Shiftboss::getSchedulableWorker() {
+  const size_t num_workers = workers_->getNumWorkers();
+
+  size_t curr_worker = start_worker_index_;
+  for (;;) {
+    if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) {
+      start_worker_index_ = (curr_worker + 1) % num_workers;
+      // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(curr_worker);
+      // But we need a WorkOrder queue first.
+      return curr_worker;
+    }
+
+    curr_worker = (curr_worker + 1) % num_workers;
+  }
+}
+
+void Shiftboss::registerWithForeman() {
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
+            << "') to all";
+
+  tmb::Address all_addresses;
+  all_addresses.All(true);
+
+  tmb::MessageStyle style;
+
+  serialization::ShiftbossRegistrationMessage proto;
+  proto.set_work_order_capacity(getWorkOrderCapacity());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kShiftbossRegistrationMessage);
+  free(proto_bytes);
+
+  tmb::MessageBus::SendStatus send_status =
+      bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
+  DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
+}
+
+void Shiftboss::processQueryInitiateMessage(
+    const std::size_t query_id,
+    const serialization::CatalogDatabase &catalog_database_cache_proto,
+    const serialization::QueryContext &query_context_proto) {
+  database_cache_.update(catalog_database_cache_proto);
+
+  unique_ptr<QueryContext> query_context(
+      new QueryContext(query_context_proto,
+                       database_cache_,
+                       storage_manager_,
+                       shiftboss_client_id_,
+                       bus_));
+  query_contexts_.emplace(query_id, move(query_context));
+
+  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+            << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage
+            << "') to Foreman";
+
+  serialization::QueryInitiateResponseMessage proto;
+  proto.set_query_id(query_id);
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage ack_message(static_cast<const void*>(proto_bytes),
+                            proto_length,
+                            kQueryInitiateResponseMessage);
+  free(proto_bytes);
+
QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + foreman_client_id_, + move(ack_message)); +} + +void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, + const std::size_t op_index, + const QueryContext::insert_destination_id dest_index, + const relation_id rel_id) { + DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); + + DCHECK_EQ(1u, query_contexts_.count(query_id)); + InsertDestination *insert_destination = query_contexts_[query_id]->getInsertDestination(dest_index); + DCHECK(insert_destination != nullptr); + + vector partially_filled_block_refs; + insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); + + LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage + << "') to Foreman"; + + serialization::InitiateRebuildResponseMessage proto; + proto.set_query_id(query_id); + proto.set_operator_index(op_index); + proto.set_num_rebuild_work_orders(partially_filled_block_refs.size()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage ack_message(static_cast(proto_bytes), + proto_length, + kInitiateRebuildResponseMessage); + free(proto_bytes); + + QueryExecutionUtil::SendTMBMessage(bus_, + shiftboss_client_id_, + foreman_client_id_, + move(ack_message)); + + for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) { + // NOTE(zuyu): Worker releases the memory after the execution of + // RebuildWorkOrder on the Worker. 
+    WorkOrder *rebuild_work_order =
+        new RebuildWorkOrder(query_id,
+                             move(partially_filled_block_refs[i]),
+                             op_index,
+                             rel_id,
+                             shiftboss_client_id_,
+                             bus_);
+
+    unique_ptr<WorkerMessage> worker_message(
+        WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
+
+    TaggedMessage worker_tagged_message(worker_message.get(),
+                                        sizeof(*worker_message),
+                                        kRebuildWorkOrderMessage);
+
+    const size_t worker_index = getSchedulableWorker();
+    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
+              << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
+              << "') to worker " << worker_index;
+
+    QueryExecutionUtil::SendTMBMessage(bus_,
+                                       shiftboss_client_id_,
+                                       workers_->getClientID(worker_index),
+                                       move(worker_tagged_message));
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/query_execution/Shiftboss.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp
new file mode 100644
index 0000000..096ab74
--- /dev/null
+++ b/query_execution/Shiftboss.hpp
@@ -0,0 +1,241 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <unordered_map>
+
+#include "catalog/CatalogDatabaseCache.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkerDirectory.hpp"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/address.h"
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+namespace serialization {
+class CatalogDatabase;
+class QueryContext;
+}  // namespace serialization
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief The Shiftboss accepts WorkOrder protos from Foreman, and assigns
+ *        the WorkOrders to Workers.
+ **/
+class Shiftboss : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param bus A pointer to the TMB.
+   * @param storage_manager The StorageManager to use.
+   * @param workers A pointer to the WorkerDirectory.
+   * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
+   *
+   * @note If cpu_id is not specified, the OS may move the Shiftboss thread
+   *       between CPUs.
+   **/
+  Shiftboss(tmb::MessageBus *bus,
+            StorageManager *storage_manager,
+            WorkerDirectory *workers,
+            const int cpu_id = -1)
+      : bus_(DCHECK_NOTNULL(bus)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        workers_(DCHECK_NOTNULL(workers)),
+        cpu_id_(cpu_id),
+        shiftboss_client_id_(tmb::kClientIdNone),
+        foreman_client_id_(tmb::kClientIdNone),
+        max_msgs_per_worker_(1),
+        start_worker_index_(0u) {
+    // Check that there is at least one Worker.
+    DCHECK_GT(workers->getNumWorkers(), 0u);
+
+    shiftboss_client_id_ = bus_->Connect();
+    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
+    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);
+
+    // Messages between Foreman and Shiftboss.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);
+
+    // Message sent to Worker.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);
+
+    // Message sent to Foreman.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);
+
+    // Forward the following message types from Foreman to Workers.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);
+
+    // Forward the following message types from Workers to Foreman.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
+
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage);
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryResultRelationResponseMessage);
+
+    // Stop itself.
+    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
+    // Stop all workers.
+    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);
+
+    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
+      worker_addresses_.AddRecipient(workers_->getClientID(i));
+    }
+
+    registerWithForeman();
+  }
+
+  ~Shiftboss() override {
+  }
+
+  /**
+   * @brief Get the TMB client ID of the Shiftboss thread.
+   *
+   * @return TMB client ID of the Shiftboss thread.
+   **/
+  inline tmb::client_id getBusClientID() const {
+    return shiftboss_client_id_;
+  }
+
+  /**
+   * @brief Get the Work Order processing capacity of all Workers managed by
+   *        Shiftboss during a single round of WorkOrder dispatch.
+   **/
+  inline std::size_t getWorkOrderCapacity() const {
+    DCHECK_NE(max_msgs_per_worker_, 0u);
+    return max_msgs_per_worker_ * workers_->getNumWorkers();
+  }
+
+  /**
+   * @brief Get the Worker to assign WorkOrders for execution. Blocks if all
+   *        Workers have reached their capacity for queued WorkOrders.
+   **/
+  // TODO(zuyu): To achieve non-blocking, we need a queue to cache received
+  // normal Work Order protos from Foreman and the generated rebuild Work Orders.
+  inline std::size_t getSchedulableWorker();
+
+  /**
+   * @brief Set the maximum number of messages that should be allocated to each
+   *        worker during a single round of WorkOrder dispatch.
+   *
+   * @param max_msgs_per_worker Maximum number of messages.
+   **/
+  inline void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
+    max_msgs_per_worker_ = max_msgs_per_worker;
+  }
+
+ protected:
+  /**
+   * @brief The Shiftboss receives WorkOrders from Foreman and assigns them to
+   *        Workers, and relays Worker messages back to Foreman.
+   *
+   * @note The Workers that receive messages from the Shiftboss execute and
+   *       subsequently delete the WorkOrder contained in each message.
+   **/
+  void run() override;
+
+ private:
+  void registerWithForeman();
+
+  /**
+   * @brief Process the QueryInitiateMessage and ack back.
+   *
+   * @param query_id The given query id.
+   * @param catalog_database_cache_proto The proto used to update
+   *        CatalogDatabaseCache.
+   * @param query_context_proto The QueryContext proto.
+   **/
+  void processQueryInitiateMessage(const std::size_t query_id,
+                                   const serialization::CatalogDatabase &catalog_database_cache_proto,
+                                   const serialization::QueryContext &query_context_proto);
+
+  /**
+   * @brief Process the InitiateRebuildMessage and ack back.
+   *
+   * @param query_id The ID of the query to which this InitiateRebuildMessage
+   *        belongs.
+   * @param op_index The index of the operator for rebuild work orders.
+   * @param dest_index The InsertDestination index in QueryContext to rebuild.
+   * @param rel_id The relation that needs to generate rebuild work orders.
+   **/
+  void processInitiateRebuildMessage(const std::size_t query_id,
+                                     const std::size_t op_index,
+                                     const QueryContext::insert_destination_id dest_index,
+                                     const relation_id rel_id);
+
+  // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss,
+  // and between Shiftboss and the Worker thread pool.
+  tmb::MessageBus *bus_;
+
+  CatalogDatabaseCache database_cache_;
+  StorageManager *storage_manager_;
+  WorkerDirectory *workers_;
+
+  // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
+  const int cpu_id_;
+
+  tmb::client_id shiftboss_client_id_, foreman_client_id_;
+
+  // TMB recipients for all workers managed by this Shiftboss.
+  tmb::Address worker_addresses_;
+
+  // During a single round of WorkOrder dispatch, a Worker should be allocated
+  // at most this many WorkOrders.
+  std::size_t max_msgs_per_worker_;
+
+  // The Worker index at which the next round-robin scan for scheduling starts.
+  std::size_t start_worker_index_;
+
+  // QueryContexts per query.
+  std::unordered_map<std::size_t, std::unique_ptr<QueryContext>> query_contexts_;
+
+  DISALLOW_COPY_AND_ASSIGN(Shiftboss);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7415ee87/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 50ddb0f..348018f 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -619,6 +619,7 @@ class StorageManager {
 
   FRIEND_TEST(BlockLocatorTest, BlockTest);
   FRIEND_TEST(BlockLocatorTest, BlobTest);
+  friend class Shiftboss;
 
   FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
   FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);
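
`Shiftboss::getSchedulableWorker()` picks Workers round-robin starting at `start_worker_index_`, but, as its TODOs note, it never records queued WorkOrders and loops without bound until some Worker is below `max_msgs_per_worker_`. A minimal, hypothetical sketch of the same round-robin policy with per-Worker counters and a bounded scan follows. `RoundRobinScheduler` and its methods are illustrative names, a plain `std::vector` stands in for `WorkerDirectory`, and returning `std::nullopt` when every Worker is full is an assumed alternative to blocking (C++17 is assumed for `std::optional`).

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical stand-in for the Shiftboss scheduling state: one queued
// WorkOrder count per Worker plus the round-robin starting point.
class RoundRobinScheduler {
 public:
  RoundRobinScheduler(std::size_t num_workers, std::size_t max_msgs_per_worker)
      : queued_(num_workers, 0), max_msgs_per_worker_(max_msgs_per_worker) {
    assert(num_workers > 0);  // Mirrors DCHECK_GT(workers->getNumWorkers(), 0u).
  }

  // Returns the first Worker, scanning from start_index_, whose queue is below
  // capacity, and records one more queued WorkOrder for it. After one full
  // pass with every Worker at capacity, returns std::nullopt instead of spinning.
  std::optional<std::size_t> schedule() {
    const std::size_t num_workers = queued_.size();
    for (std::size_t step = 0; step < num_workers; ++step) {
      const std::size_t worker = (start_index_ + step) % num_workers;
      if (queued_[worker] < max_msgs_per_worker_) {
        ++queued_[worker];
        start_index_ = (worker + 1) % num_workers;
        return worker;
      }
    }
    return std::nullopt;
  }

  // Called when a Worker reports completion of one WorkOrder.
  void complete(std::size_t worker) {
    assert(worker < queued_.size() && queued_[worker] > 0);
    --queued_[worker];
  }

  // Mirrors Shiftboss::getWorkOrderCapacity().
  std::size_t capacity() const { return max_msgs_per_worker_ * queued_.size(); }

 private:
  std::vector<std::size_t> queued_;
  const std::size_t max_msgs_per_worker_;
  std::size_t start_index_ = 0;
};
```

With three Workers and a limit of one message each, three calls to `schedule()` return Workers 0, 1 and 2, a fourth returns an empty optional, and after `complete(1)` the next call returns Worker 1. An empty result leaves the caller free to buffer the WorkOrder, which is the direction the TODO on `getSchedulableWorker()` points to.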