Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0ED09200B4E for ; Sun, 10 Jul 2016 06:38:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0D15C160A74; Sun, 10 Jul 2016 04:38:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8CCB4160A67 for ; Sun, 10 Jul 2016 06:38:48 +0200 (CEST) Received: (qmail 39340 invoked by uid 500); 10 Jul 2016 04:38:47 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 39329 invoked by uid 99); 10 Jul 2016 04:38:47 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 10 Jul 2016 04:38:47 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 375491800B5 for ; Sun, 10 Jul 2016 04:38:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id kdml-FTWAXJy for ; Sun, 10 Jul 2016 04:38:43 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org 
[140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 001415FE1C for ; Sun, 10 Jul 2016 04:38:41 +0000 (UTC) Received: (qmail 38535 invoked by uid 99); 10 Jul 2016 04:38:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 10 Jul 2016 04:38:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1364EE0B49; Sun, 10 Jul 2016 04:38:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Date: Sun, 10 Jul 2016 04:38:42 -0000 Message-Id: In-Reply-To: <5446ad831f0f4acc8b7748c15ebb0de4@git.apache.org> References: <5446ad831f0f4acc8b7748c15ebb0de4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-quickstep git commit: Refactored messages processing in both PolicyEnforcer and QueryManager. archived-at: Sun, 10 Jul 2016 04:38:50 -0000 Refactored messages processing in both PolicyEnforcer and QueryManager. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bbbc4cd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bbbc4cd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bbbc4cd1 Branch: refs/heads/refactor-process-msg Commit: bbbc4cd177e0c33ec02f2d39b0f56b501bbf0635 Parents: 09e9173 Author: Zuyu Zhang Authored: Sat Jul 9 14:54:05 2016 -0700 Committer: Zuyu Zhang Committed: Sat Jul 9 21:38:07 2016 -0700 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 10 +-- query_execution/PolicyEnforcer.cpp | 61 ++++++++++--- query_execution/PolicyEnforcer.hpp | 6 +- query_execution/QueryManagerBase.cpp | 91 ++------------------ query_execution/QueryManagerBase.hpp | 90 ++++++++++--------- query_execution/QueryManagerSingleNode.cpp | 4 +- query_execution/QueryManagerSingleNode.hpp | 4 +- query_execution/tests/QueryManager_unittest.cpp | 79 +++-------------- 8 files changed, 122 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index c7ab9de..01549a5 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -91,8 +91,12 @@ target_link_libraries(quickstep_queryexecution_ForemanSingleNode ${GFLAGS_LIB_NAME}) target_link_libraries(quickstep_queryexecution_PolicyEnforcer glog + quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogRelation quickstep_catalog_CatalogTypedefs + quickstep_catalog_PartitionScheme quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionState quickstep_queryexecution_QueryExecutionTypedefs 
quickstep_queryexecution_QueryManagerBase quickstep_queryexecution_QueryManagerSingleNode @@ -100,6 +104,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer quickstep_queryexecution_WorkerMessage quickstep_queryoptimizer_QueryHandle quickstep_relationaloperators_WorkOrder + quickstep_storage_StorageBlockInfo quickstep_utility_Macros tmb ${GFLAGS_LIB_NAME}) @@ -152,14 +157,9 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionUtil quickstep_utility_Macros tmb) target_link_libraries(quickstep_queryexecution_QueryManagerBase - quickstep_catalog_CatalogDatabase - quickstep_catalog_CatalogRelation quickstep_catalog_CatalogTypedefs - quickstep_catalog_PartitionScheme quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryoptimizer_QueryHandle quickstep_queryoptimizer_QueryPlan quickstep_relationaloperators_RelationalOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/PolicyEnforcer.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp index f310ee1..4cba8c5 100644 --- a/query_execution/PolicyEnforcer.cpp +++ b/query_execution/PolicyEnforcer.cpp @@ -24,12 +24,19 @@ #include #include +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "catalog/PartitionScheme.hpp" #include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryManagerBase.hpp" #include "query_execution/QueryManagerSingleNode.hpp" #include "query_execution/WorkerDirectory.hpp" +#include "query_execution/WorkerMessage.hpp" #include "query_optimizer/QueryHandle.hpp" #include "relational_operators/WorkOrder.hpp" +#include 
"storage/StorageBlockInfo.hpp" #include "gflags/gflags.h" #include "glog/logging.h" @@ -62,10 +69,9 @@ bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) { } void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { - // TODO(harshad) : Provide processXMessage() public functions in - // QueryManager, so that we need to extract message from the - // TaggedMessage only once. std::size_t query_id; + QueryManagerBase::dag_node_index op_index; + switch (tagged_message.message_type()) { case kWorkOrderCompleteMessage: { serialization::NormalWorkOrderCompletionMessage proto; @@ -73,12 +79,17 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { // WorkOrder. It can be accessed in this scope. CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - query_id = proto.query_id(); worker_directory_->decrementNumQueuedWorkOrders( proto.worker_thread_index()); if (profile_individual_workorders_) { recordTimeForWorkOrder(proto); } + + query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + op_index = proto.operator_index(); + admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index); break; } case kRebuildWorkOrderCompleteMessage: { @@ -87,23 +98,43 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { // rebuild WorkOrder. It can be accessed in this scope. 
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - query_id = proto.query_id(); worker_directory_->decrementNumQueuedWorkOrders( proto.worker_thread_index()); + + query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + op_index = proto.operator_index(); + admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index); break; } case kCatalogRelationNewBlockMessage: { serialization::CatalogRelationNewBlockMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - query_id = proto.query_id(); - break; + + const block_id block = proto.block_id(); + + CatalogRelation *relation = + static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id()); + relation->addBlock(block); + + if (proto.has_partition_id()) { + relation->getPartitionSchemeMutable()->addBlockToPartition( + proto.partition_id(), block); + } + return; } case kDataPipelineMessage: { serialization::DataPipelineMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + op_index = proto.operator_index(); + admitted_queries_[query_id]->processDataPipelineMessage( + op_index, proto.block_id(), proto.relation_id()); break; } case kWorkOrdersAvailableMessage: { @@ -111,6 +142,12 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + op_index = proto.operator_index(); + + // Check if new work orders are available. 
+ admitted_queries_[query_id]->fetchNormalWorkOrders(op_index); break; } case kWorkOrderFeedbackMessage: { @@ -118,15 +155,17 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { const_cast(tagged_message.message()), tagged_message.message_bytes()); query_id = msg.header().query_id; + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + op_index = msg.header().rel_op_index; + admitted_queries_[query_id]->processFeedbackMessage(op_index, msg); break; } default: LOG(FATAL) << "Unknown message type found in PolicyEnforcer"; } - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - const QueryManagerBase::QueryStatusCode return_code = - admitted_queries_[query_id]->processMessage(tagged_message); - if (return_code == QueryManagerBase::QueryStatusCode::kQueryExecuted) { + if (admitted_queries_[query_id]->queryStatus(op_index) == + QueryManagerBase::QueryStatusCode::kQueryExecuted) { removeQuery(query_id); if (!waiting_queries_.empty()) { // Admit the earliest waiting query. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/PolicyEnforcer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp index 79e61d1..4dd7f01 100644 --- a/query_execution/PolicyEnforcer.hpp +++ b/query_execution/PolicyEnforcer.hpp @@ -33,8 +33,8 @@ #include "glog/logging.h" #include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" + +namespace tmb { class MessageBus; } namespace quickstep { @@ -43,6 +43,8 @@ class QueryHandle; class StorageManager; class WorkerDirectory; +namespace serialization { class NormalWorkOrderCompletionMessage; } + /** * @brief A class that ensures that a high level policy is maintained * in sharing resources among concurrent queries. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp index f7e183f..37beb02 100644 --- a/query_execution/QueryManagerBase.cpp +++ b/query_execution/QueryManagerBase.cpp @@ -21,13 +21,8 @@ #include #include -#include "catalog/CatalogDatabase.hpp" -#include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" -#include "catalog/PartitionScheme.hpp" #include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionTypedefs.hpp" #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "relational_operators/WorkOrder.hpp" @@ -39,10 +34,8 @@ using std::pair; namespace quickstep { -QueryManagerBase::QueryManagerBase(QueryHandle *query_handle, - CatalogDatabaseLite *catalog_database) +QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) : query_id_(DCHECK_NOTNULL(query_handle)->query_id()), - catalog_database_(DCHECK_NOTNULL(catalog_database)), query_dag_(DCHECK_NOTNULL( DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())), num_operators_in_dag_(query_dag_->size()), @@ -76,82 +69,8 @@ QueryManagerBase::QueryManagerBase(QueryHandle *query_handle, } } -QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage( - const TaggedMessage &tagged_message) { - dag_node_index op_index; - switch (tagged_message.message_type()) { - case kWorkOrderCompleteMessage: { - serialization::NormalWorkOrderCompletionMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - - op_index = proto.operator_index(); - processWorkOrderCompleteMessage(proto.operator_index()); - break; - } - case kRebuildWorkOrderCompleteMessage: { - 
- serialization::RebuildWorkOrderCompletionMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - - op_index = proto.operator_index(); - processRebuildWorkOrderCompleteMessage(proto.operator_index()); - break; - } - case kCatalogRelationNewBlockMessage: { - serialization::CatalogRelationNewBlockMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - - const block_id block = proto.block_id(); - - CatalogRelation *relation = - static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id()); - relation->addBlock(block); - - if (proto.has_partition_id()) { - relation->getPartitionSchemeMutable()->addBlockToPartition( - proto.partition_id(), block); - } - return QueryStatusCode::kNone; - } - case kDataPipelineMessage: { - // Possible message senders include InsertDestinations and some - // operators which modify existing blocks. - serialization::DataPipelineMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - - op_index = proto.operator_index(); - processDataPipelineMessage(proto.operator_index(), - proto.block_id(), - proto.relation_id()); - break; - } - case kWorkOrdersAvailableMessage: { - serialization::WorkOrdersAvailableMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - - op_index = proto.operator_index(); - - // Check if new work orders are available. 
- fetchNormalWorkOrders(op_index); - break; - } - case kWorkOrderFeedbackMessage: { - WorkOrder::FeedbackMessage msg( - const_cast(tagged_message.message()), - tagged_message.message_bytes()); - - op_index = msg.header().rel_op_index; - processFeedbackMessage(msg); - break; - } - default: - LOG(FATAL) << "Unknown message type found in QueryManager"; - } - +QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus( + const dag_node_index op_index) { if (query_exec_state_->hasExecutionFinished(op_index)) { return QueryStatusCode::kOperatorExecuted; } @@ -165,9 +84,9 @@ QueryManagerBase::QueryStatusCode QueryManagerBase::processMessage( } void QueryManagerBase::processFeedbackMessage( - const WorkOrder::FeedbackMessage &msg) { + const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) { RelationalOperator *op = - query_dag_->getNodePayloadMutable(msg.header().rel_op_index); + query_dag_->getNodePayloadMutable(op_index); op->receiveFeedbackMessage(msg); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index e0298eb..f35928a 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -24,7 +24,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" #include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" @@ -33,7 +32,6 @@ namespace quickstep { -class CatalogDatabaseLite; class QueryHandle; /** \addtogroup QueryExecution @@ -50,7 +48,7 @@ class QueryManagerBase { typedef DAG::size_type_nodes dag_node_index; /** - * @brief Return codes for processMessage() function. + * @brief Return codes for queryStatus() function. 
* * @note When both operator and query get executed, kQueryExecuted takes * precedence over kOperatorExecuted. @@ -65,10 +63,8 @@ class QueryManagerBase { * @brief Constructor. * * @param query_handle The QueryHandle object for this query. - * @param catalog_database The CatalogDatabse used by the query. **/ - QueryManagerBase(QueryHandle *query_handle, - CatalogDatabaseLite *catalog_database); + explicit QueryManagerBase(QueryHandle *query_handle); /** * @brief Virtual destructor. @@ -76,26 +72,16 @@ class QueryManagerBase { virtual ~QueryManagerBase() {} /** - * @brief Process a message sent to the QueryManager. - * - * @param tagged_message TaggedMessage sent to the QueryManager. - * - * @return QueryStatusCode as determined after the message is processed. - **/ - QueryStatusCode processMessage(const TaggedMessage &tagged_message); - - /** * @brief Get the QueryExecutionState for this query. **/ inline const QueryExecutionState& getQueryExecutionState() const { return *query_exec_state_; } - protected: /** * @brief Process the received WorkOrder complete message. * - * @param node_index The index of the specified operator node in the query DAG + * @param op_index The index of the specified operator node in the query DAG * for the completed WorkOrder. **/ void processWorkOrderCompleteMessage(const dag_node_index op_index); @@ -103,28 +89,15 @@ class QueryManagerBase { /** * @brief Process the received RebuildWorkOrder complete message. * - * @param node_index The index of the specified operator node in the query DAG + * @param op_index The index of the specified operator node in the query DAG * for the completed RebuildWorkOrder. **/ void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index); /** - * @brief Process a current relational operator: Get its workorders and store - * them in the WorkOrdersContainer for this query. If the operator can - * be marked as done, do so. 
- * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * @param recursively_check_dependents If an operator is done, should we - * call processOperator on its dependents recursively. - **/ - void processOperator(const dag_node_index index, - const bool recursively_check_dependents); - - /** * @brief Process the received data pipeline message. * - * @param node_index The index of the specified operator node in the query DAG + * @param op_index The index of the specified operator node in the query DAG * for the pipelining block. * @param block The block id. * @param rel_id The ID of the relation that produced 'block'. @@ -134,12 +107,50 @@ class QueryManagerBase { const relation_id rel_id); /** + * @brief Fetch all work orders currently available in relational operator and + * store them internally. + * + * @param index The index of the relational operator to be processed in the + * query plan DAG. + * + * @return Whether any work order was generated by op. + **/ + virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0; + + /** * @brief Process the received work order feedback message and notify * relational operator. * + * @param op_index The index of the specified operator node in the query DAG + * for the feedback message. * @param message Feedback message from work order. **/ - void processFeedbackMessage(const WorkOrder::FeedbackMessage &message); + void processFeedbackMessage(const dag_node_index op_index, + const WorkOrder::FeedbackMessage &message); + + /** + * @brief Get the query status after processing an incoming message. + * + * @param op_index The index of the specified operator node in the query DAG + * for the incoming message. + * + * @return QueryStatusCode as determined after the message is processed. 
+ **/ + QueryStatusCode queryStatus(const dag_node_index op_index); + + protected: + /** + * @brief Process a current relational operator: Get its workorders and store + * them in the WorkOrdersContainer for this query. If the operator can + * be marked as done, do so. + * + * @param index The index of the relational operator to be processed in the + * query plan DAG. + * @param recursively_check_dependents If an operator is done, should we + * call processOperator on its dependents recursively. + **/ + void processOperator(const dag_node_index index, + const bool recursively_check_dependents); /** * @brief This function does the following things: @@ -241,8 +252,6 @@ class QueryManagerBase { const std::size_t query_id_; - CatalogDatabaseLite *catalog_database_; - DAG *query_dag_; const dag_node_index num_operators_in_dag_; @@ -256,17 +265,6 @@ class QueryManagerBase { private: /** - * @brief Fetch all work orders currently available in relational operator and - * store them internally. - * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * - * @return Whether any work order was generated by op. - **/ - virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0; - - /** * @brief Check if the given operator's normal execution is over. 
* * @note The conditions for a given operator's normal execution to get over: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/QueryManagerSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp index 193b188..12f8ff5 100644 --- a/query_execution/QueryManagerSingleNode.cpp +++ b/query_execution/QueryManagerSingleNode.cpp @@ -46,12 +46,12 @@ QueryManagerSingleNode::QueryManagerSingleNode( CatalogDatabaseLite *catalog_database, StorageManager *storage_manager, tmb::MessageBus *bus) - : QueryManagerBase(query_handle, catalog_database), + : QueryManagerBase(query_handle), foreman_client_id_(foreman_client_id), storage_manager_(DCHECK_NOTNULL(storage_manager)), bus_(DCHECK_NOTNULL(bus)), query_context_(new QueryContext(query_handle->getQueryContextProto(), - *catalog_database_, + *catalog_database, storage_manager_, foreman_client_id_, bus_)), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/QueryManagerSingleNode.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp index 0750f13..e7130fb 100644 --- a/query_execution/QueryManagerSingleNode.hpp +++ b/query_execution/QueryManagerSingleNode.hpp @@ -68,6 +68,8 @@ class QueryManagerSingleNode final : public QueryManagerBase { ~QueryManagerSingleNode() override {} + bool fetchNormalWorkOrders(const dag_node_index index) override; + /** * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage. 
* @@ -91,8 +93,6 @@ class QueryManagerSingleNode final : public QueryManagerBase { } private: - bool fetchNormalWorkOrders(const dag_node_index index) override; - bool checkNormalExecutionOver(const dag_node_index index) const override { return (checkAllDependenciesMet(index) && !workorders_container_->hasNormalWorkOrder(index) && http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bbbc4cd1/query_execution/tests/QueryManager_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp index 52cee20..39ca58c 100644 --- a/query_execution/tests/QueryManager_unittest.cpp +++ b/query_execution/tests/QueryManager_unittest.cpp @@ -16,7 +16,6 @@ **/ #include -#include #include #include #include @@ -26,7 +25,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryContext.pb.h" -#include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionState.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryManagerSingleNode.hpp" @@ -49,7 +47,6 @@ #include "gtest/gtest.h" #include "tmb/id_typedefs.h" -#include "tmb/tagged_message.h" namespace tmb { class MessageBus; } @@ -254,89 +251,33 @@ class QueryManagerTest : public ::testing::Test { inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) { VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]"; - serialization::DataPipelineMessage proto; - proto.set_operator_index(source_operator_index); - - proto.set_block_id(0); // dummy block ID - proto.set_relation_id(0); // dummy relation ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. 
- const std::size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - tmb::TaggedMessage tagged_message(static_cast<const void*>(proto_bytes), - proto_length, - kDataPipelineMessage); - std::free(proto_bytes); - query_manager_->processMessage(tagged_message); + + query_manager_->processDataPipelineMessage(source_operator_index, + 0 /* dummy block ID */, + 0 /* dummy relation ID */); return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); } inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]"; - TaggedMessage tagged_message; - serialization::NormalWorkOrderCompletionMessage proto; - proto.set_operator_index(index); - proto.set_worker_thread_index(1); // dummy worker ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kWorkOrderCompleteMessage); - std::free(proto_bytes); - query_manager_->processMessage(message); + query_manager_->processWorkOrderCompleteMessage(index); return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); } inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]"; - serialization::RebuildWorkOrderCompletionMessage proto; - proto.set_operator_index(index); - proto.set_worker_thread_index(1); // dummy worker thread ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. 
- const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kRebuildWorkOrderCompleteMessage); - - std::free(proto_bytes); - query_manager_->processMessage(message); + query_manager_->processRebuildWorkOrderCompleteMessage(index); return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); } inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) { VLOG(3) << "Place OutputBlock message for Op[" << index << "]"; - serialization::DataPipelineMessage proto; - proto.set_operator_index(index); - - proto.set_block_id(0); // dummy block ID - proto.set_relation_id(0); // dummy relation ID. - proto.set_query_id(0); // dummy query ID. - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const std::size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - tmb::TaggedMessage tagged_message(static_cast<const void*>(proto_bytes), - proto_length, - kDataPipelineMessage); - std::free(proto_bytes); - query_manager_->processMessage(tagged_message); + + query_manager_->processDataPipelineMessage(index, + 0 /* dummy block ID */, + 0 /* dummy relation ID */); return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); }