quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject incubator-quickstep git commit: Added partition_id in WorkOrder.
Date Sat, 26 Aug 2017 23:08:14 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 83c4fe557 -> 32b5b83f3


Added partition_id in WorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/32b5b83f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/32b5b83f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/32b5b83f

Branch: refs/heads/master
Commit: 32b5b83f3e37033a3959c935aca102f2db2822fc
Parents: 83c4fe5
Author: Zuyu Zhang <zuyu@cs.wisc.edu>
Authored: Thu Aug 24 13:26:04 2017 -0500
Committer: Zuyu Zhang <zuyu@cs.wisc.edu>
Committed: Sat Aug 26 16:54:29 2017 -0500

----------------------------------------------------------------------
 query_execution/PolicyEnforcerBase.cpp          |  4 +-
 query_execution/QueryExecutionMessages.proto    |  2 +
 query_execution/QueryManagerBase.cpp            |  6 +-
 query_execution/QueryManagerBase.hpp            |  8 ++-
 query_execution/Worker.cpp                      | 13 ++--
 .../tests/QueryManagerSingleNode_unittest.cpp   | 10 ++-
 relational_operators/AggregationOperator.cpp    |  2 +
 relational_operators/AggregationOperator.hpp    |  4 +-
 .../BuildAggregationExistenceMapOperator.cpp    |  2 +
 .../BuildAggregationExistenceMapOperator.hpp    |  4 +-
 relational_operators/BuildHashOperator.hpp      | 16 +----
 relational_operators/BuildLIPFilterOperator.cpp |  2 +
 relational_operators/BuildLIPFilterOperator.hpp |  4 +-
 relational_operators/CMakeLists.txt             |  1 +
 .../DestroyAggregationStateOperator.cpp         |  2 +-
 .../DestroyAggregationStateOperator.hpp         |  4 +-
 relational_operators/DestroyHashOperator.cpp    |  2 +-
 relational_operators/DestroyHashOperator.hpp    |  4 +-
 .../FinalizeAggregationOperator.cpp             |  1 -
 .../FinalizeAggregationOperator.hpp             |  5 +-
 relational_operators/HashJoinOperator.cpp       |  6 +-
 relational_operators/HashJoinOperator.hpp       | 66 +++-----------------
 .../InitializeAggregationOperator.cpp           |  1 +
 .../InitializeAggregationOperator.hpp           |  4 +-
 .../NestedLoopsJoinOperator.cpp                 |  4 ++
 .../NestedLoopsJoinOperator.hpp                 |  4 +-
 relational_operators/RebuildWorkOrder.hpp       |  6 +-
 relational_operators/SelectOperator.cpp         |  2 +-
 relational_operators/SelectOperator.hpp         |  7 +--
 relational_operators/WorkOrder.hpp              | 21 ++++++-
 relational_operators/WorkOrderFactory.cpp       | 15 ++++-
 31 files changed, 114 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 32f29a3..f97f711 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -72,7 +72,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
 
       op_index = proto.operator_index();
-      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index);
+      admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index, proto.partition_id());
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
@@ -87,7 +87,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
       DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
 
       op_index = proto.operator_index();
-      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index);
+      admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index, proto.partition_id());
       break;
     }
     case kCatalogRelationNewBlockMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 6aa8769..abc47cf 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -31,6 +31,7 @@ import "relational_operators/WorkOrder.proto";
 // order completion message, we may be interested in adding the compression
 // ratio or dictionary size of the rebuilt block.
 
