Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2555D200D42 for ; Fri, 17 Nov 2017 20:40:52 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 23D46160C0D; Fri, 17 Nov 2017 19:40:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7B899160C0C for ; Fri, 17 Nov 2017 20:40:50 +0100 (CET) Received: (qmail 34942 invoked by uid 500); 17 Nov 2017 19:40:49 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 34933 invoked by uid 99); 17 Nov 2017 19:40:49 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Nov 2017 19:40:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id C20ACC6901 for ; Fri, 17 Nov 2017 19:40:48 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id b_RaeB8DPDjl for ; Fri, 17 Nov 2017 19:40:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 8D5E25FAF9 for ; Fri, 17 Nov 2017 19:40:43 +0000 (UTC) Received: (qmail 33484 invoked by uid 99); 17 Nov 2017 19:40:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Nov 2017 19:40:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CDEFDF5F41; Fri, 17 Nov 2017 19:40:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianqiao@apache.org To: commits@quickstep.incubator.apache.org Date: Fri, 17 Nov 2017 19:40:44 -0000 Message-Id: In-Reply-To: <1bc6a760c4df4d25adf68610c1012a38@git.apache.org> References: <1bc6a760c4df4d25adf68610c1012a38@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/27] incubator-quickstep git commit: Simplified the work order generation. archived-at: Fri, 17 Nov 2017 19:40:52 -0000 Simplified the work order generation. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8d7284de Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8d7284de Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8d7284de Branch: refs/heads/trace Commit: 8d7284decb7ebf5c0eaac232f39027ddd8bf6144 Parents: 77960a4 Author: Zuyu Zhang Authored: Mon Aug 21 19:51:55 2017 -0500 Committer: Zuyu Zhang Committed: Fri Sep 22 13:43:08 2017 -0500 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 2 - query_execution/ForemanDistributed.cpp | 5 +- query_execution/ForemanSingleNode.cpp | 16 +-- query_execution/QueryManagerBase.cpp | 136 ++++++++----------- query_execution/QueryManagerBase.hpp | 79 ++--------- query_execution/QueryManagerDistributed.cpp | 54 +++----- query_execution/QueryManagerDistributed.hpp | 3 +- query_execution/QueryManagerSingleNode.cpp | 58 ++++---- query_execution/QueryManagerSingleNode.hpp | 7 +- query_execution/WorkOrdersContainer.hpp | 1 + .../tests/QueryManagerSingleNode_unittest.cpp | 58 ++++---- 11 files changed, 152 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 5c750f0..9394c00 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -119,7 +119,6 @@ if (ENABLE_DISTRIBUTED) quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager quickstep_threading_ThreadUtil - quickstep_utility_EqualsAnyConstant quickstep_utility_Macros tmb ${GFLAGS_LIB_NAME}) @@ -135,7 +134,6 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage quickstep_threading_ThreadUtil - quickstep_utility_EqualsAnyConstant quickstep_utility_Macros tmb ${GFLAGS_LIB_NAME}) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 942f383..82cc624 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -48,7 +48,6 @@ #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" #include "threading/ThreadUtil.hpp" -#include "utility/EqualsAnyConstant.hpp" #include "glog/logging.h" @@ -233,9 +232,7 @@ void ForemanDistributed::run() { } bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) { - return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, - kCatalogRelationNewBlockMessage, - kWorkOrderFeedbackMessage); + return message_type != kCatalogRelationNewBlockMessage; } bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp index 1501408..d66f1f5 100644 --- a/query_execution/ForemanSingleNode.cpp +++ b/query_execution/ForemanSingleNode.cpp @@ -33,7 +33,6 @@ #include "query_execution/WorkerDirectory.hpp" #include "query_execution/WorkerMessage.hpp" #include "threading/ThreadUtil.hpp" -#include "utility/EqualsAnyConstant.hpp" #include "utility/Macros.hpp" #include "gflags/gflags.h" @@ -179,18 +178,13 @@ void ForemanSingleNode::run() { } bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) { - if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, - kCatalogRelationNewBlockMessage, - kWorkOrderFeedbackMessage)) { - return false; - } else if (worker_directory_->getLeastLoadedWorker().second <= - FLAGS_min_load_per_worker) { - // If the least loaded worker has only one pending work order, we should - // collect new messages and dispatch them. - return true; - } else { + if (message_type == kCatalogRelationNewBlockMessage) { return false; } + + // If the least loaded worker has only one pending work order, we should + // collect new messages and dispatch them. + return (worker_directory_->getLeastLoadedWorker().second <= FLAGS_min_load_per_worker); } void ForemanSingleNode::dispatchWorkerMessages(const vector> &messages) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp index 565c6ad..374c96d 100644 --- a/query_execution/QueryManagerBase.cpp +++ b/query_execution/QueryManagerBase.cpp @@ -50,7 +50,9 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) num_operators_in_dag_(query_dag_->size()), output_consumers_(num_operators_in_dag_), blocking_dependencies_(num_operators_in_dag_), - query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) { + query_exec_state_(new QueryExecutionState(num_operators_in_dag_)), + blocking_dependents_(num_operators_in_dag_), + non_blocking_dependencies_(num_operators_in_dag_) { if (FLAGS_visualize_execution_dag) { dag_visualizer_ = std::make_unique(query_handle_->getQueryPlan()); @@ -66,16 +68,22 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) query_exec_state_->setRebuildRequired(node_index); } + if (query_dag_->getDependencies(node_index).empty()) { + non_dependent_operators_.push_back(node_index); + } + for (const pair &dependent_link : query_dag_->getDependents(node_index)) { const dag_node_index dependent_op_index = dependent_link.first; if (query_dag_->getLinkMetadata(node_index, dependent_op_index)) { // The link is a pipeline-breaker. Streaming of blocks is not possible // between these two operators. - blocking_dependencies_[dependent_op_index].push_back(node_index); + blocking_dependencies_[dependent_op_index].insert(node_index); + blocking_dependents_[node_index].push_back(dependent_op_index); } else { // The link is not a pipeline-breaker. Streaming of blocks is possible // between these two operators. + non_blocking_dependencies_[dependent_op_index].insert(node_index); output_consumers_[node_index].push_back(dependent_op_index); } } @@ -102,6 +110,12 @@ void QueryManagerBase::processFeedbackMessage( RelationalOperator *op = query_dag_->getNodePayloadMutable(op_index); op->receiveFeedbackMessage(msg); + + if (query_exec_state_->hasDoneGenerationWorkOrders(op_index)) { + return; + } + + fetchNormalWorkOrders(op_index); } void QueryManagerBase::processWorkOrderCompleteMessage( @@ -109,97 +123,32 @@ void QueryManagerBase::processWorkOrderCompleteMessage( const partition_id part_id) { query_exec_state_->decrementNumQueuedWorkOrders(op_index); - // Check if new work orders are available and fetch them if so. - fetchNormalWorkOrders(op_index); + if (!checkNormalExecutionOver(op_index)) { + // Normal execution under progress for this operator. + return; + } if (checkRebuildRequired(op_index)) { - if (checkNormalExecutionOver(op_index)) { - if (!checkRebuildInitiated(op_index)) { - if (initiateRebuild(op_index)) { - // Rebuild initiated and completed right away. - markOperatorFinished(op_index); - } else { - // Rebuild under progress. - } - } else if (checkRebuildOver(op_index)) { - // Rebuild was under progress and now it is over. - markOperatorFinished(op_index); - } - } else { - // Normal execution under progress for this operator. + DCHECK(!checkRebuildInitiated(op_index)); + if (!initiateRebuild(op_index)) { + // Rebuild under progress. + return; } - } else if (checkOperatorExecutionOver(op_index)) { - // Rebuild not required for this operator and its normal execution is - // complete. - markOperatorFinished(op_index); + // Rebuild initiated and completed right away. } - for (const pair &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - // Process the dependent operator (of the operator whose WorkOrder - // was just executed) for which all the dependencies have been met. - processOperator(dependent_op_index, true); - } - } + markOperatorFinished(op_index); } void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index, const partition_id part_id) { query_exec_state_->decrementNumRebuildWorkOrders(op_index); - if (checkRebuildOver(op_index)) { - markOperatorFinished(op_index); - - for (const pair &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } - } -} - -void QueryManagerBase::processOperator(const dag_node_index index, - const bool recursively_check_dependents) { - if (fetchNormalWorkOrders(index)) { - // Fetched work orders. Return to wait for the generated work orders to - // execute, and skip the execution-finished checks. + if (!checkRebuildOver(op_index)) { return; } - if (checkNormalExecutionOver(index)) { - if (checkRebuildRequired(index)) { - if (!checkRebuildInitiated(index)) { - // Rebuild hasn't started, initiate it. - if (initiateRebuild(index)) { - // Rebuild initiated and completed right away. - markOperatorFinished(index); - } else { - // Rebuild WorkOrders have been generated. - return; - } - } else if (checkRebuildOver(index)) { - // Rebuild had been initiated and it is over. - markOperatorFinished(index); - } - } else { - // Rebuild is not required and normal execution over, mark finished. - markOperatorFinished(index); - } - // If we reach here, that means the operator has been marked as finished. - if (recursively_check_dependents) { - for (const pair &dependent_link : - query_dag_->getDependents(index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } - } - } + markOperatorFinished(op_index); } void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index, @@ -214,23 +163,44 @@ void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index, query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id); // Because of the streamed input just fed, check if there are any new // WorkOrders available and if so, fetch them. - fetchNormalWorkOrders(consumer_index); + if (checkAllBlockingDependenciesMet(consumer_index)) { + fetchNormalWorkOrders(consumer_index); + } } } void QueryManagerBase::markOperatorFinished(const dag_node_index index) { query_exec_state_->setExecutionFinished(index); + for (const dag_node_index dependent_op_index : blocking_dependents_[index]) { + blocking_dependencies_[dependent_op_index].erase(index); + } + + for (const dag_node_index dependent_op_index : output_consumers_[index]) { + non_blocking_dependencies_[dependent_op_index].erase(index); + } + RelationalOperator *op = query_dag_->getNodePayloadMutable(index); op->updateCatalogOnCompletion(); const relation_id output_rel = op->getOutputRelationID(); + for (const pair &dependent_link : query_dag_->getDependents(index)) { const dag_node_index dependent_op_index = dependent_link.first; - RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index); - // Signal dependent operator that current operator is done feeding input blocks. if (output_rel >= 0) { - dependent_op->doneFeedingInputBlocks(output_rel); + // Signal dependent operator that current operator is done feeding input blocks. + query_dag_->getNodePayloadMutable(dependent_op_index)->doneFeedingInputBlocks(output_rel); + } + + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + // Process the dependent operator (of the operator whose WorkOrder + // was just executed) for which all the dependencies have been met. + if (!fetchNormalWorkOrders(dependent_op_index) && + non_blocking_dependencies_[dependent_op_index].empty() && + checkNormalExecutionOver(dependent_op_index) && + (!checkRebuildRequired(dependent_op_index) || initiateRebuild(dependent_op_index))) { + markOperatorFinished(dependent_op_index); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index 78d67cc..366ab61 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -22,6 +22,7 @@ #include #include +#include #include #include "catalog/CatalogTypedefs.hpp" @@ -165,56 +166,20 @@ class QueryManagerBase { protected: /** - * @brief Process a current relational operator: Get its workorders and store - * them in the WorkOrdersContainer for this query. If the operator can - * be marked as done, do so. - * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * @param recursively_check_dependents If an operator is done, should we - * call processOperator on its dependents recursively. - **/ - void processOperator(const dag_node_index index, - const bool recursively_check_dependents); - - /** * @brief This function does the following things: * 1. Mark the given relational operator as "done". - * 2. For all the dependents of this operator, check if all of their - * blocking dependencies are met. If so inform them that the blocking - * dependencies are met. - * 3. Check if the given operator is done producing output. If it's - * done, inform the dependents that they won't receive input anymore - * from the given operator. + * 2. For all the dependents of this operator, check if the given + * operator is done producing output. If it's done, inform the + * dependents that they won't receive input anymore from the given + * operator. + * 3. Check if all of their blocking dependencies are met. If so + * fetch normal work orders. * * @param index The index of the given relational operator in the DAG. **/ void markOperatorFinished(const dag_node_index index); /** - * @brief Check if all the dependencies of the node at specified index have - * finished their execution. - * - * @note This function's true return value is a pre-requisite for calling - * getRebuildWorkOrders() - * - * @param node_index The index of the specified node in the query DAG. - * - * @return True if all the dependencies have finished their execution. False - * otherwise. - **/ - inline bool checkAllDependenciesMet(const dag_node_index node_index) const { - for (const dag_node_index dependency_index : - query_dag_->getDependencies(node_index)) { - // If at least one of the dependencies is not met, return false. - if (!query_exec_state_->hasExecutionFinished(dependency_index)) { - return false; - } - } - return true; - } - - /** * @brief Check if all the blocking dependencies of the node at specified * index have finished their execution. * @@ -229,27 +194,7 @@ class QueryManagerBase { **/ inline bool checkAllBlockingDependenciesMet( const dag_node_index node_index) const { - for (const dag_node_index blocking_dependency_index : - blocking_dependencies_[node_index]) { - if (!query_exec_state_->hasExecutionFinished( - blocking_dependency_index)) { - return false; - } - } - return true; - } - - /** - * @brief Check if the execution of the given operator is over. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the execution of the given operator is over, false - * otherwise. - **/ - inline bool checkOperatorExecutionOver(const dag_node_index index) const { - return this->checkNormalExecutionOver(index) && - (!checkRebuildRequired(index) || this->checkRebuildOver(index)); + return blocking_dependencies_[node_index].empty(); } /** @@ -295,7 +240,9 @@ class QueryManagerBase { std::vector> output_consumers_; // For all nodes, store their pipeline breaking dependencies (if any). - std::vector> blocking_dependencies_; + std::vector> blocking_dependencies_; + + std::vector non_dependent_operators_; std::unique_ptr query_exec_state_; @@ -338,6 +285,10 @@ class QueryManagerBase { **/ virtual bool checkRebuildOver(const dag_node_index index) const = 0; + // For all nodes, store their pipeline breaking dependents (if any). + std::vector> blocking_dependents_; + std::vector> non_blocking_dependencies_; + DISALLOW_COPY_AND_ASSIGN(QueryManagerBase); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index 1144e9f..30a1396 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -67,10 +67,11 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle, bus_(bus), normal_workorder_protos_container_( new WorkOrderProtosContainer(num_operators_in_dag_)) { - // Collect all the workorders from all the relational operators in the DAG. - for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) { - if (checkAllBlockingDependenciesMet(index)) { - processOperator(index, false); + // Collect all the workorders from all the non-blocking relational operators in the DAG. + for (const dag_node_index index : non_dependent_operators_) { + if (!fetchNormalWorkOrders(index)) { + DCHECK(!checkRebuildRequired(index) || initiateRebuild(index)); + markOperatorFinished(index); } } @@ -177,35 +178,22 @@ serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessag } bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) { - bool generated_new_workorder_protos = false; - if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { - // Do not fetch any work units until all blocking dependencies are met. - // The releational operator is not aware of blocking dependencies for - // uncorrelated scalar queries. - if (!checkAllBlockingDependenciesMet(index)) { - return false; - } - const size_t num_pending_workorder_protos_before = - normal_workorder_protos_container_->getNumWorkOrderProtos(index); - const bool done_generation = - query_dag_->getNodePayloadMutable(index) - ->getAllWorkOrderProtos(normal_workorder_protos_container_.get()); - if (done_generation) { - query_exec_state_->setDoneGenerationWorkOrders(index); - } - - // TODO(shoban): It would be a good check to see if operator is making - // useful progress, i.e., the operator either generates work orders to - // execute or still has pending work orders executing. However, this will not - // work if Foreman polls operators without feeding data. This check can be - // enabled, if Foreman is refactored to call getAllWorkOrders() only when - // pending work orders are completed or new input blocks feed. - - generated_new_workorder_protos = - (num_pending_workorder_protos_before < - normal_workorder_protos_container_->getNumWorkOrderProtos(index)); + // Do not fetch any work units until all blocking dependencies are met. + // The releational operator is not aware of blocking dependencies for + // uncorrelated scalar queries. + DCHECK(checkAllBlockingDependenciesMet(index)); + DCHECK(!query_exec_state_->hasDoneGenerationWorkOrders(index)); + + const size_t num_pending_workorder_protos_before = + normal_workorder_protos_container_->getNumWorkOrderProtos(index); + const bool done_generation = + query_dag_->getNodePayloadMutable(index) + ->getAllWorkOrderProtos(normal_workorder_protos_container_.get()); + if (done_generation) { + query_exec_state_->setDoneGenerationWorkOrders(index); } - return generated_new_workorder_protos; + + return (num_pending_workorder_protos_before < normal_workorder_protos_container_->getNumWorkOrderProtos(index)); } void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index, @@ -225,7 +213,7 @@ void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_no query_dag_->getDependents(op_index)) { const dag_node_index dependent_op_index = dependent_link.first; if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); + fetchNormalWorkOrders(dependent_op_index); } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index a021fdd..8d870c6 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -250,8 +250,7 @@ class QueryManagerDistributed final : public QueryManagerBase { private: bool checkNormalExecutionOver(const dag_node_index index) const override { - return (checkAllDependenciesMet(index) && - !normal_workorder_protos_container_->hasWorkOrderProto(index) && + return (!normal_workorder_protos_container_->hasWorkOrderProto(index) && query_exec_state_->getNumQueuedWorkOrders(index) == 0 && query_exec_state_->hasDoneGenerationWorkOrders(index)); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp index 82a0de6..2c9f673 100644 --- a/query_execution/QueryManagerSingleNode.cpp +++ b/query_execution/QueryManagerSingleNode.cpp @@ -61,10 +61,11 @@ QueryManagerSingleNode::QueryManagerSingleNode( workorders_container_( new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)), database_(static_cast(*catalog_database)) { - // Collect all the workorders from all the relational operators in the DAG. - for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) { - if (checkAllBlockingDependenciesMet(index)) { - processOperator(index, false); + // Collect all the workorders from all the non-blocking relational operators in the DAG. + for (const dag_node_index index : non_dependent_operators_) { + if (!fetchNormalWorkOrders(index)) { + DCHECK(!checkRebuildRequired(index) || initiateRebuild(index)); + markOperatorFinished(index); } } } @@ -87,38 +88,25 @@ WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage( } bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) { - bool generated_new_workorders = false; - if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { - // Do not fetch any work units until all blocking dependencies are met. - // The releational operator is not aware of blocking dependencies for - // uncorrelated scalar queries. - if (!checkAllBlockingDependenciesMet(index)) { - return false; - } - const size_t num_pending_workorders_before = - workorders_container_->getNumNormalWorkOrders(index); - const bool done_generation = - query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(), - query_context_.get(), - storage_manager_, - foreman_client_id_, - bus_); - if (done_generation) { - query_exec_state_->setDoneGenerationWorkOrders(index); - } - - // TODO(shoban): It would be a good check to see if operator is making - // useful progress, i.e., the operator either generates work orders to - // execute or still has pending work orders executing. However, this will not - // work if Foreman polls operators without feeding data. This check can be - // enabled, if Foreman is refactored to call getAllWorkOrders() only when - // pending work orders are completed or new input blocks feed. - - generated_new_workorders = - (num_pending_workorders_before < - workorders_container_->getNumNormalWorkOrders(index)); + // Do not fetch any work units until all blocking dependencies are met. + // The releational operator is not aware of blocking dependencies for + // uncorrelated scalar queries. + DCHECK(checkAllBlockingDependenciesMet(index)); + DCHECK(!query_exec_state_->hasDoneGenerationWorkOrders(index)); + + const size_t num_pending_workorders_before = + workorders_container_->getNumNormalWorkOrders(index); + const bool done_generation = + query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(), + query_context_.get(), + storage_manager_, + foreman_client_id_, + bus_); + if (done_generation) { + query_exec_state_->setDoneGenerationWorkOrders(index); } - return generated_new_workorders; + + return (num_pending_workorders_before < workorders_container_->getNumNormalWorkOrders(index)); } bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/QueryManagerSingleNode.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp index f9d038b..a726bbc 100644 --- a/query_execution/QueryManagerSingleNode.hpp +++ b/query_execution/QueryManagerSingleNode.hpp @@ -99,8 +99,7 @@ class QueryManagerSingleNode final : public QueryManagerBase { private: bool checkNormalExecutionOver(const dag_node_index index) const override { - return (checkAllDependenciesMet(index) && - !workorders_container_->hasNormalWorkOrder(index) && + return (!workorders_container_->hasNormalWorkOrder(index) && query_exec_state_->getNumQueuedWorkOrders(index) == 0 && query_exec_state_->hasDoneGenerationWorkOrders(index)); } @@ -108,8 +107,8 @@ class QueryManagerSingleNode final : public QueryManagerBase { bool initiateRebuild(const dag_node_index index) override; bool checkRebuildOver(const dag_node_index index) const override { - return query_exec_state_->hasRebuildInitiated(index) && - !workorders_container_->hasRebuildWorkOrder(index) && + DCHECK(query_exec_state_->hasRebuildInitiated(index)); + return !workorders_container_->hasRebuildWorkOrder(index) && (query_exec_state_->getNumRebuildWorkOrders(index) == 0); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/WorkOrdersContainer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp index e8d5ff8..3c2d9bf 100644 --- a/query_execution/WorkOrdersContainer.hpp +++ b/query_execution/WorkOrdersContainer.hpp @@ -542,6 +542,7 @@ class WorkOrdersContainer { DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer); }; + /** @} */ } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8d7284de/query_execution/tests/QueryManagerSingleNode_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp index 19b42ac..dd3f472 100644 --- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp +++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp @@ -353,14 +353,14 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) { // This test creates a DAG of a single node. WorkOrders are generated // dynamically as pending work orders complete execution, i.e., // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be - // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to - // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will - // insert no WorkOrder and return true. + // called 3 times and 3 work orders will be returned, i.e., 2 calls to + // getAllWorkOrders() insert 2 WorkOrder and return false, and the last will + // insert 1 WorkOrder and return true. // TODO(shoban): This test can not be more robust than this because of fixed // scaffolding of mocking. If we use gMock, we can do much better. const QueryPlan::DAGNodeIndex id = - query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3)); + query_plan_->addRelationalOperator(new MockOperator(true, false, 3, 3)); const MockOperator &op = static_cast( query_plan_->getQueryPlanDAG().getNodePayload(id)); @@ -378,7 +378,7 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) { unique_ptr worker_message; worker_message.reset(query_manager_->getNextWorkerMessage(id, -1)); - EXPECT_TRUE(worker_message != nullptr); + ASSERT_TRUE(worker_message != nullptr); EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, worker_message->getType()); EXPECT_EQ(id, worker_message->getRelationalOpIndex()); @@ -391,6 +391,7 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) { if (i < 2) { // Send a message to QueryManager upon workorder completion. EXPECT_FALSE(placeWorkOrderCompleteMessage(id)); + query_manager_->fetchNormalWorkOrders(id); } else { // Send a message to QueryManager upon workorder completion. // Last event. @@ -511,7 +512,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); const QueryPlan::DAGNodeIndex id2 = - query_plan_->addRelationalOperator(new MockOperator(true, true, 3)); + query_plan_->addRelationalOperator(new MockOperator(true, true, 2)); // Create a non-blocking link. query_plan_->addDirectDependency(id2, id1, false); @@ -531,7 +532,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { EXPECT_EQ(1, op1.getNumWorkOrders()); EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); // op2 will generate workorder only after receiving a streaming input. EXPECT_EQ(0, op2.getNumWorkOrders()); EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); @@ -562,7 +563,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock)); // A call to op2's getAllWorkOrders because of the streamed input. - EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); EXPECT_EQ(1, op2.getNumWorkOrders()); // Place a message of a workorder completion of op1 on Foreman's input queue. @@ -573,7 +574,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); // An additional call to op2's getAllWorkOrders because of completion of op1. - EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); EXPECT_EQ(2, op2.getNumWorkOrders()); worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); @@ -620,7 +621,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) { const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); const QueryPlan::DAGNodeIndex id2 = - query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1)); + query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1)); // Create a non-blocking link. query_plan_->addDirectDependency(id2, id1, false); @@ -670,7 +671,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) { EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); EXPECT_EQ(1, op1.getNumWorkOrders()); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); EXPECT_EQ(0, op2.getNumWorkOrders()); unique_ptr worker_message; @@ -704,7 +705,7 @@ TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) { EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1)); // Based on the streamed input, op2's getAllWorkOrders should produce a // workorder. - EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); EXPECT_EQ(1, op2.getNumWorkOrders()); worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); @@ -734,16 +735,14 @@ TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) { // When an operator produces workorders but no output, the QueryManager should // check the dependents of this operator to make progress. const QueryPlan::DAGNodeIndex kNumNodes = 5; - std::vector ids; - ids.reserve(kNumNodes); for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { if (i == 0) { - ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false)); + query_plan_->addRelationalOperator(new MockOperator(true, false)); } else { - ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true)); + query_plan_->addRelationalOperator(new MockOperator(true, true)); } - VLOG(3) << ids[i]; + VLOG(3) << i; } /** @@ -753,46 +752,47 @@ TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) { * **/ for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) { - query_plan_->addDirectDependency(ids[i + 1], ids[i], false); - static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i])) + query_plan_->addDirectDependency(i + 1, i, false); + static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(i)) ->setOutputRelationID(0xdead); } std::vector operators; for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - operators.push_back(static_cast(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i]))); + operators.push_back(static_cast(&query_plan_->getQueryPlanDAG().getNodePayload(i))); } constructQueryManager(); // operators[0] should have produced a workorder by now. + EXPECT_EQ(1, operators[0]->getNumCalls(MockOperator::kGetAllWorkOrders)); EXPECT_EQ(1, operators[0]->getNumWorkOrders()); unique_ptr worker_message; - worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1)); + worker_message.reset(query_manager_->getNextWorkerMessage(0, -1)); EXPECT_TRUE(worker_message != nullptr); EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, worker_message->getType()); - EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex()); + EXPECT_EQ(0, worker_message->getRelationalOpIndex()); delete worker_message->getWorkOrder(); - EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0])); - EXPECT_FALSE(getOperatorFinishedStatus(ids[0])); + EXPECT_EQ(1, getNumWorkOrdersInExecution(0)); + EXPECT_FALSE(getOperatorFinishedStatus(0)); - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders)); + for (QueryPlan::DAGNodeIndex i = 1; i < kNumNodes; ++i) { + EXPECT_EQ(0, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders)); } // Send a message to QueryManager upon workorder (generated by operators[0]) // completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0])); + EXPECT_TRUE(placeWorkOrderCompleteMessage(0)); for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i])); - EXPECT_TRUE(getOperatorFinishedStatus(ids[i])); + EXPECT_EQ(0, getNumWorkOrdersInExecution(i)); + EXPECT_TRUE(getOperatorFinishedStatus(i)); if (i < kNumNodes - 1) { EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks)); }