quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hbdeshm...@apache.org
Subject [16/16] incubator-quickstep git commit: Merge branch 'reorder-query-id-param' into query-manager-used-in-foreman
Date Wed, 08 Jun 2016 20:49:15 GMT
Merge branch 'reorder-query-id-param' into query-manager-used-in-foreman

Conflicts:
	query_execution/Foreman.cpp
	query_execution/QueryManager.cpp
	query_execution/tests/Foreman_unittest.cpp
	relational_operators/RebuildWorkOrder.hpp
	relational_operators/UpdateOperator.cpp
	relational_operators/UpdateOperator.hpp


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e8ead861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e8ead861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e8ead861

Branch: refs/heads/query-manager-used-in-foreman
Commit: e8ead86103341a34ac7449ed416d1dbba67496a7
Parents: bef0ae1 d67f61e
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Wed Jun 8 15:46:28 2016 -0500
Committer: Harshad Deshmukh <hbdeshmukh@apache.org>
Committed: Wed Jun 8 15:46:28 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp                            |   1 -
 query_execution/AdmitRequestMessage.hpp         |   2 -
 query_execution/Foreman.cpp                     |   3 -
 query_execution/QueryExecutionUtil.hpp          |   6 +-
 query_execution/QueryManager.cpp                |  14 +-
 query_execution/WorkOrdersContainer.hpp         |  10 +-
 query_execution/tests/Foreman_unittest.cpp      | 945 +++++++++++++++++++
 query_execution/tests/QueryManager_unittest.cpp |   2 +-
 .../tests/WorkOrdersContainer_unittest.cpp      |  18 +-
 query_optimizer/ExecutionGenerator.cpp          | 174 ++--
 .../tests/ExecutionHeuristics_unittest.cpp      |  25 +-
 relational_operators/AggregationOperator.cpp    |  12 +-
 relational_operators/AggregationOperator.hpp    |  14 +-
 relational_operators/BuildHashOperator.cpp      |   4 +-
 relational_operators/BuildHashOperator.hpp      |  21 +-
 relational_operators/CreateIndexOperator.hpp    |  10 +-
 relational_operators/CreateTableOperator.hpp    |  10 +-
 relational_operators/DeleteOperator.cpp         |   8 +-
 relational_operators/DeleteOperator.hpp         |  18 +-
 relational_operators/DestroyHashOperator.cpp    |   5 +-
 relational_operators/DestroyHashOperator.hpp    |  14 +-
 relational_operators/DropTableOperator.cpp      |   3 +-
 relational_operators/DropTableOperator.hpp      |  15 +-
 .../FinalizeAggregationOperator.cpp             |   6 +-
 .../FinalizeAggregationOperator.hpp             |  20 +-
 relational_operators/HashJoinOperator.cpp       |  27 +-
 relational_operators/HashJoinOperator.hpp       | 242 +++--
 relational_operators/InsertOperator.cpp         |   6 +-
 relational_operators/InsertOperator.hpp         |  20 +-
 .../NestedLoopsJoinOperator.cpp                 |  41 +-
 .../NestedLoopsJoinOperator.hpp                 |  54 +-
 relational_operators/RebuildWorkOrder.hpp       |  19 +-
 relational_operators/RelationalOperator.hpp     |   8 +-
 relational_operators/SampleOperator.cpp         |  46 +-
 relational_operators/SampleOperator.hpp         |  31 +-
 relational_operators/SaveBlocksOperator.cpp     |   1 +
 relational_operators/SaveBlocksOperator.hpp     |  14 +-
 relational_operators/SelectOperator.cpp         |  23 +-
 relational_operators/SelectOperator.hpp         |  60 +-
 relational_operators/SortMergeRunOperator.cpp   |   1 +
 relational_operators/SortMergeRunOperator.hpp   |  29 +-
 .../SortRunGenerationOperator.cpp               |   4 +-
 .../SortRunGenerationOperator.hpp               |  28 +-
 relational_operators/TableGeneratorOperator.cpp |   7 +-
 relational_operators/TableGeneratorOperator.hpp |  23 +-
 relational_operators/TextScanOperator.cpp       |  27 +-
 relational_operators/TextScanOperator.hpp       |  34 +-
 relational_operators/UpdateOperator.cpp         |  22 +-
 relational_operators/UpdateOperator.hpp         |  42 +-
 relational_operators/WorkOrder.hpp              |  16 +-
 relational_operators/WorkOrder.proto            |   1 +
 relational_operators/WorkOrderFactory.cpp       |  35 +-
 .../tests/AggregationOperator_unittest.cpp      |  18 +-
 .../tests/HashJoinOperator_unittest.cpp         | 156 +--
 .../tests/SortMergeRunOperator_unittest.cpp     |  11 +-
 .../SortRunGenerationOperator_unittest.cpp      |  16 +-
 .../tests/TextScanOperator_unittest.cpp         |   5 +-
 57 files changed, 1825 insertions(+), 602 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --cc cli/QuickstepCli.cpp
