Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AA69E200B54 for ; Thu, 28 Jul 2016 22:25:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A7628160AA9; Thu, 28 Jul 2016 20:25:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C6A7D160A85 for ; Thu, 28 Jul 2016 22:25:06 +0200 (CEST) Received: (qmail 99258 invoked by uid 500); 28 Jul 2016 20:25:06 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 99239 invoked by uid 99); 28 Jul 2016 20:25:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jul 2016 20:25:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 93547C0113 for ; Thu, 28 Jul 2016 20:25:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id gw8t0hJgJ3fL for ; Thu, 28 Jul 2016 20:25:03 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org 
[140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 53AB85F19B for ; Thu, 28 Jul 2016 20:25:03 +0000 (UTC) Received: (qmail 99179 invoked by uid 99); 28 Jul 2016 20:25:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jul 2016 20:25:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8389FE0844; Thu, 28 Jul 2016 20:25:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Date: Thu, 28 Jul 2016 20:25:03 -0000 Message-Id: <295a1f8065154eed90a4617296be5da3@git.apache.org> In-Reply-To: <19ddd98642604a549dd9d1c3a83561aa@git.apache.org> References: <19ddd98642604a549dd9d1c3a83561aa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-quickstep git commit: Process InitRebuildResponse message. archived-at: Thu, 28 Jul 2016 20:25:07 -0000 Process InitRebuildResponse message. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/53d2dca5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/53d2dca5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/53d2dca5 Branch: refs/heads/policy-enforcer-dist Commit: 53d2dca5ffe8cce3010f3e63131cac9ff7d35632 Parents: aaecc76 Author: Zuyu Zhang Authored: Thu Jul 28 11:54:20 2016 -0700 Committer: Zuyu Zhang Committed: Thu Jul 28 11:57:50 2016 -0700 ---------------------------------------------------------------------- query_execution/PolicyEnforcerBase.cpp | 18 ++++++++++++++++++ query_execution/PolicyEnforcerBase.hpp | 13 +++++++++++++ query_execution/QueryExecutionMessages.proto | 1 + query_execution/QueryManagerBase.hpp | 14 ++++++++++++++ query_execution/QueryManagerDistributed.cpp | 21 +++++++++++++++++++++ query_execution/QueryManagerDistributed.hpp | 4 ++++ 6 files changed, 71 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index d16a502..a0a2c8b 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -29,6 +29,7 @@ #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionState.hpp" #include "query_execution/QueryManagerBase.hpp" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. 
#include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" @@ -60,6 +61,23 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index); break; } +#ifdef QUICKSTEP_DISTRIBUTED + case kInitiateRebuildResponseMessage: { + serialization::InitiateRebuildResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + op_index = proto.operator_index(); + const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders(); + + // Check if new work orders are available. + admitted_queries_[query_id]->processInitiateRebuildResponseMessage(op_index, num_rebuild_work_orders); + incrementNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders); + break; + } +#endif // QUICKSTEP_DISTRIBUTED case kRebuildWorkOrderCompleteMessage: { serialization::RebuildWorkOrderCompletionMessage proto; // Note: This proto message contains the time it took to execute the http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index 0482ebc..03088ff 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -27,6 +27,7 @@ #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryManagerBase.hpp" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. 
#include "utility/Macros.hpp" #include "glog/logging.h" @@ -179,6 +180,18 @@ class PolicyEnforcerBase { **/ virtual bool admitQuery(QueryHandle *query_handle) = 0; +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Increment the number of queued workorders for the given worker. + * + * @param worker_index The logical ID of the given worker. + * @param num_new_work_orders The number of the new work orders will be + * executed on Worker indexed by 'worker_index'. + **/ + virtual void incrementNumQueuedWorkOrders(const std::size_t worker_index, + const std::size_t num_new_work_orders) {} +#endif // QUICKSTEP_DISTRIBUTED + /** * @brief Decrement the number of queued workorders for the given worker by 1. * http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 308d736..db230e5 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -111,6 +111,7 @@ message InitiateRebuildResponseMessage { required uint64 query_id = 1; required uint64 operator_index = 2; required uint64 num_rebuild_work_orders = 3; + required uint64 shiftboss_index = 4; } message SaveQueryResultMessage { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index 6edfd5c..c0c82e5 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -24,6 +24,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryExecutionState.hpp" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. 
#include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" @@ -128,6 +129,19 @@ class QueryManagerBase { void processFeedbackMessage(const dag_node_index op_index, const WorkOrder::FeedbackMessage &message); +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Process the initiate rebuild work order response message. + * + * @param op_index The index of the specified operator node in the query DAG + * for initiating the rebuild work order. + * @param num_rebuild_work_orders The number of the rebuild work orders + * generated for the operator indexed by 'op_index'. + **/ + virtual void processInitiateRebuildResponseMessage(const dag_node_index op_index, + const std::size_t num_rebuild_work_orders) {} +#endif // QUICKSTEP_DISTRIBUTED + /** * @brief Get the query status after processing an incoming message. * http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index e906fa5..4272fff 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -119,6 +119,27 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) return generated_new_workorder_protos; } +void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index, + const std::size_t num_rebuild_work_orders) { + // TODO(zuyu): Multiple workers support. + query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true); + + if (num_rebuild_work_orders != 0u) { + // Wait for the rebuild work orders to finish. 
+ return; + } + + markOperatorFinished(op_index); + + for (const std::pair<dag_node_index, bool> &dependent_link : + query_dag_->getDependents(op_index)) { + const dag_node_index dependent_op_index = dependent_link.first; + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + processOperator(dependent_op_index, true); + } + } +} + bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { DCHECK(checkRebuildRequired(index)); DCHECK(!checkRebuildInitiated(index)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/53d2dca5/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index 8641c22..131cd86 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -15,6 +15,7 @@ #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ +#include <cstddef> #include <memory> #include "query_execution/QueryExecutionState.hpp" @@ -60,6 +61,9 @@ class QueryManagerDistributed final : public QueryManagerBase { bool fetchNormalWorkOrders(const dag_node_index index) override; + void processInitiateRebuildResponseMessage(const dag_node_index op_index, + const std::size_t num_rebuild_work_orders) override; + /** * @brief Get the next normal workorder to be excuted, wrapped in a * WorkOrderMessage proto.