Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 40A56200B47 for ; Sun, 12 Jun 2016 02:48:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3DD28160A54; Sun, 12 Jun 2016 00:48:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1EE2F160A34 for ; Sun, 12 Jun 2016 02:48:14 +0200 (CEST) Received: (qmail 49993 invoked by uid 500); 12 Jun 2016 00:48:14 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 49983 invoked by uid 99); 12 Jun 2016 00:48:14 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 12 Jun 2016 00:48:14 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id A3E33C1714 for ; Sun, 12 Jun 2016 00:48:13 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 1aHrz-0w43AV for ; Sun, 12 Jun 2016 00:48:08 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 33B6C5F5A4 for ; Sun, 12 Jun 2016 00:48:07 +0000 (UTC) Received: (qmail 49978 invoked by uid 99); 12 Jun 2016 00:48:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 12 Jun 2016 00:48:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 58436E0252; Sun, 12 Jun 2016 00:48:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: hbdeshmukh@apache.org To: commits@quickstep.incubator.apache.org Date: Sun, 12 Jun 2016 00:48:07 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] incubator-quickstep git commit: Long lived Foreman thread archived-at: Sun, 12 Jun 2016 00:48:17 -0000 http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/PolicyEnforcer.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp new file mode 100644 index 0000000..2145429 --- /dev/null +++ b/query_execution/PolicyEnforcer.cpp @@ -0,0 +1,183 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/PolicyEnforcer.hpp" + +#include +#include +#include +#include +#include +#include + +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryManager.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "relational_operators/WorkOrder.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" + +namespace quickstep { + +DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" + " can be allocated in a single round of dispatch of messages to" + " the workers."); + +bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) { + if (admitted_queries_.size() < kMaxConcurrentQueries) { + // Ok to admit the query. + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) == admitted_queries_.end()) { + // Query with the same ID not present, ok to admit. + admitted_queries_[query_id].reset( + new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle, + catalog_database_, storage_manager_, bus_)); + return true; + } else { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + } else { + // This query will have to wait. + waiting_queries_.push(query_handle); + return false; + } +} + +void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { + // TODO(harshad) : Provide processXMessage() public functions in + // QueryManager, so that we need to extract message from the + // TaggedMessage only once. + std::size_t query_id; + switch (tagged_message.message_type()) { + case kWorkOrderCompleteMessage: // Fall through. + case kRebuildWorkOrderCompleteMessage: { + serialization::WorkOrderCompletionMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kCatalogRelationNewBlockMessage: { + serialization::CatalogRelationNewBlockMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kDataPipelineMessage: { + serialization::DataPipelineMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kWorkOrdersAvailableMessage: { + serialization::WorkOrdersAvailableMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kWorkOrderFeedbackMessage: { + WorkOrder::FeedbackMessage msg(const_cast(tagged_message.message()), tagged_message.message_bytes()); + query_id = msg.header().query_id; + break; + } + default: + LOG(FATAL) << "Unknown message type found in PolicyEnforcer"; + } + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + const QueryManager::QueryStatusCode return_code = + admitted_queries_[query_id]->processMessage(tagged_message); + if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) { + removeQuery(query_id); + if (!waiting_queries_.empty()) { + // Admit the earliest waiting query. + QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } + } +} + +void PolicyEnforcer::getWorkerMessages( + std::vector> *worker_messages) { + // Iterate over admitted queries until either there are no more + // messages available, or the maximum number of messages have + // been collected. + DCHECK(worker_messages->empty()); + // TODO(harshad) - Make this function generic enough so that it + // works well when multiple queries are getting executed. + std::size_t per_query_share = 0; + if (!admitted_queries_.empty()) { + per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); + } else { + LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + return; + } + DCHECK_GT(per_query_share, 0u); + std::vector finished_queries_ids; + + for (const auto &admitted_query_info : admitted_queries_) { + QueryManager *curr_query_manager = admitted_query_info.second.get(); + DCHECK(curr_query_manager != nullptr); + std::size_t messages_collected_curr_query = 0; + while (messages_collected_curr_query < per_query_share) { + WorkerMessage *next_worker_message = + curr_query_manager->getNextWorkerMessage(0, kAnyNUMANodeID); + if (next_worker_message != nullptr) { + ++messages_collected_curr_query; + worker_messages->push_back(std::unique_ptr(next_worker_message)); + } else { + // No more work ordes from the current query at this time. + // Check if the query's execution is over. + if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + // If the query has been executed, remove it. + finished_queries_ids.push_back(admitted_query_info.first); + } + break; + } + } + } + for (const std::size_t finished_qid : finished_queries_ids) { + removeQuery(finished_qid); + } +} + +void PolicyEnforcer::removeQuery(const std::size_t query_id) { + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) { + LOG(WARNING) << "Removing query with ID " << query_id + << " that hasn't finished its execution"; + } + admitted_queries_.erase(query_id); +} + +bool PolicyEnforcer::admitQueries( + const std::vector &query_handles) { + for (QueryHandle *curr_query : query_handles) { + if (!admitQuery(curr_query)) { + return false; + } + } + return true; +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/PolicyEnforcer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp new file mode 100644 index 0000000..d4ba643 --- /dev/null +++ b/query_execution/PolicyEnforcer.hpp @@ -0,0 +1,168 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_ + +#include +#include +#include +#include +#include + +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryManager.hpp" +#include "query_execution/WorkerMessage.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +namespace quickstep { + +class CatalogDatabaseLite; +class QueryHandle; +class StorageManager; + +/** + * @brief A class that ensures that a high level policy is maintained + * in sharing resources among concurrent queries. + **/ +class PolicyEnforcer { + public: + /** + * @brief Constructor. + * + * @param foreman_client_id The TMB client ID of the Foreman. + * @param num_numa_nodes Number of NUMA nodes used by the system. + * @param catalog_database The CatalogDatabase used. + * @param storage_manager The StorageManager used. + * @param bus The TMB. + **/ + PolicyEnforcer(const tmb::client_id foreman_client_id, + const std::size_t num_numa_nodes, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + tmb::MessageBus *bus) + : foreman_client_id_(foreman_client_id), + num_numa_nodes_(num_numa_nodes), + catalog_database_(catalog_database), + storage_manager_(storage_manager), + bus_(bus) {} + + /** + * @brief Destructor. + **/ + ~PolicyEnforcer() { + if (hasQueries()) { + LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or " + "waiting queries"; + } + } + + /** + * @brief Admit a query to the system. + * + * @param query_handle The QueryHandle for the new query. + * + * @return Whether the query was admitted to the system. + **/ + bool admitQuery(QueryHandle *query_handle); + + /** + * @brief Admit multiple queries in the system. + * + * @note In the current simple implementation, we only allow one active + * query in the system. Other queries will have to wait. + * + * @param query_handles A vector of QueryHandles for the queries to be + * admitted. + * + * @return True if all the queries were admitted, false if at least one query + * was not admitted. + **/ + bool admitQueries(const std::vector &query_handles); + + /** + * @brief Remove a given query that is under execution. + * + * @note This function is made public so that it is possible for a query + * to be killed. Otherwise, it should only be used privately by the + * class. + * + * TODO(harshad) - Extend this function to support removal of waiting queries. + * + * @param query_id The ID of the query to be removed. + **/ + void removeQuery(const std::size_t query_id); + + /** + * @brief Get worker messages to be dispatched. These worker messages come + * from the active queries. + * + * @param worker_messages The worker messages to be dispatched. + **/ + void getWorkerMessages( + std::vector> *worker_messages); + + /** + * @brief Process a message sent to the Foreman, which gets passed on to the + * policy enforcer. + * + * @param message The message. + **/ + void processMessage(const TaggedMessage &tagged_message); + + /** + * @brief Check if there are any queries to be executed. + * + * @return True if there is at least one active or waiting query, false if + * the policy enforcer doesn't have any query. + **/ + inline bool hasQueries() const { + return !(admitted_queries_.empty() && waiting_queries_.empty()); + } + + private: + static constexpr std::size_t kMaxConcurrentQueries = 1; + static constexpr std::size_t kMaxNumWorkerMessages = 20; + + const tmb::client_id foreman_client_id_; + const std::size_t num_numa_nodes_; + + CatalogDatabaseLite *catalog_database_; + StorageManager *storage_manager_; + + tmb::MessageBus *bus_; + + // Key = query ID, value = QueryManager* for the key query. + std::unordered_map> admitted_queries_; + + // The queries which haven't been admitted yet. + std::queue waiting_queries_; + + DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/QueryContext.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp index 3bfce17..54dd557 100644 --- a/query_execution/QueryContext.cpp +++ b/query_execution/QueryContext.cpp @@ -89,13 +89,13 @@ QueryContext::QueryContext(const serialization::QueryContext &proto, for (int i = 0; i < proto.insert_destinations_size(); ++i) { const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i); - insert_destinations_.emplace_back( - InsertDestination::ReconstructFromProto(insert_destination_proto, - database.getRelationSchemaById( - insert_destination_proto.relation_id()), - storage_manager, - scheduler_client_id, - bus)); + insert_destinations_.emplace_back(InsertDestination::ReconstructFromProto( + proto.query_id(), + insert_destination_proto, + database.getRelationSchemaById(insert_destination_proto.relation_id()), + storage_manager, + scheduler_client_id, + bus)); } for (int i = 0; i < proto.predicates_size(); ++i) { @@ -231,7 +231,7 @@ bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto, } } - return true; + return proto.IsInitialized(); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/QueryContext.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto index b37286c..98cd0b6 100644 --- a/query_execution/QueryContext.proto +++ b/query_execution/QueryContext.proto @@ -54,4 +54,6 @@ message QueryContext { // NOTE(zuyu): For UpdateWorkOrder only. repeated UpdateGroup update_groups = 10; + + required uint64 query_id = 11; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 15803cf..9d9a9e5 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -27,6 +27,7 @@ message EmptyMessage { message WorkOrderCompletionMessage { required uint64 operator_index = 1; required uint64 worker_thread_index = 2; + required uint64 query_id = 3; } message CatalogRelationNewBlockMessage { @@ -35,16 +36,19 @@ message CatalogRelationNewBlockMessage { // Used by PartitionAwareInsertDestination. optional uint64 partition_id = 3; + required uint64 query_id = 4; } message DataPipelineMessage { required uint64 operator_index = 1; required fixed64 block_id = 2; required int32 relation_id = 3; + required uint64 query_id = 4; } message WorkOrdersAvailableMessage { required uint64 operator_index = 1; + required uint64 query_id = 2; } // BlockLocator related messages. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index fc253bc..9d1060f 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -58,6 +58,8 @@ using ClientIDMap = ThreadIDBasedMap #include +#include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/WorkerMessage.hpp" #include "utility/Macros.hpp" #include "tmb/address.h" @@ -60,6 +63,55 @@ class QueryExecutionUtil { std::move(tagged_message)); } + /** + * @brief Construct and send an AdmitRequestMessage from a given sender to a + * given recipient. + * + * @param sender_id The TMB client ID of the sender. + * @param receiver_id The TMB client ID of the receiver. + * @param query_handle The QueryHandle used in the AdmitRequestMessage. + * @param bus A pointer to the TMB. + * @param tagged_message A moved from reference to the tagged message. + * + * @return A status code indicating the result of the message delivery. + * The caller should ensure that the status is SendStatus::kOK. + **/ + static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage( + const tmb::client_id sender_id, + const tmb::client_id receiver_id, + QueryHandle *query_handle, + MessageBus *bus) { + std::unique_ptr request_message( + new AdmitRequestMessage(query_handle)); + const std::size_t size_of_request_msg = sizeof(*request_message); + TaggedMessage admit_tagged_message( + request_message.release(), size_of_request_msg, kAdmitRequestMessage); + + return QueryExecutionUtil::SendTMBMessage( + bus, sender_id, receiver_id, std::move(admit_tagged_message)); + } + + static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) { + // Terminate all threads. + // The sender thread broadcasts poison message to the workers and foreman. + // Each worker dies after receiving poison message. The order of workers' + // death is irrelavant. + MessageStyle style; + style.Broadcast(true); + Address address; + address.All(true); + std::unique_ptr poison_message(WorkerMessage::PoisonMessage()); + TaggedMessage poison_tagged_message(poison_message.get(), + sizeof(*poison_message), + kPoisonMessage); + + const tmb::MessageBus::SendStatus send_status = bus->Send( + sender_id, address, style, std::move(poison_tagged_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << + "Broadcast poison message from sender with TMB client ID " << sender_id + << " failed"; + } + private: /** * @brief Constructor. Made private to avoid instantiation. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/QueryManager.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp index 47f54c5..b52460f 100644 --- a/query_execution/QueryManager.hpp +++ b/query_execution/QueryManager.hpp @@ -25,18 +25,21 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/WorkOrdersContainer.hpp" #include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "storage/StorageBlockInfo.hpp" #include "utility/DAG.hpp" #include "utility/Macros.hpp" +#include "tmb/id_typedefs.h" #include "tmb/message_bus.h" #include "tmb/tagged_message.h" namespace quickstep { class CatalogDatabaseLite; -class ForemanMessage; class QueryHandle; class StorageManager; class WorkerMessage; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/WorkOrdersContainer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp index eb9aedd..d023daa 100644 --- a/query_execution/WorkOrdersContainer.hpp +++ b/query_execution/WorkOrdersContainer.hpp @@ -28,6 +28,8 @@ #include "utility/Macros.hpp" #include "utility/PtrVector.hpp" +#include "glog/logging.h" + namespace quickstep { /** \addtogroup QueryExecution @@ -76,7 +78,7 @@ class WorkOrdersContainer { * @return If there are pending WorkOrders. **/ inline bool hasNormalWorkOrder(const std::size_t operator_index) const { - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK_LT(operator_index, num_operators_); return normal_workorders_[operator_index].hasWorkOrder(); } @@ -92,9 +94,9 @@ class WorkOrdersContainer { **/ inline bool hasNormalWorkOrderForNUMANode( const std::size_t operator_index, const int numa_node_id) const { - DEBUG_ASSERT(operator_index < num_operators_); - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_LT(operator_index, num_operators_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return normal_workorders_[operator_index].hasWorkOrderForNUMANode( numa_node_id); } @@ -108,7 +110,7 @@ class WorkOrdersContainer { * @return If there are pending rebuild WorkOrders. **/ inline bool hasRebuildWorkOrder(const std::size_t operator_index) const { - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK_LT(operator_index, num_operators_); return rebuild_workorders_[operator_index].hasWorkOrder(); } @@ -124,9 +126,9 @@ class WorkOrdersContainer { **/ inline bool hasRebuildWorkOrderForNUMANode( const std::size_t operator_index, const int numa_node_id) const { - DEBUG_ASSERT(operator_index < num_operators_); - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_LT(operator_index, num_operators_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode( numa_node_id); } @@ -144,9 +146,9 @@ class WorkOrdersContainer { **/ WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index, const int numa_node_id) { - DEBUG_ASSERT(operator_index < num_operators_); - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_LT(operator_index, num_operators_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return normal_workorders_[operator_index].getWorkOrderForNUMANode( numa_node_id); } @@ -164,7 +166,7 @@ class WorkOrdersContainer { **/ WorkOrder* getNormalWorkOrder(const std::size_t operator_index, const bool prefer_single_NUMA_node = true) { - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK_LT(operator_index, num_operators_); return normal_workorders_[operator_index].getWorkOrder( prefer_single_NUMA_node); } @@ -182,9 +184,9 @@ class WorkOrdersContainer { **/ WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index, const int numa_node_id) { - DEBUG_ASSERT(operator_index < num_operators_); - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_LT(operator_index, num_operators_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return rebuild_workorders_[operator_index].getWorkOrderForNUMANode( numa_node_id); } @@ -202,7 +204,7 @@ class WorkOrdersContainer { **/ WorkOrder* getRebuildWorkOrder(const std::size_t operator_index, const bool prefer_single_NUMA_node = true) { - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK_LT(operator_index, num_operators_); return rebuild_workorders_[operator_index].getWorkOrder( prefer_single_NUMA_node); } @@ -220,8 +222,8 @@ class WorkOrdersContainer { * @param operator_index The index of the operator in the query DAG. **/ void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) { - DEBUG_ASSERT(workorder != nullptr); - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK(workorder != nullptr); + DCHECK_LT(operator_index, num_operators_); normal_workorders_[operator_index].addWorkOrder(workorder); } @@ -238,8 +240,8 @@ class WorkOrdersContainer { **/ void addRebuildWorkOrder(WorkOrder *workorder, const std::size_t operator_index) { - DEBUG_ASSERT(workorder != nullptr); - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK(workorder != nullptr); + DCHECK_LT(operator_index, num_operators_); rebuild_workorders_[operator_index].addWorkOrder(workorder); } @@ -254,9 +256,9 @@ class WorkOrdersContainer { **/ inline std::size_t getNumNormalWorkOrdersForNUMANode( const std::size_t operator_index, const int numa_node_id) const { - DEBUG_ASSERT(operator_index < num_operators_); - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_LT(operator_index, num_operators_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode( numa_node_id); } @@ -271,7 +273,7 @@ class WorkOrdersContainer { **/ inline std::size_t getNumNormalWorkOrders( const std::size_t operator_index) const { - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK_LT(operator_index, num_operators_); return normal_workorders_[operator_index].getNumWorkOrders(); } @@ -286,9 +288,9 @@ class WorkOrdersContainer { **/ inline std::size_t getNumRebuildWorkOrdersForNUMANode( const std::size_t operator_index, const int numa_node_id) const { - DEBUG_ASSERT(operator_index < num_operators_); - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_LT(operator_index, num_operators_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode( numa_node_id); } @@ -303,7 +305,7 @@ class WorkOrdersContainer { **/ inline std::size_t getNumRebuildWorkOrders( const std::size_t operator_index) const { - DEBUG_ASSERT(operator_index < num_operators_); + DCHECK_LT(operator_index, num_operators_); return rebuild_workorders_[operator_index].getNumWorkOrders(); } @@ -418,8 +420,8 @@ class WorkOrdersContainer { void addWorkOrder(WorkOrder *workorder); bool hasWorkOrderForNUMANode(const int numa_node_id) const { - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return single_numa_node_workorders_[numa_node_id].hasWorkOrder() || multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode( numa_node_id); @@ -440,8 +442,8 @@ class WorkOrdersContainer { std::size_t getNumWorkOrdersForNUMANode( const int numa_node_id) const { - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() + multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode( numa_node_id); @@ -463,8 +465,8 @@ class WorkOrdersContainer { } WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) { - DEBUG_ASSERT(numa_node_id >= 0); - DEBUG_ASSERT(static_cast(numa_node_id) < num_numa_nodes_); + DCHECK_GE(numa_node_id, 0); + DCHECK_LT(static_cast(numa_node_id), num_numa_nodes_); WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder(); if (work_order == nullptr) { work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/Worker.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp index 645fd05..ef596e1 100644 --- a/query_execution/Worker.cpp +++ b/query_execution/Worker.cpp @@ -58,11 +58,14 @@ void Worker::run() { WorkerMessage message(*static_cast(tagged_message.message())); DCHECK(message.getWorkOrder() != nullptr); message.getWorkOrder()->execute(); + const std::size_t query_id_for_workorder = + message.getWorkOrder()->getQueryID(); delete message.getWorkOrder(); - sendWorkOrderCompleteMessage(annotated_msg.sender, - message.getRelationalOpIndex(), - tagged_message.message_type() == kRebuildWorkOrderMessage); + sendWorkOrderCompleteMessage( + annotated_msg.sender, message.getRelationalOpIndex(), + query_id_for_workorder, + tagged_message.message_type() == kRebuildWorkOrderMessage); break; } case kPoisonMessage: { @@ -76,10 +79,12 @@ void Worker::run() { void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver, const size_t op_index, + const size_t query_id, const bool is_rebuild_work_order) { serialization::WorkOrderCompletionMessage proto; proto.set_operator_index(op_index); proto.set_worker_thread_index(worker_thread_index_); + proto.set_query_id(query_id); // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const size_t proto_length = proto.ByteSize(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/Worker.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp index b94e937..c0bafdc 100644 --- a/query_execution/Worker.hpp +++ b/query_execution/Worker.hpp @@ -97,11 +97,13 @@ class Worker : public Thread { * * @param receiver The id of the TMB client which should receive the response. * @param op_index The index of the operator to which the WorkOrder belongs. + * @param query_id The ID of the query which the WorkOrder belongs to. * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise * false. **/ void sendWorkOrderCompleteMessage(const tmb::client_id receiver, const std::size_t op_index, + const std::size_t query_id, const bool is_rebuild_work_order); const std::size_t worker_thread_index_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/WorkerMessage.hpp ---------------------------------------------------------------------- diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp index ec63af9..560c1ba 100644 --- a/query_execution/WorkerMessage.hpp +++ b/query_execution/WorkerMessage.hpp @@ -30,6 +30,8 @@ class WorkOrder; **/ class WorkerMessage { public: + static constexpr int kInvalidRecipientIndexHint = -1; + enum class WorkerMessageType { kRebuildWorkOrder = 0, kWorkOrder, @@ -105,6 +107,23 @@ class WorkerMessage { return type_; } + /** + * @brief Set a hint for the recipient worker thread. + * + * @param recipient_index_hint The hint i.e. the worker thread index. + **/ + inline void setRecipientHint(const int recipient_index_hint) { + recipient_index_hint_ = recipient_index_hint; + } + + /** + * @brief Get the hint for the recipient worker thread. The hint is invalid if + * it is kInvalidRecipientIndexHint. + **/ + inline int getRecipientHint() const { + return recipient_index_hint_; + } + private: /** * @brief Constructor. @@ -120,12 +139,13 @@ class WorkerMessage { const WorkerMessageType type) : work_unit_(work_unit), relational_op_index_(relational_op_index), - type_(type) { - } + type_(type), + recipient_index_hint_(kInvalidRecipientIndexHint) {} WorkOrder *work_unit_; const std::size_t relational_op_index_; const WorkerMessageType type_; + int recipient_index_hint_; }; } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/tests/Foreman_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/Foreman_unittest.cpp b/query_execution/tests/Foreman_unittest.cpp deleted file mode 100644 index cbe5088..0000000 --- a/query_execution/tests/Foreman_unittest.cpp +++ /dev/null @@ -1,952 +0,0 @@ -/** - * Copyright 2011-2015 Quickstep Technologies LLC. - * Copyright 2015-2016 Pivotal Software, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#include -#include -#include -#include - -#include "catalog/CatalogDatabase.hpp" -#include "catalog/CatalogRelation.hpp" -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/Foreman.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryContext.pb.h" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/WorkOrdersContainer.hpp" -#include "query_execution/WorkerDirectory.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "query_optimizer/QueryPlan.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/InsertDestination.pb.h" -#include "storage/StorageBlock.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageManager.hpp" -#include "utility/DAG.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -#include "gtest/gtest.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::move; -using std::unique_ptr; -using std::vector; - -using tmb::client_id; - -namespace quickstep { - -class WorkOrderProtosContainer; - -class MockWorkOrder : public WorkOrder { - public: - explicit MockWorkOrder(const int op_index) - : WorkOrder(0), op_index_(op_index) {} - - void execute() override { - VLOG(3) << "WorkOrder[" << op_index_ << "] executing."; - } - - inline QueryPlan::DAGNodeIndex getOpIndex() const { - return op_index_; - } - - private: - const QueryPlan::DAGNodeIndex op_index_; - - DISALLOW_COPY_AND_ASSIGN(MockWorkOrder); -}; - -class MockOperator: public RelationalOperator { - public: - enum function_name { - kFeedInputBlock = 0, - kFeedInputBlocks, - kDoneFeedingInputBlocks, - kGetAllWorkOrders - }; - - MockOperator(const bool produce_workorders, - const bool has_streaming_input, - const int max_getworkorder_iters = 1, - const int max_workorders = INT_MAX) - : RelationalOperator(0 /* Query Id */), - produce_workorders_(produce_workorders), - has_streaming_input_(has_streaming_input), - max_workorders_(max_workorders), - max_getworkorder_iters_(max_getworkorder_iters), - num_calls_get_workorders_(0), - num_workorders_generated_(0), - num_calls_feedblock_(0), - num_calls_feedblocks_(0), - num_calls_donefeedingblocks_(0) { - } - -#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": " - - // The methods below are used to check whether Foreman calls the Relational - // operator, how many times it calls a particular method etc. - inline int getNumWorkOrders() const { - return num_workorders_generated_; - } - - inline int getNumCalls(const function_name fname) const { - switch (fname) { - case kFeedInputBlock: - return num_calls_feedblock_; - case kFeedInputBlocks: - return num_calls_feedblocks_; - case kDoneFeedingInputBlocks: - return num_calls_donefeedingblocks_; - case kGetAllWorkOrders: - return num_calls_get_workorders_; - default: - return -1; - } - } - - inline bool getBlockingDependenciesMet() const { - MOCK_OP_LOG(3) << "met."; - return blocking_dependencies_met_; - } - - void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) { - insert_destination_index_ = insert_destination_index; - } - - // Mock to trigger doneFeedingInputBlocks for the dependent operators - // in Foreman::markOperatorFinished. - void setOutputRelationID(const relation_id rel_id) { - output_relation_id_ = rel_id; - } - - // Override methods from the base class. - bool getAllWorkOrders( - WorkOrdersContainer *container, - QueryContext *query_context, - StorageManager *storage_manager, - const tmb::client_id foreman_client_id, - tmb::MessageBus *bus) override { - ++num_calls_get_workorders_; - if (produce_workorders_) { - if (has_streaming_input_) { - if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) { - MOCK_OP_LOG(3) << "[stream] generate WorkOrder"; - container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); - ++num_workorders_generated_; - } - } else { - if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) { - MOCK_OP_LOG(3) << "[static] generate WorkOrder"; - container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); - ++num_workorders_generated_; - } - } - } - MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") " - << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")"; - return num_calls_get_workorders_ == max_getworkorder_iters_; - } - - bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override { - return true; - } - - void feedInputBlock(const block_id input_block_id, - const relation_id input_relation_id) override { - ++num_calls_feedblock_; - MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")"; - } - - void feedInputBlocks(const relation_id rel_id, - std::vector *partially_filled_blocks) override { - ++num_calls_feedblocks_; - MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")"; - } - - void doneFeedingInputBlocks(const relation_id rel_id) override { - ++num_calls_donefeedingblocks_; - MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")"; - } - - QueryContext::insert_destination_id getInsertDestinationID() const override { - return insert_destination_index_; - } - - const relation_id getOutputRelationID() const override { - return output_relation_id_; - } - - private: - const bool produce_workorders_; - const bool has_streaming_input_; - const int max_workorders_; - const int max_getworkorder_iters_; - - int num_calls_get_workorders_; - int num_workorders_generated_; - int num_calls_feedblock_; - int num_calls_feedblocks_; - int num_calls_donefeedingblocks_; - - QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId; - - relation_id output_relation_id_ = -1; - -#undef MOCK_OP_LOG - - DISALLOW_COPY_AND_ASSIGN(MockOperator); -}; - - -class ForemanTest : public ::testing::Test { - protected: - // Class ForemanTest is the friend of class Foreman. Each TEST_F behaves - // as a separate class, so we can't access Foreman's private members in - // TEST_F. - virtual void SetUp() { - db_.reset(new CatalogDatabase(nullptr /* catalog */, "database")); - storage_manager_.reset(new StorageManager("./")); - - query_plan_.reset(new QueryPlan()); - - bus_.Initialize(); - - foreman_.reset(new Foreman(&bus_, db_.get(), storage_manager_.get())); - - // This thread acts both as Foreman as well as Worker. Foreman connects to - // the bus in its constructor. - worker_client_id_ = bus_.Connect(); - - // Register as sender and receiver for relevant types of messages. - bus_.RegisterClientAsSender(worker_client_id_, kWorkOrderCompleteMessage); - bus_.RegisterClientAsSender(worker_client_id_, kRebuildWorkOrderCompleteMessage); - bus_.RegisterClientAsSender(worker_client_id_, kDataPipelineMessage); - bus_.RegisterClientAsReceiver(worker_client_id_, kWorkOrderMessage); - bus_.RegisterClientAsReceiver(worker_client_id_, kRebuildWorkOrderMessage); - bus_.RegisterClientAsReceiver(worker_client_id_, kPoisonMessage); - - std::vector worker_client_ids; - worker_client_ids.push_back(worker_client_id_); - - std::vector numa_nodes; - numa_nodes.push_back(static_cast(-1)); - - workers_.reset(new WorkerDirectory(1, worker_client_ids, numa_nodes)); - foreman_->setWorkerDirectory(workers_.get()); - } - - inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const { - return foreman_->query_exec_state_->getNumQueuedWorkOrders(index); - } - - inline const int getNumOperatorsFinished() const { - return foreman_->query_exec_state_->getNumOperatorsFinished(); - } - - inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const { - return foreman_->query_exec_state_->hasExecutionFinished(index); - } - - inline bool popWorkOrderIfAvailable(MockWorkOrder **workorder) { - AnnotatedMessage msg; - if (bus_.ReceiveIfAvailable(worker_client_id_, &msg)) { - WorkerMessage message(*static_cast(msg.tagged_message.message())); - *workorder = static_cast(message.getWorkOrder()); - return true; - } - return false; - } - - inline bool popRebuildWorkOrderIfAvailable(MockWorkOrder **workorder) { - return popWorkOrderIfAvailable(workorder); - } - - inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) { - VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]"; - foreman_->processDataPipelineMessage(source_operator_index, 0 /* block_id */, 0 /* relation_id */); - return foreman_->query_exec_state_->hasQueryExecutionFinished(); - } - - inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { - VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]"; - foreman_->processWorkOrderCompleteMessage(index, 0 /* worker id */); - return foreman_->query_exec_state_->hasQueryExecutionFinished(); - } - - inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { - VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]"; - foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */); - return foreman_->query_exec_state_->hasQueryExecutionFinished(); - } - - inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) { - VLOG(3) << "Place OutputBlock message for Op[" << index << "]"; - foreman_->processDataPipelineMessage(index, - BlockIdUtil::GetBlockId(1 /* domain */, 1), - 0 /* relation_id */); - return foreman_->query_exec_state_->hasQueryExecutionFinished(); - } - - inline bool startForeman() { - foreman_->initialize(); - return foreman_->query_exec_state_->hasQueryExecutionFinished(); - } - - inline int getWorkerInputQueueSize() { - return bus_.CountQueuedMessagesForClient(worker_client_id_); - } - - unique_ptr db_; - unique_ptr storage_manager_; - - unique_ptr query_plan_; - - unique_ptr foreman_; - MessageBusImpl bus_; - - client_id worker_client_id_; - - unique_ptr workers_; -}; - -TEST_F(ForemanTest, SingleNodeDAGNoWorkOrdersTest) { - // This test creates a DAG of a single node. No workorders are generated. - query_plan_->addRelationalOperator(new MockOperator(false, false)); - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - - const MockOperator &op = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(0)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); - - // Foreman exits after initialize, since no workorders are generated. - EXPECT_TRUE(startForeman()); - - // op doesn't have any dependencies. - EXPECT_TRUE(op.getBlockingDependenciesMet()); - - // No workorder is generated. No response is received. - EXPECT_EQ(0, getWorkerInputQueueSize()); - - // We expect one call for op's getAllWorkOrders(). - EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); -} - -TEST_F(ForemanTest, SingleNodeDAGStaticWorkOrdersTest) { - // This test creates a DAG of a single node. Static workorders are generated. - const QueryPlan::DAGNodeIndex id = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - - const MockOperator &op = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); - EXPECT_FALSE(startForeman()); - - // op doesn't have any dependencies. - EXPECT_TRUE(op.getBlockingDependenciesMet()); - - // We expect one call for op's getAllWorkOrders(). - EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); - - // One workorder is generated. - EXPECT_EQ(1, getWorkerInputQueueSize()); - EXPECT_EQ(1, op.getNumWorkOrders()); - - // Worker receives a WorkOrder. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - EXPECT_EQ(id, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); - EXPECT_EQ(0, getNumOperatorsFinished()); - - // Send a message to Foreman upon workorder completion. - // Last event processed by Foreman. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); - EXPECT_EQ(1, getNumOperatorsFinished()); - EXPECT_TRUE(getOperatorFinishedStatus(id)); -} - -TEST_F(ForemanTest, SingleNodeDAGDynamicWorkOrdersTest) { - // This test creates a DAG of a single node. WorkOrders are generated - // dynamically as pending work orders complete execution, i.e., - // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be - // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to - // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will insert no - // WorkOrder and return true. - - // TODO(shoban): This test can not be more robust than this because of fixed - // scaffolding of mocking. If we use gMock, we can do much better. - const QueryPlan::DAGNodeIndex id = query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3)); - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - - const MockOperator &op = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); - EXPECT_FALSE(startForeman()); - - // op doesn't have any dependencies. - EXPECT_TRUE(op.getBlockingDependenciesMet()); - - for (int i = 0; i < 3; i++) { - // We expect one call for op's getAllWorkOrders(). - EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); - - // One workorder is generated. - EXPECT_EQ(1, getWorkerInputQueueSize()); - EXPECT_EQ(i + 1, op.getNumWorkOrders()); - - // Worker receives a WorkOrder. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - EXPECT_EQ(id, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); - EXPECT_EQ(0, getNumOperatorsFinished()); - - if (i < 2) { - // Send a message to Foreman upon workorder completion. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id)); - } else { - // Send a message to Foreman upon workorder completion. - // Last event. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); - } - } - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); - - EXPECT_EQ(1, getNumOperatorsFinished()); - EXPECT_TRUE(getOperatorFinishedStatus(id)); - - // We place this check in the end, since it's true throughout the test. - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); -} - -TEST_F(ForemanTest, TwoNodesDAGBlockingLinkTest) { - // We use two nodes in the DAG with a blocking link between them. - // There is no streaming of data involved in this test. - const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false)); - const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, false)); - - // Create a blocking link. - query_plan_->addDirectDependency(id2, id1, true); - - static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) - ->setOutputRelationID(0xdead); - - const MockOperator &op1 = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id1)); - const MockOperator &op2 = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id2)); - - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - - // Make sure queues are empty initially. - EXPECT_EQ(0, getWorkerInputQueueSize()); - - EXPECT_FALSE(startForeman()); - - // op1 doesn't have any dependencies - EXPECT_TRUE(op1.getBlockingDependenciesMet()); - - // Only op1 should receive a call to getAllWorkOrders initially. - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); - - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); - - // Only op1 should produce a workorder. - EXPECT_EQ(1, getWorkerInputQueueSize()); - EXPECT_EQ(1, op1.getNumWorkOrders()); - EXPECT_EQ(0, op2.getNumWorkOrders()); - - // Worker receives a WorkOrder. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // This workorder's source should be op1. - EXPECT_EQ(id1, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - EXPECT_EQ(0, getWorkerInputQueueSize()); - // Foreman hasn't yet got workorder completion response for the workorder. - EXPECT_EQ(1, getNumWorkOrdersInExecution(id1)); - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - EXPECT_EQ(0, getNumOperatorsFinished()); - - // Send a message to Foreman upon workorder (generated by op1) completion. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - // op1 is over now, op2 still to go. - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - EXPECT_EQ(1, getNumOperatorsFinished()); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); - - // op1 is op2's blocking dependency. - EXPECT_TRUE(op2.getBlockingDependenciesMet()); - - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - // op2 should get first call of getAllWorkOrders() when op1 is over. - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - - EXPECT_EQ(1, op2.getNumWorkOrders()); - - // Send a message to Foreman upon workorder (generated by op2) completion. - - // Note that the worker hasn't yet popped the workorder. Usually this won't - // happen as workers pop workorders first, execute and then send the response. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); - - // WorkOrder yet to be popped by the worker. - EXPECT_EQ(1, getWorkerInputQueueSize()); - - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - - EXPECT_EQ(2, getNumOperatorsFinished()); - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_TRUE(getOperatorFinishedStatus(id2)); - - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // The workorder should have come from op2. - EXPECT_EQ(id2, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - EXPECT_EQ(0, getWorkerInputQueueSize()); - - // Expect no additional calls to getAllWorkOrders. - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); -} - -TEST_F(ForemanTest, TwoNodesDAGPipeLinkTest) { - // We use two nodes in the DAG with a non-blocking link between them. - // We stream output of op1 to op2. Sequeuce of events is as follows: - // 1. op1 creates a workorder. - // 2. We send a "block full" (from op1) to Foreman. - // 3. op2 creates a workorder because of step 2. - const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 3)); - - // Create a non-blocking link. - query_plan_->addDirectDependency(id2, id1, false); - - static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) - ->setOutputRelationID(0xdead); - - const MockOperator &op1 = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id1)); - const MockOperator &op2 = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id2)); - - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - - // Make sure queues are empty initially. - EXPECT_EQ(0, getWorkerInputQueueSize()); - - startForeman(); - - // As none of the operators have a blocking link, blocking dependencies should - // be met. - EXPECT_TRUE(op1.getBlockingDependenciesMet()); - EXPECT_TRUE(op2.getBlockingDependenciesMet()); - - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op1.getNumWorkOrders()); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); - - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - // op2 will generate workorder only after receiving a streaming input. - EXPECT_EQ(0, op2.getNumWorkOrders()); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); - - // There should be one workorder sent to the worker so far. - EXPECT_EQ(1, getWorkerInputQueueSize()); - - // Worker receives a WorkOrder. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // This workorder's source be op1. - EXPECT_EQ(id1, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - // Send a message to Foreman upon block getting full (output of op1). - EXPECT_FALSE(placeOutputBlockMessage(id1)); - - // op1 is not finished yet because the response of workorder completion hasn't - // been received yet by the Foreman. - EXPECT_FALSE(getOperatorFinishedStatus(id1)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - // No additional call to op1's getAllWorkOrders. - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); - - // Output from op1 should be fed to op2. - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock)); - EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); - - // A call to op2's getAllWorkOrders because of the streamed input. - EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op2.getNumWorkOrders()); - - // Place a message of a workorder completion of op1 on Foreman's input queue. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - - // An additional call to op2's getAllWorkOrders because of completion of op1. - EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(2, op2.getNumWorkOrders()); - EXPECT_EQ(1, getWorkerInputQueueSize()); - - // Pop a workorder from Foreman's output queue. - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // The workorder should have been generated by op2. - EXPECT_EQ(id2, work_order->getOpIndex()); - work_order->execute(); - delete work_order; - - // Place a message of a workorder completion of op2 on Foreman's input queue. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - EXPECT_EQ(1, getWorkerInputQueueSize()); - - // Pop a workorder from Foreman's output queue. - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // The workorder should have been generated by op2. - EXPECT_EQ(id2, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - // Send a message to Foreman upon workorder (generated by op2) completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - EXPECT_TRUE(getOperatorFinishedStatus(id2)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); -} - -TEST_F(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest) { - // In this test, we create a 2-node DAG with a non-blocking link between them. - // There is no streaming of data from op1 to op2 during the execution of op1. - // op1 produces a partially filled block at the end of its execution which is - // rebuilt and then fed to op2. - const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1)); - - // Create a non-blocking link. - query_plan_->addDirectDependency(id2, id1, false); - - // Create a relation, owned by db_. - CatalogRelation *relation = new CatalogRelation(nullptr /* catalog_database */, "test_relation"); - const relation_id output_relation_id = db_->addRelation(relation); - - // Setup the InsertDestination proto in the query context proto. - serialization::QueryContext query_context_proto; - - const QueryContext::insert_destination_id insert_destination_index = - query_context_proto.insert_destinations_size(); - serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations(); - - insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL); - insert_destination_proto->set_relation_id(output_relation_id); - insert_destination_proto->set_relational_op_index(id1); - - MockOperator *op1_mutable = - static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)); - op1_mutable->setInsertDestinationID(insert_destination_index); - op1_mutable->setOutputRelationID(output_relation_id); - - const MockOperator &op1 = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id1)); - const MockOperator &op2 = static_cast(query_plan_->getQueryPlanDAG().getNodePayload(id2)); - - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - foreman_->reconstructQueryContextFromProto(query_context_proto); - - // NOTE(zuyu): An operator generally has no ideas about partially filled - // blocks, but InsertDestination in QueryContext does. - // Mock to add partially filled blocks in the InsertDestination. - InsertDestination *insert_destination = - foreman_->query_context_->getInsertDestination(insert_destination_index); - DCHECK(insert_destination != nullptr); - MutableBlockReference block_ref; - static_cast(insert_destination)->available_block_refs_.push_back(move(block_ref)); - - // Make sure queues are empty initially. - EXPECT_EQ(0, getWorkerInputQueueSize()); - - startForeman(); - - // There's no blocking dependency in the DAG. - EXPECT_TRUE(op1.getBlockingDependenciesMet()); - EXPECT_TRUE(op2.getBlockingDependenciesMet()); - - EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op1.getNumWorkOrders()); - - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(0, op2.getNumWorkOrders()); - - // Worker receives a WorkOrder. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // The workorder should have been generated by op1. - EXPECT_EQ(id1, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - EXPECT_EQ(0, getWorkerInputQueueSize()); - - // Send a message to Foreman upon workorder (generated by op1) completion. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); - - // op1 generates a rebuild workorder. The block is rebuilt and streamed - // to Foreman. - EXPECT_FALSE(placeDataPipelineMessage(id1)); - - // Based on the streamed input, op2's getAllWorkOrders should produce a - // workorder. - EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); - EXPECT_EQ(1, op2.getNumWorkOrders()); - - // Worker receives a rebuild WorkOrder. - MockWorkOrder *rebuild_op1; - ASSERT_TRUE(popRebuildWorkOrderIfAvailable(&rebuild_op1)); - // We skip the check for relation ID of the rebuild WorkOrder, as the partially - // filled block reference is a mock reference with garbage contents. - delete rebuild_op1; - - EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); - - EXPECT_EQ(1, getWorkerInputQueueSize()); - - // Worker receives a WorkOrder. - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // The workorder should have been generated by op2. - EXPECT_EQ(id2, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - EXPECT_EQ(0, getWorkerInputQueueSize()); - - // Send a message to Foreman upon workorder (generated by op2) completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); - - EXPECT_TRUE(getOperatorFinishedStatus(id2)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); -} - -TEST_F(ForemanTest, MultipleNodesNoOutputTest) { - // When an operator produces workorders but no output, the Foreman should - // check the dependents of this operator to make progress. - const QueryPlan::DAGNodeIndex kNumNodes = 5; - std::vector ids; - ids.reserve(kNumNodes); - - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - if (i == 0) { - ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, false)); - } else { - ids[i] = query_plan_->addRelationalOperator(new MockOperator(true, true)); - } - VLOG(3) << ids[i]; - } - - /** - * The DAG looks like this: - * - * op1 -> op2 -> op3 -> op4 -> op5 - * - **/ - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) { - query_plan_->addDirectDependency(ids[i + 1], ids[i], false); - static_cast(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i])) - ->setOutputRelationID(0xdead); - } - - std::vector operators; - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - operators.push_back(static_cast(&query_plan_->getQueryPlanDAG().getNodePayload(ids[i]))); - } - - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - - // Make sure queues are empty initially. - EXPECT_EQ(0, getWorkerInputQueueSize()); - - startForeman(); - - // operators[0] should have produced a workorder by now. - EXPECT_EQ(1, operators[0]->getNumWorkOrders()); - EXPECT_EQ(1, getWorkerInputQueueSize()); - - EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0])); - EXPECT_FALSE(getOperatorFinishedStatus(ids[0])); - - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - EXPECT_EQ(1, operators[ids[i]]->getNumCalls(MockOperator::kGetAllWorkOrders)); - } - - // Worker receives a WorkOrder. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // The workorder should have been generated by operators[0]. - EXPECT_EQ(ids[0], work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - // Send a message to Foreman upon workorder (generated by operators[0]) - // completion. - EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0])); - - for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) { - EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i])); - EXPECT_TRUE(getOperatorFinishedStatus(ids[i])); - if (i < kNumNodes - 1) { - EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks)); - } - } -} - -TEST_F(ForemanTest, OutOfOrderWorkOrderCompletionTest) { - // Consider two operators, both generate one workorder each. The dependent's - // workorder finishes before dependency's workorder. - const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); - const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1)); - - // Create a non-blocking link. - query_plan_->addDirectDependency(id2, id1, false); - - foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); - // There should be two workorders on Worker's private queue, for this test. - foreman_->setMaxMessagesPerWorker(2); - - // Make sure queues are empty initially. - EXPECT_EQ(0, getWorkerInputQueueSize()); - - startForeman(); - - // Expect one workorder produced by op1. - EXPECT_EQ(1, getWorkerInputQueueSize()); - - // Pop a workorder from Foreman's output queue. - MockWorkOrder *work_order; - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // This workorder's source be op1. - EXPECT_EQ(id1, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - // Send a message to Foreman upon a block (output of op1) getting full. - EXPECT_FALSE(placeOutputBlockMessage(id1)); - - // op1 is not finished yet because the response of workorder completion hasn't - // been received yet. - EXPECT_FALSE(getOperatorFinishedStatus(id1)); - EXPECT_FALSE(getOperatorFinishedStatus(id2)); - - // Expect one workorder produced by op2. - EXPECT_EQ(1, getWorkerInputQueueSize()); - - // Worker receives a WorkOrder. - ASSERT_TRUE(popWorkOrderIfAvailable(&work_order)); - // This workorder's source should be op2. - EXPECT_EQ(id2, work_order->getOpIndex()); - - work_order->execute(); - delete work_order; - - // As mentioned earlier, op2 finishes before op1. - EXPECT_FALSE(placeWorkOrderCompleteMessage(id2)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); - - // op1's workorder execution is over. - EXPECT_TRUE(placeWorkOrderCompleteMessage(id1)); - - EXPECT_TRUE(getOperatorFinishedStatus(id1)); - EXPECT_TRUE(getOperatorFinishedStatus(id2)); - - EXPECT_EQ(0, getWorkerInputQueueSize()); -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/tests/QueryManager_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp index 9ba5978..4f98748 100644 --- a/query_execution/tests/QueryManager_unittest.cpp +++ b/query_execution/tests/QueryManager_unittest.cpp @@ -228,8 +228,9 @@ class QueryManagerTest : public ::testing::Test { db_.reset(new CatalogDatabase(nullptr /* catalog */, "database")); storage_manager_.reset(new StorageManager("./")); bus_.Initialize(); - query_handle_.reset(new QueryHandle(0)); + query_handle_.reset(new QueryHandle(0)); // dummy query ID. query_plan_ = query_handle_->getQueryPlanMutable(); + query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id()); } inline void constructQueryManager() { @@ -256,6 +257,7 @@ class QueryManagerTest : public ::testing::Test { proto.set_block_id(0); // dummy block ID proto.set_relation_id(0); // dummy relation ID. + proto.set_query_id(0); // dummy query ID. // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const std::size_t proto_length = proto.ByteSize(); @@ -276,6 +278,7 @@ class QueryManagerTest : public ::testing::Test { serialization::WorkOrderCompletionMessage proto; proto.set_operator_index(index); proto.set_worker_thread_index(1); // dummy worker ID. + proto.set_query_id(0); // dummy query ID. // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const size_t proto_length = proto.ByteSize(); @@ -297,6 +300,7 @@ class QueryManagerTest : public ::testing::Test { serialization::WorkOrderCompletionMessage proto; proto.set_operator_index(index); proto.set_worker_thread_index(1); // dummy worker thread ID. + proto.set_query_id(0); // dummy query ID. // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const size_t proto_length = proto.ByteSize(); @@ -320,6 +324,7 @@ class QueryManagerTest : public ::testing::Test { proto.set_block_id(0); // dummy block ID proto.set_relation_id(0); // dummy relation ID. + proto.set_query_id(0); // dummy query ID. // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const std::size_t proto_length = proto.ByteSize(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_execution/tests/WorkOrdersContainer_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/WorkOrdersContainer_unittest.cpp b/query_execution/tests/WorkOrdersContainer_unittest.cpp index cf133c4..cb583ab 100644 --- a/query_execution/tests/WorkOrdersContainer_unittest.cpp +++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp @@ -72,6 +72,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) { // they get inserted and retrieved correctly. std::vector numa_node_ids; // A container for one operator and no NUMA nodes. + const std::size_t query_id = 0; WorkOrdersContainer w(1, 0); EXPECT_EQ(0u, w.getNumNormalWorkOrders(0)); @@ -104,11 +105,15 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesAddWorkOrderTest) { ASSERT_TRUE(returned_work_order != nullptr); EXPECT_EQ(work_order.getID(), static_cast(returned_work_order)->getID()); + EXPECT_EQ(query_id, returned_work_order->getQueryID()); + WorkOrder *returned_rebuild_work_order = w.getRebuildWorkOrder(0); ASSERT_TRUE(returned_rebuild_work_order != nullptr); EXPECT_EQ(work_order1.getID(), static_cast(returned_rebuild_work_order)->getID()); + EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID()); + // Container should be empty now. EXPECT_EQ(0u, w.getNumNormalWorkOrders(0)); EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0)); @@ -123,6 +128,7 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) { // if they get inserted and retrieved correctly and the order of retrieval. // A container for one operator and no NUMA nodes. std::vector numa_node_ids; + const std::size_t query_id = 0; WorkOrdersContainer w(1, 0); EXPECT_EQ(0u, w.getNumNormalWorkOrders(0)); @@ -164,6 +170,8 @@ TEST(WorkOrdersContainerTest, ZeroNUMANodesMultipleWorkOrdersTest) { ASSERT_TRUE(returned_work_order != nullptr); EXPECT_EQ(static_cast(kNumWorkOrders + i), static_cast(returned_rebuild_work_order)->getID()); + EXPECT_EQ(query_id, returned_work_order->getQueryID()); + EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID()); } // Container should be empty now. @@ -190,6 +198,7 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) { const std::size_t kNUMANodesUsed = numa_node_ids.size(); // A container for one operator and kNUMANodes. + const std::size_t query_id = 0; WorkOrdersContainer w(1, kNUMANodes); for (std::size_t i = 0; i < kNUMANodesUsed; ++i) { @@ -246,6 +255,9 @@ TEST(WorkOrdersContainerTest, MultipleNUMANodesTest) { ASSERT_TRUE(returned_rebuild_work_order != nullptr); EXPECT_EQ(rebuild_workorders[i].getID(), static_cast(returned_rebuild_work_order)->getID()); + + EXPECT_EQ(query_id, returned_work_order->getQueryID()); + EXPECT_EQ(query_id, returned_rebuild_work_order->getQueryID()); } // No workorder should be left for this operator on any NUMA node. @@ -291,6 +303,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) { const std::size_t kNUMANodesUsed = numa_nodes.size(); // Create the container. + const std::size_t query_id = 0; WorkOrdersContainer w(1, kNUMANodes); w.addNormalWorkOrder(&multiple_numa_work_order, 0); @@ -331,6 +344,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) { w.getNormalWorkOrderForNUMANode(0, numa_nodes[0])); ASSERT_TRUE(observed_work_order != nullptr); + EXPECT_EQ(query_id, observed_work_order->getQueryID()); EXPECT_EQ(one_numa_work_order.getPreferredNUMANodes().front(), observed_work_order->getPreferredNUMANodes().front()); EXPECT_EQ(one_numa_work_order.getID(), observed_work_order->getID()); @@ -348,6 +362,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) { EXPECT_EQ(no_numa_work_order.getID(), static_cast(observed_non_numa_work_order)->getID()); + EXPECT_EQ(query_id, observed_non_numa_work_order->getQueryID()); EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[0])); EXPECT_EQ(1u, w.getNumNormalWorkOrdersForNUMANode(0, numa_nodes[1])); @@ -361,6 +376,7 @@ TEST(WorkOrdersContainerTest, AllTypesWorkOrdersTest) { ASSERT_TRUE(observed_work_order_multiple_numa_nodes != nullptr); EXPECT_EQ(multiple_numa_work_order.getID(), observed_work_order_multiple_numa_nodes->getID()); + EXPECT_EQ(query_id, observed_work_order_multiple_numa_nodes->getQueryID()); std::vector observed_numa_nodes( observed_work_order_multiple_numa_nodes->getPreferredNUMANodes()); // Look up the expected numa nodes in the observed_numa_nodes vector. @@ -427,6 +443,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) { const std::size_t kNUMANodes = numa_node_ids.size(); // Create the container. + const std::size_t query_id = 0; WorkOrdersContainer w(kNumOperators, kNUMANodes); std::vector operator_ids; @@ -538,6 +555,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) { curr_operator_id, single_numa_node_id)); ASSERT_TRUE(observed_work_order_single_numa != nullptr); + EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID()); // Verify if the workorder ID is correct. const int expected_workorder_id_single_numa = normal_workorders_one_numa_ids[curr_operator_id]; @@ -550,6 +568,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) { curr_operator_id, multiple_numa_node_id)); ASSERT_TRUE(observed_work_order_multiple_numa != nullptr); + EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID()); // Verify if the workorder ID is correct. const int expected_workorder_id_multiple_numa = normal_workorders_multiple_numa_ids[curr_operator_id]; @@ -562,6 +581,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsNormalWorkOrderTest) { static_cast(w.getNormalWorkOrder(curr_operator_id)); ASSERT_TRUE(observed_work_order_no_numa != nullptr); + EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID()); // Verify if the workorder ID is correct. const int expected_workorder_id_no_numa = normal_workorders_no_numa_ids[curr_operator_id]; @@ -620,6 +640,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) { const std::size_t kNUMANodes = numa_node_ids.size(); // Create the container. + const std::size_t query_id = 0; WorkOrdersContainer w(kNumOperators, kNUMANodes); std::vector operator_ids; @@ -732,6 +753,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) { curr_operator_id, single_numa_node_id)); ASSERT_TRUE(observed_work_order_single_numa != nullptr); + EXPECT_EQ(query_id, observed_work_order_single_numa->getQueryID()); // Verify if the workorder ID is correct. const int expected_workorder_id_single_numa = rebuild_workorders_one_numa_ids[curr_operator_id]; @@ -744,6 +766,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) { curr_operator_id, multiple_numa_node_id)); ASSERT_TRUE(observed_work_order_multiple_numa != nullptr); + EXPECT_EQ(query_id, observed_work_order_multiple_numa->getQueryID()); // Verify if the workorder ID is correct. const int expected_workorder_id_multiple_numa = rebuild_workorders_multiple_numa_ids[curr_operator_id]; @@ -755,6 +778,7 @@ TEST(WorkOrdersContainerTest, MultipleOperatorsRebuildWorkOrderTest) { MockNUMAWorkOrder *observed_work_order_no_numa = static_cast(w.getRebuildWorkOrder(curr_operator_id)); + EXPECT_EQ(query_id, observed_work_order_no_numa->getQueryID()); // Verify if the workorder ID is correct. const int expected_workorder_id_no_numa = rebuild_workorders_no_numa_ids[curr_operator_id]; @@ -772,6 +796,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) { numa_node_ids.push_back(0); const std::size_t kNumWorkOrdersPerType = 100; + const std::size_t query_id = 0; WorkOrdersContainer w(1, 2); std::vector single_numa_node_workorder_ids; @@ -820,6 +845,7 @@ TEST(WorkOrdersContainerTest, RetrievalOrderTest) { MockNUMAWorkOrder *observed_work_order = static_cast( w.getNormalWorkOrder(0, prefer_single_NUMA_node)); ASSERT_TRUE(observed_work_order != nullptr); + EXPECT_EQ(query_id, observed_work_order->getQueryID()); if (prefer_single_NUMA_node) { EXPECT_EQ(*single_numa_it, observed_work_order->getID()); EXPECT_EQ(1u, observed_work_order->getPreferredNUMANodes().size()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index 0630bca..c7fd018 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -105,6 +105,7 @@ class ExecutionGenerator { execution_plan_(DCHECK_NOTNULL(query_handle->getQueryPlanMutable())), query_context_proto_(DCHECK_NOTNULL(query_handle->getQueryContextProtoMutable())), execution_heuristics_(new ExecutionHeuristics()) { + query_context_proto_->set_query_id(query_handle_->query_id()); #ifdef QUICKSTEP_DISTRIBUTED catalog_database_cache_proto_ = DCHECK_NOTNULL(query_handle->getCatalogDatabaseCacheProtoMutable()); #endif http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_optimizer/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt index 6ef2a03..5b58f75 100644 --- a/query_optimizer/tests/CMakeLists.txt +++ b/query_optimizer/tests/CMakeLists.txt @@ -116,9 +116,11 @@ target_link_libraries(quickstep_queryoptimizer_tests_ExecutionGeneratorTest quickstep_cli_PrintToScreen quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper + quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_Foreman quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_Worker quickstep_queryexecution_WorkerDirectory quickstep_queryexecution_WorkerMessage http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/422ad56b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp index 56b53ba..ea871d0 100644 --- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp @@ -24,7 +24,9 @@ #include "cli/DropRelation.hpp" #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" +#include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/Foreman.hpp" +#include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Worker.hpp" #include "query_optimizer/ExecutionGenerator.hpp" #include "query_optimizer/LogicalGenerator.hpp" @@ -40,6 +42,8 @@ #include "glog/logging.h" +#include "tmb/tagged_message.h" + namespace quickstep { class CatalogRelation; @@ -90,13 +94,18 @@ void ExecutionGeneratorTestRunner::runTestCase( physical_generator.generatePlan( logical_generator.generatePlan(*result.parsed_statement)); execution_generator.generatePlan(physical_plan); - foreman_->setQueryPlan( - query_handle.getQueryPlanMutable()->getQueryPlanDAGMutable()); - - foreman_->reconstructQueryContextFromProto(query_handle.getQueryContextProto()); - foreman_->start(); - foreman_->join(); + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + main_thread_client_id_, + foreman_->getBusClientID(), + &query_handle, + &bus_); + + // Receive workload completion message from Foreman. + const AnnotatedMessage annotated_msg = + bus_.Receive(main_thread_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_msg.tagged_message; + DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type()); const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation(); if (query_result_relation) {