index 6f954fe,558d6eb..d65eb89
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@@ -402,25 -389,14 +402,24 @@@ int main(int argc, char* argv[]) 
          }
  
          DCHECK(query_handle->getQueryPlanMutable() != nullptr);
 -        foreman.setQueryPlan(query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable());
 -
 -        foreman.reconstructQueryContextFromProto(query_handle->getQueryContextProto());
 -
 +        // TODO(harshad) - In the future when queries are not admitted
 +        // immediately, calculate their waiting time separately.
-         LOG(INFO) << "Address of query handle in QuickstepCli: " << query_handle.get();
 +        start = std::chrono::steady_clock::now();
 +        const tmb::MessageBus::SendStatus send_status =
 +            QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
 +                main_thread_client_id,
 +                foreman.getBusClientID(),
 +                query_handle.get(),
 +                &bus);
 +        if (send_status != tmb::MessageBus::SendStatus::kOK) {
 +          fprintf(stderr, "\nQuery %s could not be admitted to the system\n", command_string->c_str());
 +          continue;
 +        }
          try {
 -          start = std::chrono::steady_clock::now();
 -          foreman.start();
 -          foreman.join();
 +          const AnnotatedMessage annotated_msg =
 +              bus.Receive(main_thread_client_id, 0, true);
 +          const TaggedMessage &tagged_message = annotated_msg.tagged_message;
 +          DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type());
            end = std::chrono::steady_clock::now();
  
            const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/AdmitRequestMessage.hpp
----------------------------------------------------------------------
diff --cc query_execution/AdmitRequestMessage.hpp
index e2a1077,0000000..e33b354
mode 100644,000000..100644
--- a/query_execution/AdmitRequestMessage.hpp
+++ b/query_execution/AdmitRequestMessage.hpp
@@@ -1,75 -1,0 +1,73 @@@
 +/**
 + *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
 + *     University of Wisconsin—Madison.
 + *
 + *   Licensed under the Apache License, Version 2.0 (the "License");
 + *   you may not use this file except in compliance with the License.
 + *   You may obtain a copy of the License at
 + *
 + *       http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *   Unless required by applicable law or agreed to in writing, software
 + *   distributed under the License is distributed on an "AS IS" BASIS,
 + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *   See the License for the specific language governing permissions and
 + *   limitations under the License.
 + **/
 +
 +#ifndef QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
 +#define QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_
 +
 +#include <vector>
 +
 +#include "utility/Macros.hpp"
 +
 +namespace quickstep {
 +
 +class QueryHandle;
 +
 +/** \addtogroup QueryExecution
 + *  @{
 + */
 +
 +/**
 + * @brief A message requesting a query or queries to be admitted to the system.
 + **/
 +class AdmitRequestMessage {
 + public:
 +  /**
 +   * @brief Constructor.
 +   *
 +   * @param query_handles The handles of the queries requesting to be admitted
 +   *        to the system.
 +   **/
 +  explicit AdmitRequestMessage(const std::vector<QueryHandle*> &query_handles)
 +      : query_handles_(query_handles) {}
 +
 +  /**
 +   * @brief Constructor for requesting single query admission.
 +   *
 +   * @param query_handle The handle of the query requesting to be admitted.
 +   **/
 +  explicit AdmitRequestMessage(QueryHandle *query_handle) {
 +    query_handles_.push_back(query_handle);
 +  }
 +
 +  /**
 +   * @brief Get the query handles from this message.
 +   **/
 +  const std::vector<QueryHandle*>& getQueryHandles() const {
-     LOG(INFO) << "Query handle in getQueryHandles(): " << query_handles_.front()
-       << " [0] " << query_handles_[0];
 +    return query_handles_;
 +  }
 +
 + private:
 +  std::vector<QueryHandle*> query_handles_;
 +
 +  DISALLOW_COPY_AND_ASSIGN(AdmitRequestMessage);
 +};
 +
 +/** @} */
 +
 +}  // namespace quickstep
 +
 +#endif  // QUICKSTEP_QUERY_EXECUTION_ADMIT_REQUEST_MESSAGE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --cc query_execution/Foreman.cpp
