From: hbdeshmukh@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Wed, 08 Jun 2016 20:49:15 -0000
In-Reply-To: <3411ffb4be514168a0b6a92c9b107143@git.apache.org>
References: <3411ffb4be514168a0b6a92c9b107143@git.apache.org>
Subject: [16/16] incubator-quickstep git commit: Merge branch 'reorder-query-id-param' into query-manager-used-in-foreman

Merge branch 'reorder-query-id-param' into query-manager-used-in-foreman

Conflicts:
	query_execution/Foreman.cpp
	query_execution/QueryManager.cpp
	query_execution/tests/Foreman_unittest.cpp
	relational_operators/RebuildWorkOrder.hpp
	relational_operators/UpdateOperator.cpp
	relational_operators/UpdateOperator.hpp

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e8ead861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e8ead861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e8ead861

Branch: refs/heads/query-manager-used-in-foreman
Commit: e8ead86103341a34ac7449ed416d1dbba67496a7
Parents: bef0ae1 d67f61e
Author: Harshad Deshmukh
Authored: Wed Jun 8 15:46:28 2016 -0500
Committer: Harshad Deshmukh
Committed: Wed Jun 8 15:46:28 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp                            |   1 -
 query_execution/AdmitRequestMessage.hpp         |   2 -
 query_execution/Foreman.cpp                     |   3 -
 query_execution/QueryExecutionUtil.hpp          |   6 +-
 query_execution/QueryManager.cpp                |  14 +-
 query_execution/WorkOrdersContainer.hpp         |  10 +-
 query_execution/tests/Foreman_unittest.cpp      | 945 +++++++++++++++++++
 query_execution/tests/QueryManager_unittest.cpp |   2 +-
 .../tests/WorkOrdersContainer_unittest.cpp      |  18 +-
 query_optimizer/ExecutionGenerator.cpp          | 174 ++--
 .../tests/ExecutionHeuristics_unittest.cpp      |  25 +-
 relational_operators/AggregationOperator.cpp    |  12 +-
 relational_operators/AggregationOperator.hpp    |  14 +-
 relational_operators/BuildHashOperator.cpp      |   4 +-
 relational_operators/BuildHashOperator.hpp      |  21 +-
 relational_operators/CreateIndexOperator.hpp    |  10 +-
 relational_operators/CreateTableOperator.hpp    |  10 +-
 relational_operators/DeleteOperator.cpp         |   8 +-
 relational_operators/DeleteOperator.hpp         |  18 +-
 relational_operators/DestroyHashOperator.cpp    |   5 +-
 relational_operators/DestroyHashOperator.hpp    |  14 +-
 relational_operators/DropTableOperator.cpp      |   3 +-
 relational_operators/DropTableOperator.hpp      |  15 +-
 .../FinalizeAggregationOperator.cpp             |   6 +-
 .../FinalizeAggregationOperator.hpp             |  20 +-
 relational_operators/HashJoinOperator.cpp       |  27 +-
 relational_operators/HashJoinOperator.hpp       | 242 +++--
 relational_operators/InsertOperator.cpp         |   6 +-
 relational_operators/InsertOperator.hpp         |  20 +-
 .../NestedLoopsJoinOperator.cpp                 |  41 +-
 .../NestedLoopsJoinOperator.hpp                 |  54 +-
 relational_operators/RebuildWorkOrder.hpp       |  19 +-
 relational_operators/RelationalOperator.hpp     |   8 +-
 relational_operators/SampleOperator.cpp         |  46 +-
 relational_operators/SampleOperator.hpp         |  31 +-
 relational_operators/SaveBlocksOperator.cpp     |   1 +
 relational_operators/SaveBlocksOperator.hpp     |  14 +-
 relational_operators/SelectOperator.cpp         |  23 +-
 relational_operators/SelectOperator.hpp         |  60 +-
 relational_operators/SortMergeRunOperator.cpp   |   1 +
 relational_operators/SortMergeRunOperator.hpp   |  29 +-
 .../SortRunGenerationOperator.cpp               |   4 +-
 .../SortRunGenerationOperator.hpp               |  28 +-
 relational_operators/TableGeneratorOperator.cpp |   7 +-
 relational_operators/TableGeneratorOperator.hpp |  23 +-
 relational_operators/TextScanOperator.cpp       |  27 +-
 relational_operators/TextScanOperator.hpp       |  34 +-
 relational_operators/UpdateOperator.cpp         |  22 +-
 relational_operators/UpdateOperator.hpp         |  42 +-
 relational_operators/WorkOrder.hpp              |  16 +-
 relational_operators/WorkOrder.proto            |   1 +
 relational_operators/WorkOrderFactory.cpp       |  35 +-
 .../tests/AggregationOperator_unittest.cpp      |  18 +-
 .../tests/HashJoinOperator_unittest.cpp         | 156 +--
 .../tests/SortMergeRunOperator_unittest.cpp     |  11 +-
 .../SortRunGenerationOperator_unittest.cpp      |  16 +-
 .../tests/TextScanOperator_unittest.cpp         |   5 +-
 57 files changed, 1825 insertions(+), 602 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --cc cli/QuickstepCli.cpp
index 6f954fe,558d6eb..d65eb89
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@@ -402,25 -389,14 +402,24 @@@ int main(int argc, char* argv[])
  }
  DCHECK(query_handle->getQueryPlanMutable() != nullptr);
- foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
-
- foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto());
-
+ // TODO(harshad) - In the future when queries are not admitted
+ // immediately, calculate their waiting time separately.
- LOG(INFO) << "Address of query handle in QuickstepCli: " << query_handle.get();
+ start = std::chrono::steady_clock::now();
+ const tmb::MessageBus::SendStatus send_status =
+     QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
+         main_thread_client_id,
+         foreman.getBusClientID(),
+         query_handle.get(),
+         &bus);
+ if (send_status != tmb::MessageBus::SendStatus::kOK) {
+   fprintf(stderr, "\nQuery %s could not be admitted to the system\n", command_string->c_str());
+   continue;
+ }
  try {
-   start = std::chrono::steady_clock::now();
-   foreman.start();
-   foreman.join();
+   const AnnotatedMessage annotated_msg =
+       bus.Receive(main_thread_client_id, 0, true);
+   const TaggedMessage &tagged_message = annotated_msg.tagged_message;
+   DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
    end = std::chrono::steady_clock::now();
    const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --cc query_execution/AdmitRequestMessage.hpp
index e2a1077,0000000..e33b354
mode 100644,000000..100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@@ -1,75 -1,0 +1,73 @@@
+ /**
+  * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+  * University of Wisconsin—Madison.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  **/
+
+ #ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+ #define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
+
+ #include
+
+ #include "utility/Macros.hpp"
+
+ namespace quickstep {
+
+ class QueryHandle;
+
+ /** \addtogroup QueryExecution
+  * @{
+  */
+
+ /**
+  * @brief A message requesting a query or queries to be admitted to the system.
+  **/
+ class AdmitRequestMessage {
+  public:
+   /**
+    * @brief Constructor.
+    *
+    * @param query_handles The handles of the queries requesting to be admitted
+    *        to the system.
+    **/
+   explicit AdmitRequestMessage(const std::vector &query_handles)
+       : query_handles_(query_handles) {}
+
+   /**
+    * @brief Constructor for requesting single query admission.
+    *
+    * @param query_handle The handle of the query requesting to be admitted.
+    **/
+   explicit AdmitRequestMessage(QueryHandle *query_handle) {
+     query_handles_.push_back(query_handle);
+   }
+
+   /**
+    * @brief Get the query handles from this message.
+    **/
+   const std::vector& getQueryHandles() const {
-     LOG(INFO) << "Query handle in getQueryHandles(): " << query_handles_.front()
-               << " [0] " << query_handles_[0];
+     return query_handles_;
+   }
+
+  private:
+   std::vector query_handles_;
+
+   DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
+ };
+
+ /** @} */
+
+ }  // namespace quickstep
+
+ #endif  // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --cc query_execution/Foreman.cpp
index 6cec70a,7705819..3609120
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@@ -97,43 -58,161 +97,40 @@@ void Foreman::run()
      // We can pin the foreman thread to a CPU if specified.
      ThreadUtil::BindToCPU(cpu_id_);
    }
-   initializeState();
-
-   DEBUG_ASSERT(query_dag_ != nullptr);
-   const dag_node_index dag_size = query_dag_->size();
-
-   // Collect all the workorders from all the relational operators in the DAG.
-   for (dag_node_index index = 0; index < dag_size; ++index) {
-     if (checkAllBlockingDependenciesMet(index)) {
-       query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-       processOperator(index, false);
-     }
-   }
-
-   // Dispatch the WorkOrders generated so far.
-   dispatchWorkerMessages(0, 0);
- }
-
- void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
-                                               const size_t worker_thread_index) {
-   query_exec_state_->decrementNumQueuedWorkOrders(op_index);
-
-   // As the given worker finished executing a WorkOrder, decrement its number
-   // of queued WorkOrders.
-   workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
-   // Check if new work orders are available and fetch them if so.
-   fetchNormalWorkOrders(op_index);
-
-   if (checkRebuildRequired(op_index)) {
-     if (checkNormalExecutionOver(op_index)) {
-       if (!checkRebuildInitiated(op_index)) {
-         if (initiateRebuild(op_index)) {
-           // Rebuild initiated and completed right away.
-           markOperatorFinished(op_index);
-         } else {
-           // Rebuild under progress.
-         }
-       } else if (checkRebuildOver(op_index)) {
-         // Rebuild was under progress and now it is over.
-         markOperatorFinished(op_index);
-       }
-     } else {
-       // Normal execution under progress for this operator.
-     }
-   } else if (checkOperatorExecutionOver(op_index)) {
-     // Rebuild not required for this operator and its normal execution is
-     // complete.
-     markOperatorFinished(op_index);
-   }
-
-   for (const pair &dependent_link :
-        query_dag_->getDependents(op_index)) {
-     const dag_node_index dependent_op_index = dependent_link.first;
-     if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-       // Process the dependent operator (of the operator whose WorkOrder
-       // was just executed) for which all the dependencies have been met.
-       processOperator(dependent_op_index, true);
-     }
-   }
-
-   // Dispatch the WorkerMessages to the workers. We prefer to start the search
-   // for the schedulable WorkOrders beginning from 'op_index'. The first
-   // candidate worker to receive the next WorkOrder is the one that sent the
-   // response message to Foreman.
-   dispatchWorkerMessages(worker_thread_index, op_index);
- }
-
- void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
-                                                      const size_t worker_thread_index) {
-   query_exec_state_->decrementNumRebuildWorkOrders(op_index);
-   workers_->decrementNumQueuedWorkOrders(worker_thread_index);
-
-   if (checkRebuildOver(op_index)) {
-     markOperatorFinished(op_index);
-
-     for (const pair &dependent_link :
-          query_dag_->getDependents(op_index)) {
-       const dag_node_index dependent_op_index = dependent_link.first;
-       if (checkAllBlockingDependenciesMet(dependent_op_index)) {
-         processOperator(dependent_op_index, true);
-       }
-     }
-   }
-
-   // Dispatch the WorkerMessages to the workers. We prefer to start the search
-   // for the schedulable WorkOrders beginning from 'op_index'. The first
-   // candidate worker to receive the next WorkOrder is the one that sent the
-   // response message to Foreman.
-   dispatchWorkerMessages(worker_thread_index, op_index);
- }
-
- void Foreman::processDataPipelineMessage(const dag_node_index op_index,
-                                          const block_id block,
-                                          const relation_id rel_id) {
-   for (const dag_node_index consumer_index :
-        output_consumers_[op_index]) {
-     // Feed the streamed block to the consumer. Note that 'output_consumers_'
-     // only contain those dependents of operator with index = op_index which are
-     // eligible to receive streamed input.
-     query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
-     // Because of the streamed input just fed, check if there are any new
-     // WorkOrders available and if so, fetch them.
-     fetchNormalWorkOrders(consumer_index);
-   }
-
-   // Dispatch the WorkerMessages to the workers. We prefer to start the search
-   // for the schedulable WorkOrders beginning from 'op_index'. The first
-   // candidate worker to receive the next WorkOrder is the one that sent the
-   // response message to Foreman.
-   // TODO(zuyu): Improve the data locality for the next WorkOrder.
-   dispatchWorkerMessages(0, op_index);
- }
-
- void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
-   RelationalOperator *op =
-       query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
-   op->receiveFeedbackMessage(msg);
- }
-
- void Foreman::run() {
-   // Initialize before for Foreman eventloop.
-   initialize();
    // Event loop
-   while (!query_exec_state_->hasQueryExecutionFinished()) {
+   for (;;) {
      // Receive() causes this thread to sleep until next message is received.
-     AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
+     const AnnotatedMessage annotated_msg =
+         bus_->Receive(foreman_client_id_, 0, true);
      const TaggedMessage &tagged_message = annotated_msg.tagged_message;
-     switch (tagged_message.message_type()) {
-       case kWorkOrderCompleteMessage: {
-         serialization::WorkOrderCompletionMessage proto;
-         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-         processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
-         break;
-       }
-       case kRebuildWorkOrderCompleteMessage: {
-         serialization::WorkOrderCompletionMessage proto;
-         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-         processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
+     const tmb::message_type_id message_type = tagged_message.message_type();
+     switch (message_type) {
+       case kCatalogRelationNewBlockMessage:  // Fall through
+       case kDataPipelineMessage:
+       case kRebuildWorkOrderCompleteMessage:
+       case kWorkOrderCompleteMessage:
+       case kWorkOrderFeedbackMessage:
+       case kWorkOrdersAvailableMessage: {
+         policy_enforcer_->processMessage(tagged_message);
          break;
        }
-       case kCatalogRelationNewBlockMessage: {
-         serialization::CatalogRelationNewBlockMessage proto;
-         CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
-         const block_id block = proto.block_id();
-
-         CatalogRelation *relation =
-             static_cast(catalog_database_)->getRelationByIdMutable(proto.relation_id());
-         relation->addBlock(block);
-
-         if (proto.has_partition_id()) {
-           relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
+       case kAdmitRequestMessage: {
+         const AdmitRequestMessage *msg =
+             static_cast(tagged_message.message());
+         const vector &query_handles = msg->getQueryHandles();
+
-         LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() <<
-             " [0]: " << query_handles[0];
+         DCHECK(!query_handles.empty());
+         bool all_queries_admitted = true;
+         if (query_handles.size() == 1u) {
-           LOG(INFO) << "Address of query handle in foreman: " << query_handles.front();
+           all_queries_admitted =
+               policy_enforcer_->admitQuery(query_handles.front());
+         } else {
+           all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
+         }
+         if (!all_queries_admitted) {
+           LOG(WARNING) << "The scheduler could not admit all the queries";
+           // TODO(harshad) - Inform the main thread about the failure.
          }
          break;
        }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionUtil.hpp
index 267bbe6,a8b6a38..50f277e
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@@ -65,67 -60,6 +65,65 @@@ class QueryExecutionUtil
          std::move(tagged_message));
    }
+   /**
+    * @brief Construct and send an AdmitRequestMessage from a given sender to a
+    *        given recipient.
+    *
+    * @param sender_id The TMB client ID of the sender.
+    * @param receiver_id The TMB client ID of the receiver.
+    * @param query_handle The QueryHandle used in the AdmitRequestMessage.
+    * @param bus A pointer to the TMB.
+    * @param tagged_message A moved from reference to the tagged message.
+    *
+    * @return A status code indicating the result of the message delivery.
+    *         The caller should ensure that the status is SendStatus::kOK.
+    **/
+   static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
+       const tmb::client_id sender_id,
+       const tmb::client_id receiver_id,
+       QueryHandle *query_handle,
+       MessageBus *bus) {
-     LOG(INFO) << "Address of query handle in QExecUtil: " << query_handle;
-     std::unique_ptr request_message(new AdmitRequestMessage(query_handle));
-     const std::vector &query_handles = request_message->getQueryHandles();
-     LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() << " [0]: " << query_handles[0];
++    std::unique_ptr request_message(
++        new AdmitRequestMessage(query_handle));
+     const std::size_t size_of_request_msg = sizeof(*request_message);
+     TaggedMessage admit_tagged_message(
+         request_message.release(), size_of_request_msg, kAdmitRequestMessage);
+
+     return QueryExecutionUtil::SendTMBMessage(
+         bus, sender_id, receiver_id, std::move(admit_tagged_message));
+   }
+
+   /**
+    * @brief Broadcast a poison message from a given sender.
+    *
+    * @note This message will be received by all the clients that have registered
+    *       as recipients of the poison message type.
+    *
+    * @param sender_id The TMB client ID of the sender.
+    * @param bus A pointer to the TMB.
+    **/
+   static void BroadcastPoisonMessage(const tmb::client_id sender_id,
+                                      tmb::MessageBus *bus) {
+     // Terminate all threads.
+     // The sender thread broadcasts poison message to the workers and foreman.
+     // Each worker dies after receiving poison message. The order of workers'
+     // death is irrelevant.
+     MessageStyle style;
+     style.Broadcast(true);
+     Address address;
+     address.All(true);
+     std::unique_ptr poison_message(WorkerMessage::PoisonMessage());
+     TaggedMessage poison_tagged_message(poison_message.get(),
+                                         sizeof(*poison_message),
+                                         kPoisonMessage);
+
+     const tmb::MessageBus::SendStatus send_status = bus->Send(
+         sender_id, address, style, std::move(poison_tagged_message));
+     CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
+         "Broadcast poison message from sender with TMB client ID " << sender_id
+         << " failed";
+   }
+
   private:
    /**
     * @brief Constructor. Made private to avoid instantiation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --cc query_execution/WorkOrdersContainer.hpp
index a1c4288,eb9aedd..3b93729
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@@ -48,14 -46,10 +48,11 @@@ class WorkOrdersContainer
     *
     * @param num_operators Number of operators in the query DAG.
     * @param num_numa_nodes Number of NUMA nodes in the system.
-    * @param query_id The ID of the query.
     **/
    WorkOrdersContainer(const std::size_t num_operators,
-                       const std::size_t num_numa_nodes,
-                       const std::size_t query_id)
+                       const std::size_t num_numa_nodes)
-       : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
+       : num_operators_(num_operators),
-         num_numa_nodes_(num_numa_nodes),
-         query_id_(query_id) {
++        num_numa_nodes_(num_numa_nodes) {
      DEBUG_ASSERT(num_operators != 0);
      for (std::size_t op = 0; op < num_operators; ++op) {
        normal_workorders_.push_back(
@@@ -226,9 -220,8 +223,8 @@@
     * @param operator_index The index of the operator in the query DAG.
     **/
    void addNormalWorkOrder(WorkOrder *workorder,
                            const std::size_t operator_index) {
-     DEBUG_ASSERT(workorder != nullptr);
-     DEBUG_ASSERT(operator_index < num_operators_);
+     DCHECK(workorder != nullptr);
+     DCHECK(operator_index < num_operators_);
-     workorder->setQueryID(query_id_);
      normal_workorders_[operator_index].addWorkOrder(workorder);
    }
@@@ -245,9 -238,8 +241,8 @@@
     **/
    void addRebuildWorkOrder(WorkOrder *workorder,
                             const std::size_t operator_index) {
-     DEBUG_ASSERT(workorder != nullptr);
-     DEBUG_ASSERT(operator_index < num_operators_);
+     DCHECK(workorder != nullptr);
+     DCHECK(operator_index < num_operators_);
-     workorder->setQueryID(query_id_);
      rebuild_workorders_[operator_index].addWorkOrder(workorder);
    }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --cc query_execution/tests/WorkOrdersContainer_unittest.cpp
index 865f01f,cf133c4..cb583ab
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@@ -72,8 -72,7 +72,8 @@@ TEST(WorkOrdersContainerTest, ZeroNUMAN
    // they get inserted and retrieved correctly.
    std::vector numa_node_ids;
    // A container for one operator and no NUMA nodes.
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, 0, query_id);
+   WorkOrdersContainer w(1, 0);
    EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
    EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@@ -128,8 -123,7 +128,8 @@@ TEST(WorkOrdersContainerTest, ZeroNUMAN
    // if they get inserted and retrieved correctly and the order of retrieval.
    // A container for one operator and no NUMA nodes.
    std::vector numa_node_ids;
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, 0, query_id);
+   WorkOrdersContainer w(1, 0);
    EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
    EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@@ -198,8 -190,7 +198,8 @@@ TEST(WorkOrdersContainerTest, MultipleN
    const std::size_t kNUMANodesUsed = numa_node_ids.size();
    // A container for one operator and kNUMANodes.
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, kNUMANodes, query_id);
+   WorkOrdersContainer w(1, kNUMANodes);
    for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
      std::vector curr_numa_node;
@@@ -303,8 -291,7 +303,8 @@@ TEST(WorkOrdersContainerTest, AllTypesW
    const std::size_t kNUMANodesUsed = numa_nodes.size();
    // Create the container.
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, kNUMANodes, query_id);
+   WorkOrdersContainer w(1, kNUMANodes);
    w.addNormalWorkOrder(&multiple_numa_work_order, 0);
@@@ -443,8 -427,7 +443,8 @@@ TEST(WorkOrdersContainerTest, MultipleO
    const std::size_t kNUMANodes = numa_node_ids.size();
    // Create the container.
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(kNumOperators, kNUMANodes, query_id);
+   WorkOrdersContainer w(kNumOperators, kNUMANodes);
    std::vector operator_ids;
    for (std::size_t i = 0; i < kNumOperators; ++i) {
@@@ -640,8 -620,7 +640,8 @@@ TEST(WorkOrdersContainerTest, MultipleO
    const std::size_t kNUMANodes = numa_node_ids.size();
    // Create the container.
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(kNumOperators, kNUMANodes, query_id);
+   WorkOrdersContainer w(kNumOperators, kNUMANodes);
    std::vector operator_ids;
    for (std::size_t i = 0; i < kNumOperators; ++i) {
@@@ -796,8 -772,7 +796,8 @@@ TEST(WorkOrdersContainerTest, Retrieval
    numa_node_ids.push_back(0);
    const std::size_t kNumWorkOrdersPerType = 100;
+   const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, 2, query_id);
+   WorkOrdersContainer w(1, 2);
    std::vector single_numa_node_workorder_ids;
    std::vector multiple_numa_node_workorder_ids;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/DeleteOperator.hpp
index a239f42,c55f585..fdc9b00
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@@ -159,8 -162,6 +162,7 @@@ class DeleteWorkOrder : public WorkOrde
    StorageManager *storage_manager_;
    const std::size_t delete_operator_index_;
-   const std::size_t query_id_;
+
    const tmb::client_id scheduler_client_id_;
    MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --cc relational_operators/RebuildWorkOrder.hpp
index fef2cc9,86f8eaf..d11fe7d
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@@ -55,16 -56,17 +56,18 @@@ class RebuildWorkOrder : public WorkOrd
     * @param input_relation_id The ID of the CatalogRelation to which the given
     *        storage block belongs to.
     * @param scheduler_client_id The TMB client ID of the scheduler thread.
+    * @param query_id The ID of the query.
     * @param bus A pointer to the TMB.
     **/
-   RebuildWorkOrder(MutableBlockReference &&block_ref,
-                    const std::size_t input_operator_index,
-                    const relation_id input_relation_id,
-                    const client_id scheduler_client_id,
-                    const std::size_t query_id,
-                    MessageBus *bus)
-       : block_ref_(std::move(block_ref)),
+   RebuildWorkOrder(
+       const std::size_t query_id,
+       MutableBlockReference &&block_ref,  // NOLINT(whitespace/operators)
+       const std::size_t input_operator_index,
+       const relation_id input_relation_id,
+       const client_id scheduler_client_id,
+       MessageBus *bus)
+       : WorkOrder(query_id),
+         block_ref_(std::move(block_ref)),
          input_operator_index_(input_operator_index),
          input_relation_id_(input_relation_id),
          scheduler_client_id_(scheduler_client_id),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --cc relational_operators/TextScanOperator.cpp
index d1f1932,5acecbf..3899af4
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@@ -604,7 -609,6 +609,7 @@@ void TextSplitWorkOrder::execute()
    // Notify the operator about the completion of this Work Order.
    FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
-                       getQueryID(),
++                      query_id_,
                        operator_index_,
                        nullptr /* payload */,
                        0 /* payload_size */,
@@@ -666,7 -670,6 +671,7 @@@ void TextSplitWorkOrder::sendBlobInfoTo
    const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
    FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
-                                getQueryID(),
++                               query_id_,
                                 operator_index_,
                                 payload,
                                 payload_size);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/TextScanOperator.hpp
index f87b530,3cda65b..4fd5c04
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@@ -372,8 -381,6 +381,7 @@@ class TextSplitWorkOrder : public WorkO
    StorageManager *storage_manager_;
    const std::size_t operator_index_;  // Operator index.
-   const std::size_t query_id_;  // query ID.
+
    const tmb::client_id scheduler_client_id_;  // The scheduler's TMB client ID.
    MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/UpdateOperator.hpp
index 9673229,cebb9b5..b4f9b9d
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@@ -174,8 -181,6 +181,7 @@@ class UpdateWorkOrder : public WorkOrde
    StorageManager *storage_manager_;
    const std::size_t update_operator_index_;
-   const std::size_t query_id_;
+
    const tmb::client_id scheduler_client_id_;
    MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --cc relational_operators/WorkOrder.hpp
index fd4b0f1,059865d..df195cc
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@@ -292,25 -285,16 +292,23 @@@ class WorkOrder
          " receiver thread with TMB client ID " << receiver_id;
    }
+   /**
+    * @brief Get the ID of the query which this WorkOrder belongs to.
+    **/
+   inline const std::size_t getQueryID() const {
+     return query_id_;
+   }
+
+  protected:
    /**
-    * @brief Set the ID of the query which the WorkOrder belongs to.
+    * @brief Constructor.
     *
-    * @param query_id The query ID.
+    * @param query_id The ID of the query to which this WorkOrder belongs.
     **/
-   void setQueryID(const std::size_t query_id) {
-     query_id_ = query_id;
-   }
-
-  protected:
-   WorkOrder() {}
+   explicit WorkOrder(const std::size_t query_id)
+       : query_id_(query_id) {}
+   const std::size_t query_id_;
    // A vector of preferred NUMA node IDs where this workorder should be executed.
    // These node IDs typically indicate the NUMA node IDs of the input(s) of the
    // workorder. Derived classes should ensure that there are no duplicate entries

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
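
Note on the query ID change: the hunks above replace WorkOrder::setQueryID() with a protected WorkOrder(query_id) constructor, pass query_id as the first constructor argument of derived work orders such as RebuildWorkOrder, and drop the query_id parameter from WorkOrdersContainer. The following is a minimal, standalone sketch of that pattern, not the Quickstep code itself. Every Sketch* name is hypothetical, and the use of std::unique_ptr is an assumption made to keep the example self-contained (the diff shows raw WorkOrder pointers).

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for WorkOrder: the query ID is supplied once, through
// a protected constructor, and cannot change afterwards.
class SketchWorkOrder {
 public:
  virtual ~SketchWorkOrder() = default;

  virtual void execute() = 0;

  std::size_t getQueryID() const { return query_id_; }

 protected:
  explicit SketchWorkOrder(const std::size_t query_id) : query_id_(query_id) {}

  const std::size_t query_id_;
};

// Hypothetical derived work order: query_id comes first in the parameter list
// and is forwarded to the base class, so no per-class query_id_ member exists.
class SketchRebuildWorkOrder : public SketchWorkOrder {
 public:
  SketchRebuildWorkOrder(const std::size_t query_id,
                         const std::size_t input_operator_index)
      : SketchWorkOrder(query_id),
        input_operator_index_(input_operator_index) {}

  void execute() override { executed_ = true; }

  bool executed() const { return executed_; }
  std::size_t input_operator_index() const { return input_operator_index_; }

 private:
  const std::size_t input_operator_index_;
  bool executed_ = false;
};

// Hypothetical container: it only queues work orders per operator and no
// longer stamps a query ID onto them.
class SketchWorkOrdersContainer {
 public:
  explicit SketchWorkOrdersContainer(const std::size_t num_operators)
      : work_orders_(num_operators) {}

  void addNormalWorkOrder(std::unique_ptr<SketchWorkOrder> work_order,
                          const std::size_t operator_index) {
    assert(work_order != nullptr);
    assert(operator_index < work_orders_.size());
    work_orders_[operator_index].push_back(std::move(work_order));
  }

  std::size_t getNumNormalWorkOrders(const std::size_t operator_index) const {
    return work_orders_[operator_index].size();
  }

  // Precondition: at least one work order is queued for operator_index.
  const SketchWorkOrder& front(const std::size_t operator_index) const {
    return *work_orders_[operator_index].front();
  }

 private:
  std::vector<std::vector<std::unique_ptr<SketchWorkOrder>>> work_orders_;
};
```

With this shape, a work order can never be observed without a query ID, and the container constructor needs only the operator count (plus, in the real class, the NUMA node count).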