+// Next tag: 9.
 message WorkOrderCompletionMessage {
   enum WorkOrderType {
     NORMAL = 0;
@@ -42,6 +43,7 @@ message WorkOrderCompletionMessage {
   required uint64 operator_index = 2;
   required uint64 worker_thread_index = 3;
   required uint64 query_id = 4;
+  required uint64 partition_id = 8;
 
   // Epoch time in microseconds.
   optional uint64 execution_start_time = 5;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f353b64..565c6ad 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -105,7 +105,8 @@ void QueryManagerBase::processFeedbackMessage(
 }
 
 void QueryManagerBase::processWorkOrderCompleteMessage(
-    const dag_node_index op_index) {
+    const dag_node_index op_index,
+    const partition_id part_id) {
   query_exec_state_->decrementNumQueuedWorkOrders(op_index);
 
   // Check if new work orders are available and fetch them if so.
@@ -144,7 +145,8 @@ void QueryManagerBase::processWorkOrderCompleteMessage(
   }
 }
 
-void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) {
+void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
+                                                              const partition_id part_id) {
   query_exec_state_->decrementNumRebuildWorkOrders(op_index);
 
   if (checkRebuildOver(op_index)) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index b6ed247..78d67cc 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -92,16 +92,20 @@ class QueryManagerBase {
    *
    * @param op_index The index of the specified operator node in the query DAG
    *        for the completed WorkOrder.
+   * @param part_id The partition id.
    **/
-  void processWorkOrderCompleteMessage(const dag_node_index op_index);
+  void processWorkOrderCompleteMessage(const dag_node_index op_index,
+                                       const partition_id part_id);
 
   /**
    * @brief Process the received RebuildWorkOrder complete message.
    *
    * @param op_index The index of the specified operator node in the query DAG
    *        for the completed RebuildWorkOrder.
+   * @param part_id The partition id.
    **/
-  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
+  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
+                                              const partition_id part_id);
 
   /**
    * @brief Process the received data pipeline message.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index 1882f2e..44151af 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstdlib>
+#include <memory>
 #include <utility>
 
 #include "query_execution/QueryExecutionMessages.pb.h"
@@ -125,14 +126,17 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
   std::chrono::time_point<std::chrono::steady_clock> start, end;
   WorkerMessage worker_message(
       *static_cast<const WorkerMessage *>(tagged_message.message()));
-  DCHECK(worker_message.getWorkOrder() != nullptr);
-  const size_t query_id_for_workorder = worker_message.getWorkOrder()->getQueryID();
+  std::unique_ptr<WorkOrder> work_order(worker_message.getWorkOrder());
+  DCHECK(work_order);
+
+  const size_t query_id_for_workorder = work_order->getQueryID();
+  const partition_id part_id = work_order->getPartitionId();
 
   // Start measuring the execution time.
   start = std::chrono::steady_clock::now();
-  worker_message.getWorkOrder()->execute();
+  work_order->execute();
   end = std::chrono::steady_clock::now();
-  delete worker_message.getWorkOrder();
+  work_order.reset();
 
   // Convert the measured timestamps to epoch times in microseconds.
   const uint64_t execution_start_time =
@@ -147,6 +151,7 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message,
                                                    : WorkOrderCompletionMessage::NORMAL);
   proto->set_operator_index(worker_message.getRelationalOpIndex());
   proto->set_query_id(query_id_for_workorder);
+  proto->set_partition_id(part_id);
   proto->set_worker_thread_index(worker_thread_index_);
   proto->set_execution_start_time(execution_start_time);
   proto->set_execution_end_time(execution_end_time);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index c65364c..19b42ac 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -61,6 +61,12 @@ using tmb::client_id;
 
 namespace quickstep {
 
+namespace {
+
+const partition_id kPartitionId = 0;
+
+}  // namespace
+
 class WorkOrderProtosContainer;
 
 class MockWorkOrder : public WorkOrder {
@@ -256,14 +262,14 @@ class QueryManagerTest : public ::testing::Test {
   inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
 
-    query_manager_->processWorkOrderCompleteMessage(index);
+    query_manager_->processWorkOrderCompleteMessage(index, kPartitionId);
     return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
   }
 
   inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";
 
-    query_manager_->processRebuildWorkOrderCompleteMessage(index);
+    query_manager_->processRebuildWorkOrderCompleteMessage(index, kPartitionId);
     return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index 2618e01..6833b59 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -51,6 +51,7 @@ bool AggregationOperator::getAllWorkOrders(
         container->addNormalWorkOrder(
             new AggregationWorkOrder(
                 query_id_,
+                part_id,
                 input_block_id,
                 query_context->getAggregationState(aggr_state_index_, part_id),
                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
@@ -65,6 +66,7 @@ bool AggregationOperator::getAllWorkOrders(
         container->addNormalWorkOrder(
             new AggregationWorkOrder(
                 query_id_,
+                part_id,
                 input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
                 query_context->getAggregationState(aggr_state_index_, part_id),
                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 1d37e50..2fb620b 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -155,15 +155,17 @@ class AggregationWorkOrder : public WorkOrder {
    * @brief Constructor
    *
    * @param query_id The ID of this query.
+   * @param part_id The partition id.
    * @param input_block_id The block id.
    * @param state The AggregationState to use.
    * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   AggregationWorkOrder(const std::size_t query_id,
+                       const partition_id part_id,
                        const block_id input_block_id,
                        AggregationOperationState *state,
                        LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_block_id_(input_block_id),
         state_(DCHECK_NOTNULL(state)),
         lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
index 5552b75..61cebe1 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.cpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -98,6 +98,7 @@ bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
             new BuildAggregationExistenceMapWorkOrder(
                 query_id_,
                 input_relation_,
+                part_id,
                 input_block_id,
                 build_attribute_,
                 query_context->getAggregationState(aggr_state_index_, part_id),
@@ -114,6 +115,7 @@ bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
             new BuildAggregationExistenceMapWorkOrder(
                   query_id_,
                   input_relation_,
+                  part_id,
                   input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
                   build_attribute_,
                   query_context->getAggregationState(aggr_state_index_, part_id),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
index b29ad4a..8791a38 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.hpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -158,6 +158,7 @@ class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of this query.
    * @param input_relation The relation to build the existence map on.
+   * @param part_id The partition id of 'input_relation'.
    * @param build_block_id The block id.
    * @param build_attribute The ID of the attribute to build on.
    * @param state The AggregationState to use.
@@ -165,11 +166,12 @@ class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
    **/
   BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
                                         const CatalogRelationSchema &input_relation,
+                                        const partition_id part_id,
                                         const block_id build_block_id,
                                         const attribute_id build_attribute,
                                         AggregationOperationState *state,
                                         StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
         build_block_id_(build_block_id),
         build_attribute_(build_attribute),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 50dbc05..d18d9fb 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -203,11 +203,10 @@ class BuildHashWorkOrder : public WorkOrder {
                      JoinHashTable *hash_table,
                      StorageManager *storage_manager,
                      LIPFilterBuilder *lip_filter_builder)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
@@ -236,11 +235,10 @@ class BuildHashWorkOrder : public WorkOrder {
                      JoinHashTable *hash_table,
                      StorageManager *storage_manager,
                      LIPFilterBuilder *lip_filter_builder)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
@@ -254,20 +252,10 @@ class BuildHashWorkOrder : public WorkOrder {
 
   void execute() override;
 
-  /**
-   * @brief Get the partition id.
-   *
-   * @return The partition id.
-   */
-  partition_id getPartitionId() const {
-    return part_id_;
-  }
-
  private:
   const CatalogRelationSchema &input_relation_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const partition_id part_id_;
   const block_id build_block_id_;
 
   JoinHashTable *hash_table_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
index 925dfb5..9065b85 100644
--- a/relational_operators/BuildLIPFilterOperator.cpp
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -65,6 +65,7 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
             new BuildLIPFilterWorkOrder(
                 query_id_,
                 input_relation_,
+                part_id,
                 input_block_id,
                 build_side_predicate,
                 storage_manager,
@@ -82,6 +83,7 @@ bool BuildLIPFilterOperator::getAllWorkOrders(
             new BuildLIPFilterWorkOrder(
                 query_id_,
                 input_relation_,
+                part_id,
                 input_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
                 build_side_predicate,
                 storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
index df722ef..9b23dd9 100644
--- a/relational_operators/BuildLIPFilterOperator.hpp
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -171,6 +171,7 @@ class BuildLIPFilterWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param input_relation The relation to build LIP filters on.
+   * @param part_id The partition id of 'input_relation'.
    * @param build_block_id The block id.
    * @param build_side_predicate The predicate to be applied to filter the input
    *        relation before building the LIP filters (or nullptr if no predicate
@@ -181,12 +182,13 @@ class BuildLIPFilterWorkOrder : public WorkOrder {
    **/
   BuildLIPFilterWorkOrder(const std::size_t query_id,
                           const CatalogRelationSchema &input_relation,
+                          const partition_id part_id,
                           const block_id build_block_id,
                           const Predicate *build_side_predicate,
                           StorageManager *storage_manager,
                           LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
                           LIPFilterBuilder *lip_filter_builder)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
         build_block_id_(build_block_id),
         build_side_predicate_(build_side_predicate),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 6083dd5..c0def6f 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -565,6 +565,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder
                       glog
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_utility_Macros
                       tmb)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 3d36e20..9d8d9f6 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -67,7 +67,7 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
 }
 
 void DestroyAggregationStateWorkOrder::execute() {
-  query_context_->destroyAggregationState(aggr_state_index_, part_id_);
+  query_context_->destroyAggregationState(aggr_state_index_, partition_id_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/DestroyAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp
index 990160f..c5b092f 100644
--- a/relational_operators/DestroyAggregationStateOperator.hpp
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -110,9 +110,8 @@ class DestroyAggregationStateWorkOrder : public WorkOrder {
       const QueryContext::aggregation_state_id aggr_state_index,
       const partition_id part_id,
       QueryContext *query_context)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         aggr_state_index_(aggr_state_index),
-        part_id_(part_id),
         query_context_(DCHECK_NOTNULL(query_context)) {}
 
   ~DestroyAggregationStateWorkOrder() override {}
@@ -121,7 +120,6 @@ class DestroyAggregationStateWorkOrder : public WorkOrder {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const partition_id part_id_;
   QueryContext *query_context_;
 
   DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/DestroyHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp
index d9ea19b..4b93f0c 100644
--- a/relational_operators/DestroyHashOperator.cpp
+++ b/relational_operators/DestroyHashOperator.cpp
@@ -68,7 +68,7 @@ bool DestroyHashOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *contai
 
 
 void DestroyHashWorkOrder::execute() {
-  query_context_->destroyJoinHashTable(hash_table_index_, part_id_);
+  query_context_->destroyJoinHashTable(hash_table_index_, partition_id_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/DestroyHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index a65e739..3a6cf0e 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -107,9 +107,8 @@ class DestroyHashWorkOrder : public WorkOrder {
                        const QueryContext::join_hash_table_id hash_table_index,
                        const partition_id part_id,
                        QueryContext *query_context)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         hash_table_index_(hash_table_index),
-        part_id_(part_id),
         query_context_(DCHECK_NOTNULL(query_context)) {}
 
   ~DestroyHashWorkOrder() override {}
@@ -118,7 +117,6 @@ class DestroyHashWorkOrder : public WorkOrder {
 
  private:
   const QueryContext::join_hash_table_id hash_table_index_;
-  const partition_id part_id_;
   QueryContext *query_context_;
 
   DISALLOW_COPY_AND_ASSIGN(DestroyHashWorkOrder);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index efa4cba..68d0ef4 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -96,7 +96,6 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 }
 
 void FinalizeAggregationWorkOrder::execute() {
-  (void) part_id_;
   state_->finalizeAggregate(state_partition_id_, output_destination_);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 12433b9..a6f1ef6 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -139,8 +139,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
                                const std::size_t state_partition_id,
                                AggregationOperationState *state,
                                InsertDestination *output_destination)
-      : WorkOrder(query_id),
-        part_id_(part_id),
+      : WorkOrder(query_id, part_id),
         state_partition_id_(state_partition_id),
         state_(DCHECK_NOTNULL(state)),
         output_destination_(DCHECK_NOTNULL(output_destination)) {}
@@ -150,7 +149,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  const std::size_t part_id_, state_partition_id_;
+  const std::size_t state_partition_id_;
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index e385e46..b07e4cb 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -447,7 +447,7 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
 
 
 void HashInnerJoinWorkOrder::execute() {
-  output_destination_->setInputPartitionId(part_id_);
+  output_destination_->setInputPartitionId(partition_id_);
 
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
@@ -680,7 +680,7 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
 }
 
 void HashSemiJoinWorkOrder::execute() {
-  output_destination_->setInputPartitionId(part_id_);
+  output_destination_->setInputPartitionId(partition_id_);
 
   if (residual_predicate_ == nullptr) {
     executeWithoutResidualPredicate();
@@ -1006,7 +1006,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
 }
 
 void HashOuterJoinWorkOrder::execute() {
-  output_destination_->setInputPartitionId(part_id_);
+  output_destination_->setInputPartitionId(partition_id_);
 
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 23267f8..316be66 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -338,12 +338,11 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
@@ -391,12 +390,11 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
@@ -417,15 +415,6 @@ class HashInnerJoinWorkOrder : public WorkOrder {
    **/
   void execute() override;
 
-  /**
-   * @brief Get the partition id.
-   *
-   * @return The partition id.
-   */
-  partition_id getPartitionId() const {
-    return part_id_;
-  }
-
  private:
   void executeWithoutCopyElision(ValueAccessor *probe_accesor);
 
@@ -435,7 +424,6 @@ class HashInnerJoinWorkOrder : public WorkOrder {
   const CatalogRelationSchema &probe_relation_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const partition_id part_id_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
@@ -494,12 +482,11 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
@@ -547,12 +534,11 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
@@ -565,15 +551,6 @@ class HashSemiJoinWorkOrder : public WorkOrder {
 
   void execute() override;
 
-  /**
-   * @brief Get the partition id.
-   *
-   * @return The partition id.
-   */
-  partition_id getPartitionId() const {
-    return part_id_;
-  }
-
  private:
   void executeWithoutResidualPredicate();
 
@@ -583,7 +560,6 @@ class HashSemiJoinWorkOrder : public WorkOrder {
   const CatalogRelationSchema &probe_relation_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const partition_id part_id_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
@@ -642,12 +618,11 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
@@ -695,12 +670,11 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
@@ -712,7 +686,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   ~HashAntiJoinWorkOrder() override {}
 
   void execute() override {
-    output_destination_->setInputPartitionId(part_id_);
+    output_destination_->setInputPartitionId(partition_id_);
 
     if (residual_predicate_ == nullptr) {
       executeWithoutResidualPredicate();
@@ -721,15 +695,6 @@ class HashAntiJoinWorkOrder : public WorkOrder {
     }
   }
 
-  /**
-   * @brief Get the partition id.
-   *
-   * @return The partition id.
-   */
-  partition_id getPartitionId() const {
-    return part_id_;
-  }
-
  private:
   void executeWithoutResidualPredicate();
 
@@ -739,7 +704,6 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   const CatalogRelationSchema &probe_relation_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const partition_id part_id_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
@@ -796,12 +760,11 @@ class HashOuterJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         selection_(selection),
         is_selection_on_build_(is_selection_on_build),
@@ -847,12 +810,11 @@ class HashOuterJoinWorkOrder : public WorkOrder {
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       LIPFilterAdaptiveProber *lip_filter_adaptive_prober)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        part_id_(part_id),
         block_id_(lookup_block_id),
         selection_(selection),
         is_selection_on_build_(std::move(is_selection_on_build)),
@@ -865,21 +827,11 @@ class HashOuterJoinWorkOrder : public WorkOrder {
 
   void execute() override;
 
-  /**
-   * @brief Get the partition id.
-   *
-   * @return The partition id.
-   */
-  partition_id getPartitionId() const {
-    return part_id_;
-  }
-
  private:
   const CatalogRelationSchema &build_relation_;
   const CatalogRelationSchema &probe_relation_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const partition_id part_id_;
   const block_id block_id_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const std::vector<bool> is_selection_on_build_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 136686b..39a6fb4 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -53,6 +53,7 @@ bool InitializeAggregationOperator::getAllWorkOrders(
          ++state_part_id) {
       container->addNormalWorkOrder(
           new InitializeAggregationWorkOrder(query_id_,
+                                             part_id,
                                              state_part_id,
                                              agg_state),
           op_index_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index b7e9aae..0b052c7 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -106,13 +106,15 @@ class InitializeAggregationWorkOrder : public WorkOrder {
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this operator belongs.
+   * @param part_id The partition id.
    * @param state_partition_id The partition ID for which the work order is issued.
    * @param state The AggregationOperationState to be initialized.
    */
   InitializeAggregationWorkOrder(const std::size_t query_id,
+                                 const partition_id part_id,
                                  const std::size_t state_partition_id,
                                  AggregationOperationState *state)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         state_partition_id_(state_partition_id),
         state_(DCHECK_NOTNULL(state)) {}
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/NestedLoopsJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index 1c0bbec..121b1c3 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -69,6 +69,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrders(
                   query_id_,
                   left_input_relation_,
                   right_input_relation_,
+                  part_id,
                   left_block_id,
                   right_block_id,
                   query_context->getPredicate(join_predicate_index_),
@@ -244,6 +245,7 @@ std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrd
               query_id_,
               left_input_relation_,
               right_input_relation_,
+              part_id,
               left_relation_block_ids_[part_id][left_index],
               right_relation_block_ids_[part_id][right_index],
               query_context->getPredicate(join_predicate_index_),
@@ -280,6 +282,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContaine
                   query_id_,
                   left_input_relation_,
                   right_input_relation_,
+                  part_id,
                   left_block_id,
                   right_relation_block_ids_[part_id][right_index],
                   join_predicate,
@@ -302,6 +305,7 @@ bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContaine
               new NestedLoopsJoinWorkOrder(query_id_,
                                            left_input_relation_,
                                            right_input_relation_,
+                                           part_id,
                                            left_relation_block_ids_[part_id][left_index],
                                            right_block_id,
                                            join_predicate,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 64711e1..006d496 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -338,6 +338,7 @@ class NestedLoopsJoinWorkOrder : public WorkOrder {
    *        actually important).
    * @param right_input_relation The second relation in the join (order is not
    *        actually important).
+   * @param part_id The partition id.
    * @param left_block_id The block id of the first relation.
    * @param right_block_id The block id of the second relation.
    * @param join_predicate The join predicate to evaluate for each pair of
@@ -352,13 +353,14 @@ class NestedLoopsJoinWorkOrder : public WorkOrder {
       const std::size_t query_id,
       const CatalogRelationSchema &left_input_relation,
       const CatalogRelationSchema &right_input_relation,
+      const partition_id part_id,
       const block_id left_block_id,
       const block_id right_block_id,
       const Predicate *join_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       InsertDestination *output_destination,
       StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         left_input_relation_(left_input_relation),
         right_input_relation_(right_input_relation),
         left_block_id_(left_block_id),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 8615d74..08b1093 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -69,11 +69,10 @@ class RebuildWorkOrder : public WorkOrder {
       const partition_id part_id,
       const client_id scheduler_client_id,
       MessageBus *bus)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         block_ref_(std::move(block_ref)),
         input_operator_index_(input_operator_index),
         input_relation_id_(input_relation_id),
-        part_id_(part_id),
         scheduler_client_id_(scheduler_client_id),
         bus_(bus) {}
 
@@ -91,7 +90,7 @@ class RebuildWorkOrder : public WorkOrder {
     proto.set_block_id(block_ref_->getID());
     proto.set_relation_id(input_relation_id_);
     proto.set_query_id(query_id_);
-    proto.set_partition_id(part_id_);
+    proto.set_partition_id(partition_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -118,7 +117,6 @@ class RebuildWorkOrder : public WorkOrder {
   MutableBlockReference block_ref_;
   const std::size_t input_operator_index_;
   const relation_id input_relation_id_;
-  const partition_id part_id_;
   const client_id scheduler_client_id_;
 
   MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 845c563..3081eac 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -159,7 +159,7 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const partition_i
 }
 
 void SelectWorkOrder::execute() {
-  output_destination_->setInputPartitionId(part_id_);
+  output_destination_->setInputPartitionId(partition_id_);
 
   BlockReference block(
       storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index c11d681..f4937b3 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -295,9 +295,8 @@ class SelectWorkOrder : public WorkOrder {
                   StorageManager *storage_manager,
                   LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
                   const numa_node_id numa_node = 0)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
-        part_id_(part_id),
         input_block_id_(input_block_id),
         predicate_(predicate),
         simple_projection_(simple_projection),
@@ -343,9 +342,8 @@ class SelectWorkOrder : public WorkOrder {
                   StorageManager *storage_manager,
                   LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
                   const numa_node_id numa_node = 0)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
-        part_id_(part_id),
         input_block_id_(input_block_id),
         predicate_(predicate),
         simple_projection_(simple_projection),
@@ -371,7 +369,6 @@ class SelectWorkOrder : public WorkOrder {
 
  private:
   const CatalogRelationSchema &input_relation_;
-  const partition_id part_id_;
   const block_id input_block_id_;
   const Predicate *predicate_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 97f2a74..c1b3dad 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -27,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "utility/Macros.hpp"
 
@@ -295,20 +296,34 @@ class WorkOrder {
   /**
    * @brief Get the ID of the query which this WorkOder belongs to.
    **/
-  inline const std::size_t getQueryID() const {
+  inline std::size_t getQueryID() const {
     return query_id_;
   }
 
+  /**
+   * @brief Get the partition id.
+   *
+   * @return The partition id.
+   */
+  partition_id getPartitionId() const {
+    return partition_id_;
+  }
+
  protected:
   /**
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param part_id The partition id.
    **/
-  explicit WorkOrder(const std::size_t query_id)
-      : query_id_(query_id) {}
+  explicit WorkOrder(const std::size_t query_id,
+                     const partition_id part_id = 0)
+      : query_id_(query_id),
+        partition_id_(part_id) {}
 
   const std::size_t query_id_;
+  const partition_id partition_id_;
+
   // A vector of preferred NUMA node IDs where this workorder should be executed.
   // These node IDs typically indicate the NUMA node IDs of the input(s) of the
   // workorder. Derived classes should ensure that there are no duplicate entries

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/32b5b83f/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 2b0bae4..68f7f55 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -96,6 +96,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                 << " in Shiftboss " << shiftboss_index;
       return new AggregationWorkOrder(
           query_id,
+          part_id,
           proto.GetExtension(serialization::AggregationWorkOrder::block_id),
           query_context->getAggregationState(
               proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index), part_id),
@@ -113,6 +114,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
+          part_id,
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
           query_context->getAggregationState(
@@ -120,7 +122,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::BUILD_LIP_FILTER: {
-      LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating BuildLIPFilterWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
 
       const QueryContext::lip_deployment_id lip_deployment_index =
@@ -130,6 +135,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id)),
+          part_id,
           proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id),
           query_context->getPredicate(
               proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index)),
@@ -379,6 +385,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_context->getAggregationState(
               proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index), part_id);
       return new InitializeAggregationWorkOrder(query_id,
+                                                part_id,
                                                 proto.GetExtension(
                                                     serialization::InitializeAggregationWorkOrder::state_partition_id),
                                                 aggr_state);
@@ -393,7 +400,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
     }
     case serialization::NESTED_LOOP_JOIN: {
-      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new NestedLoopsJoinWorkOrder(
           query_id,
@@ -401,6 +411,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id)),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id)),
+          part_id,
           proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id),
           proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id),
           query_context->getPredicate(


Mime
View raw message