index 6cec70a,7705819..3609120
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@@ -97,43 -58,161 +97,40 @@@ void Foreman::run() 
      // We can pin the foreman thread to a CPU if specified.
      ThreadUtil::BindToCPU(cpu_id_);
    }
 -  initializeState();
 -
 -  DEBUG_ASSERT(query_dag_ != nullptr);
 -  const dag_node_index dag_size = query_dag_->size();
 -
 -  // Collect all the workorders from all the relational operators in the DAG.
 -  for (dag_node_index index = 0; index < dag_size; ++index) {
 -    if (checkAllBlockingDependenciesMet(index)) {
 -      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
 -      processOperator(index, false);
 -    }
 -  }
 -
 -  // Dispatch the WorkOrders generated so far.
 -  dispatchWorkerMessages(0, 0);
 -}
 -
 -void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
 -                                              const size_t worker_thread_index) {
 -  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
 -
 -  // As the given worker finished executing a WorkOrder, decrement its number
 -  // of queued WorkOrders.
 -  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
 -
 -  // Check if new work orders are available and fetch them if so.
 -  fetchNormalWorkOrders(op_index);
 -
 -  if (checkRebuildRequired(op_index)) {
 -    if (checkNormalExecutionOver(op_index)) {
 -      if (!checkRebuildInitiated(op_index)) {
 -        if (initiateRebuild(op_index)) {
 -          // Rebuild initiated and completed right away.
 -          markOperatorFinished(op_index);
 -        } else {
 -          // Rebuild under progress.
 -        }
 -      } else if (checkRebuildOver(op_index)) {
 -        // Rebuild was under progress and now it is over.
 -        markOperatorFinished(op_index);
 -      }
 -    } else {
 -      // Normal execution under progress for this operator.
 -    }
 -  } else if (checkOperatorExecutionOver(op_index)) {
 -    // Rebuild not required for this operator and its normal execution is
 -    // complete.
 -    markOperatorFinished(op_index);
 -  }
 -
 -  for (const pair<dag_node_index, bool> &dependent_link :
 -       query_dag_->getDependents(op_index)) {
 -    const dag_node_index dependent_op_index = dependent_link.first;
 -    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
 -      // Process the dependent operator (of the operator whose WorkOrder
 -      // was just executed) for which all the dependencies have been met.
 -      processOperator(dependent_op_index, true);
 -    }
 -  }
 -
 -  // Dispatch the WorkerMessages to the workers. We prefer to start the search
 -  // for the schedulable WorkOrders beginning from 'op_index'. The first
 -  // candidate worker to receive the next WorkOrder is the one that sent the
 -  // response message to Foreman.
 -  dispatchWorkerMessages(worker_thread_index, op_index);
 -}
 -
 -void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
 -                                                     const size_t worker_thread_index) {
 -  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
 -  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
 -
 -  if (checkRebuildOver(op_index)) {
 -    markOperatorFinished(op_index);
 -
 -    for (const pair<dag_node_index, bool> &dependent_link :
 -         query_dag_->getDependents(op_index)) {
 -      const dag_node_index dependent_op_index = dependent_link.first;
 -      if (checkAllBlockingDependenciesMet(dependent_op_index)) {
 -        processOperator(dependent_op_index, true);
 -      }
 -    }
 -  }
 -
 -  // Dispatch the WorkerMessages to the workers. We prefer to start the search
 -  // for the schedulable WorkOrders beginning from 'op_index'. The first
 -  // candidate worker to receive the next WorkOrder is the one that sent the
 -  // response message to Foreman.
 -  dispatchWorkerMessages(worker_thread_index, op_index);
 -}
 -
 -void Foreman::processDataPipelineMessage(const dag_node_index op_index,
 -                                         const block_id block,
 -                                         const relation_id rel_id) {
 -  for (const dag_node_index consumer_index :
 -       output_consumers_[op_index]) {
 -    // Feed the streamed block to the consumer. Note that 'output_consumers_'
 -    // only contain those dependents of operator with index = op_index which are
 -    // eligible to receive streamed input.
 -    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
 -    // Because of the streamed input just fed, check if there are any new
 -    // WorkOrders available and if so, fetch them.
 -    fetchNormalWorkOrders(consumer_index);
 -  }
 -
 -  // Dispatch the WorkerMessages to the workers. We prefer to start the search
 -  // for the schedulable WorkOrders beginning from 'op_index'. The first
 -  // candidate worker to receive the next WorkOrder is the one that sent the
 -  // response message to Foreman.
 -  // TODO(zuyu): Improve the data locality for the next WorkOrder.
 -  dispatchWorkerMessages(0, op_index);
 -}
 -
 -void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
 -  RelationalOperator *op =
 -      query_dag_->getNodePayloadMutable(msg.header().rel_op_index);
 -  op->receiveFeedbackMessage(msg);
 -}
 -
 -void Foreman::run() {
 -  // Initialize before for Foreman eventloop.
 -  initialize();
  
    // Event loop
 -  while (!query_exec_state_->hasQueryExecutionFinished()) {
 +  for (;;) {
      // Receive() causes this thread to sleep until next message is received.
 -    AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
 +    const AnnotatedMessage annotated_msg =
 +        bus_->Receive(foreman_client_id_, 0, true);
      const TaggedMessage &tagged_message = annotated_msg.tagged_message;
 -    switch (tagged_message.message_type()) {
 -      case kWorkOrderCompleteMessage: {
 -        serialization::WorkOrderCompletionMessage proto;
 -        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 -
 -        processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
 -        break;
 -      }
 -      case kRebuildWorkOrderCompleteMessage: {
 -        serialization::WorkOrderCompletionMessage proto;
 -        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 -
 -        processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
 +    const tmb::message_type_id message_type = tagged_message.message_type();
 +    switch (message_type) {
 +      case kCatalogRelationNewBlockMessage:  // Fall through
 +      case kDataPipelineMessage:
 +      case kRebuildWorkOrderCompleteMessage:
 +      case kWorkOrderCompleteMessage:
 +      case kWorkOrderFeedbackMessage:
 +      case kWorkOrdersAvailableMessage: {
 +        policy_enforcer_->processMessage(tagged_message);
          break;
        }
 -      case kCatalogRelationNewBlockMessage: {
 -        serialization::CatalogRelationNewBlockMessage proto;
 -        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
 -
 -        const block_id block = proto.block_id();
 -
 -        CatalogRelation *relation =
 -            static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
 -        relation->addBlock(block);
 -
 -        if (proto.has_partition_id()) {
 -          relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
 +      case kAdmitRequestMessage: {
 +        const AdmitRequestMessage *msg =
 +            static_cast<const AdmitRequestMessage *>(tagged_message.message());
 +        const vector<QueryHandle *> &query_handles = msg->getQueryHandles();
 +
-         LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() <<
-             " [0]: " << query_handles[0];
 +        DCHECK(!query_handles.empty());
 +        bool all_queries_admitted = true;
 +        if (query_handles.size() == 1u) {
-           LOG(INFO) << "Address of query handle in foreman: " << query_handles.front();
 +          all_queries_admitted =
 +              policy_enforcer_->admitQuery(query_handles.front());
 +        } else {
 +          all_queries_admitted = policy_enforcer_->admitQueries(query_handles);
 +        }
 +        if (!all_queries_admitted) {
 +          LOG(WARNING) << "The scheduler could not admit all the queries";
 +          // TODO(harshad) - Inform the main thread about the failure.
          }
          break;
        }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/QueryExecutionUtil.hpp
----------------------------------------------------------------------
diff --cc query_execution/QueryExecutionUtil.hpp
index 267bbe6,a8b6a38..50f277e
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@@ -65,67 -60,6 +65,65 @@@ class QueryExecutionUtil 
                       std::move(tagged_message));
    }
  
 +  /**
 +   * @brief Construct and send an AdmitRequestMessage from a given sender to a
 +   *        given recipient.
 +   *
 +   * @param sender_id The TMB client ID of the sender.
 +   * @param receiver_id The TMB client ID of the receiver.
 +   * @param query_handle The QueryHandle used in the AdmitRequestMessage.
 +   * @param bus A pointer to the TMB.
 +   * @param tagged_message A moved from reference to the tagged message.
 +   *
 +   * @return A status code indicating the result of the message delivery.
 +   *         The caller should ensure that the status is SendStatus::kOK.
 +   **/
 +  static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage(
 +      const tmb::client_id sender_id,
 +      const tmb::client_id receiver_id,
 +      QueryHandle *query_handle,
 +      MessageBus *bus) {
-     LOG(INFO) << "Address of query handle in QExecUtil: " << query_handle;
-     std::unique_ptr<AdmitRequestMessage> request_message(new AdmitRequestMessage(query_handle));
-     const std::vector<QueryHandle *> &query_handles = request_message->getQueryHandles();
-       LOG(INFO) << "Address of query handle in foreman front: " << query_handles.front() << " [0]: " << query_handles[0];
++    std::unique_ptr<AdmitRequestMessage> request_message(
++        new AdmitRequestMessage(query_handle));
 +    const std::size_t size_of_request_msg = sizeof(*request_message);
 +    TaggedMessage admit_tagged_message(
 +        request_message.release(), size_of_request_msg, kAdmitRequestMessage);
 +
 +    return QueryExecutionUtil::SendTMBMessage(
 +        bus, sender_id, receiver_id, std::move(admit_tagged_message));
 +  }
 +
 +  /**
 +   * @brief Broadcast a poison message from a given sender.
 +   *
 +   * @note This message will be received by all the clients that have registered
 +   *       as recipients of the poison message type.
 +   *
 +   * @param sender_id The TMB client ID of the sender.
 +   * @param bus A pointer to the TMB.
 +   **/
 +  static void BroadcastPoisonMessage(const tmb::client_id sender_id,
 +                                     tmb::MessageBus *bus) {
 +    // Terminate all threads.
 +    // The sender thread broadcasts poison message to the workers and foreman.
 +    // Each worker dies after receiving poison message. The order of workers'
 +    // death is irrelevant.
 +    MessageStyle style;
 +    style.Broadcast(true);
 +    Address address;
 +    address.All(true);
 +    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
 +    TaggedMessage poison_tagged_message(poison_message.get(),
 +                                        sizeof(*poison_message),
 +                                        kPoisonMessage);
 +
 +    const tmb::MessageBus::SendStatus send_status = bus->Send(
 +        sender_id, address, style, std::move(poison_tagged_message));
 +    CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
 +       "Broadcast poison message from sender with TMB client ID " << sender_id
 +       << " failed";
 +  }
 +
   private:
    /**
     * @brief Constructor. Made private to avoid instantiation.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/WorkOrdersContainer.hpp
----------------------------------------------------------------------
diff --cc query_execution/WorkOrdersContainer.hpp
index a1c4288,eb9aedd..3b93729
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@@ -48,14 -46,10 +48,11 @@@ class WorkOrdersContainer 
     *
     * @param num_operators Number of operators in the query DAG.
     * @param num_numa_nodes Number of NUMA nodes in the system.
-    * @param query_id The ID of the query.
     **/
    WorkOrdersContainer(const std::size_t num_operators,
-                       const std::size_t num_numa_nodes,
-                       const std::size_t query_id)
+                       const std::size_t num_numa_nodes)
 -      : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
 +      : num_operators_(num_operators),
-         num_numa_nodes_(num_numa_nodes),
-         query_id_(query_id) {
++        num_numa_nodes_(num_numa_nodes) {
      DEBUG_ASSERT(num_operators != 0);
      for (std::size_t op = 0; op < num_operators; ++op) {
        normal_workorders_.push_back(
@@@ -226,9 -220,8 +223,8 @@@
     * @param operator_index The index of the operator in the query DAG.
     **/
    void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
 -    DEBUG_ASSERT(workorder != nullptr);
 -    DEBUG_ASSERT(operator_index < num_operators_);
 +    DCHECK(workorder != nullptr);
 +    DCHECK(operator_index < num_operators_);
-     workorder->setQueryID(query_id_);
      normal_workorders_[operator_index].addWorkOrder(workorder);
    }
  
@@@ -245,9 -238,8 +241,8 @@@
     **/
    void addRebuildWorkOrder(WorkOrder *workorder,
                             const std::size_t operator_index) {
 -    DEBUG_ASSERT(workorder != nullptr);
 -    DEBUG_ASSERT(operator_index < num_operators_);
 +    DCHECK(workorder != nullptr);
 +    DCHECK(operator_index < num_operators_);
-     workorder->setQueryID(query_id_);
      rebuild_workorders_[operator_index].addWorkOrder(workorder);
    }
  

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_execution/tests/WorkOrdersContainer_unittest.cpp
----------------------------------------------------------------------
diff --cc query_execution/tests/WorkOrdersContainer_unittest.cpp
index 865f01f,cf133c4..cb583ab
--- a/query_execution/tests/WorkOrdersContainer_unittest.cpp
+++ b/query_execution/tests/WorkOrdersContainer_unittest.cpp
@@@ -72,8 -72,7 +72,8 @@@ TEST(WorkOrdersContainerTest, ZeroNUMAN
    // they get inserted and retrieved correctly.
    std::vector<int> numa_node_ids;
    // A container for one operator and no NUMA nodes.
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, 0, query_id);
+   WorkOrdersContainer w(1, 0);
  
    EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
    EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@@ -128,8 -123,7 +128,8 @@@ TEST(WorkOrdersContainerTest, ZeroNUMAN
    // if they get inserted and retrieved correctly and the order of retrieval.
    // A container for one operator and no NUMA nodes.
    std::vector<int> numa_node_ids;
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, 0, query_id);
+   WorkOrdersContainer w(1, 0);
  
    EXPECT_EQ(0u, w.getNumNormalWorkOrders(0));
    EXPECT_EQ(0u, w.getNumRebuildWorkOrders(0));
@@@ -198,8 -190,7 +198,8 @@@ TEST(WorkOrdersContainerTest, MultipleN
    const std::size_t kNUMANodesUsed = numa_node_ids.size();
  
    // A container for one operator and kNUMANodes.
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, kNUMANodes, query_id);
+   WorkOrdersContainer w(1, kNUMANodes);
  
    for (std::size_t i = 0; i < kNUMANodesUsed; ++i) {
      std::vector<int> curr_numa_node;
@@@ -303,8 -291,7 +303,8 @@@ TEST(WorkOrdersContainerTest, AllTypesW
    const std::size_t kNUMANodesUsed = numa_nodes.size();
  
    // Create the container.
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, kNUMANodes, query_id);
+   WorkOrdersContainer w(1, kNUMANodes);
  
    w.addNormalWorkOrder(&multiple_numa_work_order, 0);
  
@@@ -443,8 -427,7 +443,8 @@@ TEST(WorkOrdersContainerTest, MultipleO
    const std::size_t kNUMANodes = numa_node_ids.size();
  
    // Create the container.
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(kNumOperators, kNUMANodes, query_id);
+   WorkOrdersContainer w(kNumOperators, kNUMANodes);
  
    std::vector<std::size_t> operator_ids;
    for (std::size_t i = 0; i < kNumOperators; ++i) {
@@@ -640,8 -620,7 +640,8 @@@ TEST(WorkOrdersContainerTest, MultipleO
    const std::size_t kNUMANodes = numa_node_ids.size();
  
    // Create the container.
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(kNumOperators, kNUMANodes, query_id);
+   WorkOrdersContainer w(kNumOperators, kNUMANodes);
  
    std::vector<std::size_t> operator_ids;
    for (std::size_t i = 0; i < kNumOperators; ++i) {
@@@ -796,8 -772,7 +796,8 @@@ TEST(WorkOrdersContainerTest, Retrieval
    numa_node_ids.push_back(0);
    const std::size_t kNumWorkOrdersPerType = 100;
  
 +  const std::size_t query_id = 0;
-   WorkOrdersContainer w(1, 2, query_id);
+   WorkOrdersContainer w(1, 2);
  
    std::vector<int> single_numa_node_workorder_ids;
    std::vector<int> multiple_numa_node_workorder_ids;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/DeleteOperator.hpp
index a239f42,c55f585..fdc9b00
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@@ -159,8 -162,6 +162,7 @@@ class DeleteWorkOrder : public WorkOrde
    StorageManager *storage_manager_;
  
    const std::size_t delete_operator_index_;
-   const std::size_t query_id_;
 +
    const tmb::client_id scheduler_client_id_;
    MessageBus *bus_;
  

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --cc relational_operators/RebuildWorkOrder.hpp
index fef2cc9,86f8eaf..d11fe7d
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@@ -55,16 -56,17 +56,18 @@@ class RebuildWorkOrder : public WorkOrd
     * @param input_relation_id The ID of the CatalogRelation to which the given
     *        storage block belongs to.
     * @param scheduler_client_id The TMB client ID of the scheduler thread.
 +   * @param query_id The ID of the query.
     * @param bus A pointer to the TMB.
     **/
-   RebuildWorkOrder(MutableBlockReference &&block_ref,
-                    const std::size_t input_operator_index,
-                    const relation_id input_relation_id,
-                    const client_id scheduler_client_id,
-                    const std::size_t query_id,
-                    MessageBus *bus)
-       : block_ref_(std::move(block_ref)),
+   RebuildWorkOrder(
+       const std::size_t query_id,
+       MutableBlockReference &&block_ref,  // NOLINT(whitespace/operators)
+       const std::size_t input_operator_index,
+       const relation_id input_relation_id,
+       const client_id scheduler_client_id,
+       MessageBus *bus)
+       : WorkOrder(query_id),
+         block_ref_(std::move(block_ref)),
          input_operator_index_(input_operator_index),
          input_relation_id_(input_relation_id),
          scheduler_client_id_(scheduler_client_id),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --cc relational_operators/TextScanOperator.cpp
index d1f1932,5acecbf..3899af4
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@@ -604,7 -609,6 +609,7 @@@ void TextSplitWorkOrder::execute() 
  
    // Notify the operator about the completion of this Work Order.
    FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
-                       getQueryID(),
++                      query_id_,
                        operator_index_,
                        nullptr /* payload */,
                        0 /* payload_size */,
@@@ -666,7 -670,6 +671,7 @@@ void TextSplitWorkOrder::sendBlobInfoTo
  
    const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
    FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
-                                getQueryID(),
++                               query_id_,
                                 operator_index_,
                                 payload,
                                 payload_size);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/TextScanOperator.hpp
index f87b530,3cda65b..4fd5c04
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@@ -372,8 -381,6 +381,7 @@@ class TextSplitWorkOrder : public WorkO
    StorageManager *storage_manager_;
  
   const std::size_t operator_index_;  // Operator index.
-   const std::size_t query_id_;  // query ID.
 +
    const tmb::client_id scheduler_client_id_;  // The scheduler's TMB client ID.
    MessageBus *bus_;
  

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --cc relational_operators/UpdateOperator.hpp
index 9673229,cebb9b5..b4f9b9d
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@@ -174,8 -181,6 +181,7 @@@ class UpdateWorkOrder : public WorkOrde
    StorageManager *storage_manager_;
  
    const std::size_t update_operator_index_;
-   const std::size_t query_id_;
 +
    const tmb::client_id scheduler_client_id_;
    MessageBus *bus_;
  

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --cc relational_operators/WorkOrder.hpp
index fd4b0f1,059865d..df195cc
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@@ -292,25 -285,16 +292,23 @@@ class WorkOrder 
          " receiver thread with TMB client ID " << receiver_id;
    }
  
 +  /**
 +   * @brief Get the ID of the query which this WorkOrder belongs to.
 +   **/
 +  inline const std::size_t getQueryID() const {
 +    return query_id_;
 +  }
 +
+  protected:
    /**
-    * @brief Set the ID of the query which the WorkOrder belongs to.
+    * @brief Constructor.
     *
-    * @param query_id The query ID.
+    * @param query_id The ID of the query to which this WorkOrder belongs.
     **/
-   void setQueryID(const std::size_t query_id) {
-     query_id_ = query_id;
-   }
- 
-  protected:
-   WorkOrder() {}
+   explicit WorkOrder(const std::size_t query_id)
+       : query_id_(query_id) {}
  
+   const std::size_t query_id_;
    // A vector of preferred NUMA node IDs where this workorder should be executed.
    // These node IDs typically indicate the NUMA node IDs of the input(s) of the
    // workorder. Derived classes should ensure that there are no duplicate entries

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e8ead861/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------


Mime
View raw message