quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject incubator-quickstep git commit: Batched Aggr WorkOrders in the distributed version.
Date Tue, 16 May 2017 02:29:50 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/batched-aggr-wo [created] e56527dd5


Batched Aggr WorkOrders in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e56527dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e56527dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e56527dd

Branch: refs/heads/batched-aggr-wo
Commit: e56527dd5d74d60ab9dd20771e01d0ed308e0665
Parents: ed72e24
Author: Zuyu Zhang <zuyuz@apache.org>
Authored: Mon May 15 19:29:43 2017 -0700
Committer: Zuyu Zhang <zuyuz@apache.org>
Committed: Mon May 15 19:29:43 2017 -0700

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp       |   2 +-
 query_execution/Shiftboss.cpp                |  38 +++---
 relational_operators/AggregationOperator.cpp |  13 +-
 relational_operators/WorkOrder.proto         |   2 +-
 relational_operators/WorkOrderFactory.cpp    | 149 ++++++++++++++--------
 relational_operators/WorkOrderFactory.hpp    |   4 +-
 6 files changed, 129 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index e5e0eee..ca80a79 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -248,7 +248,7 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
   switch (work_order_proto.work_order_type()) {
     case S::AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
-      block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
+      block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id, 0);
       break;
     case S::FINALIZE_AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 21e7858..ce3eab6 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -185,28 +185,30 @@ void Shiftboss::run() {
           const std::size_t query_id = proto.query_id();
           DCHECK_EQ(1u, query_contexts_.count(query_id));
 
-          unique_ptr<WorkOrder> work_order(
+          vector<unique_ptr<WorkOrder>> work_orders(
               WorkOrderFactory::ReconstructFromProto(proto.work_order(), shiftboss_index_,
&database_cache_,
                                                      query_contexts_[query_id].get(), storage_manager_,
                                                      shiftboss_client_id_local_, bus_local_,
hdfs_));
 
-          unique_ptr<WorkerMessage> worker_message(
-              WorkerMessage::WorkOrderMessage(work_order.release(), proto.operator_index()));
-
-          TaggedMessage worker_tagged_message(worker_message.get(),
-                                              sizeof(*worker_message),
-                                              kWorkOrderMessage);
-
-          const size_t worker_index = getSchedulableWorker();
-          DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client
" << shiftboss_client_id_local_
-                     << " forwarded WorkOrderMessage from Foreman to Worker " <<
worker_index;
-
-          const MessageBus::SendStatus send_status =
-              QueryExecutionUtil::SendTMBMessage(bus_local_,
-                                                 shiftboss_client_id_local_,
-                                                 workers_->getClientID(worker_index),
-                                                 move(worker_tagged_message));
-          CHECK(send_status == MessageBus::SendStatus::kOK);
+          for (int i = 0; i < work_orders.size(); ++i) {
+            unique_ptr<WorkerMessage> worker_message(
+                WorkerMessage::WorkOrderMessage(work_orders[i].release(), proto.operator_index()));
+
+            TaggedMessage worker_tagged_message(worker_message.get(),
+                                                sizeof(*worker_message),
+                                                kWorkOrderMessage);
+
+            const size_t worker_index = getSchedulableWorker();
+            DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client
" << shiftboss_client_id_local_
+                       << " forwarded WorkOrderMessage from Foreman to Worker " <<
worker_index;
+
+            const MessageBus::SendStatus send_status =
+                QueryExecutionUtil::SendTMBMessage(bus_local_,
+                                                   shiftboss_client_id_local_,
+                                                   workers_->getClientID(worker_index),
+                                                   move(worker_tagged_message));
+            CHECK(send_status == MessageBus::SendStatus::kOK);
+          }
           break;
         }
         case kInitiateRebuildMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index e111f5b..416783b 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -72,9 +72,18 @@ bool AggregationOperator::getAllWorkOrders(
 bool AggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (input_relation_is_stored_) {
     if (!started_) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::AGGREGATION);
+      proto->set_query_id(query_id_);
+
       for (const block_id input_block_id : input_relation_block_ids_) {
-        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+        proto->AddExtension(serialization::AggregationWorkOrder::block_id, input_block_id);
       }
+
+      proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+      proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
+
+      container->addWorkOrderProto(proto, op_index_);
       started_ = true;
     }
     return true;
@@ -94,7 +103,7 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const
block_
   proto->set_work_order_type(serialization::AGGREGATION);
   proto->set_query_id(query_id_);
 
-  proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
+  proto->AddExtension(serialization::AggregationWorkOrder::block_id, block);
   proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
   proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 7231c84..15305f6 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -62,7 +62,7 @@ message AggregationWorkOrder {
   extend WorkOrder {
     // All required.
     optional uint32 aggr_state_index = 16;
-    optional fixed64 block_id = 17;
+    repeated fixed64 block_id = 17;
     optional int32 lip_deployment_index = 18;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index d63bb62..c34a40c 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -60,8 +60,10 @@
 
 #include "tmb/id_typedefs.h"
 
+using std::make_unique;
 using std::move;
 using std::vector;
+using std::unique_ptr;
 
 namespace quickstep {
 
@@ -70,7 +72,7 @@ class LIPFilterAdaptiveProber;
 class Predicate;
 class Scalar;
 
-WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder &proto,
+vector<unique_ptr<WorkOrder>> WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
&proto,
                                                   const std::size_t shiftboss_index,
                                                   CatalogDatabaseLite *catalog_database,
                                                   QueryContext *query_context,
@@ -83,23 +85,31 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       << "Attempted to create WorkOrder from an invalid proto description:\n"
       << proto.DebugString();
 
+  vector<unique_ptr<WorkOrder>> work_orders;
+
   switch (proto.work_order_type()) {
     case serialization::AGGREGATION: {
       LOG(INFO) << "Creating AggregationWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new AggregationWorkOrder(
-          proto.query_id(),
-          proto.GetExtension(serialization::AggregationWorkOrder::block_id),
-          query_context->getAggregationState(
-              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
-          CreateLIPFilterAdaptiveProberHelper(
-              proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index),
query_context));
+      AggregationOperationState *aggr_state =
+            query_context->getAggregationState(
+                proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
+      const QueryContext::lip_deployment_id lip_deployment_index =
+          proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index);
+      for (int i = 0; i < proto.ExtensionSize(serialization::AggregationWorkOrder::block_id);
++i) {
+        work_orders.push_back(make_unique<AggregationWorkOrder>(
+            proto.query_id(),
+            proto.GetExtension(serialization::AggregationWorkOrder::block_id, i),
+            aggr_state,
+            CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context)));
+      }
+      break;
     }
     case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
       LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " <<
proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
 
-      return new BuildAggregationExistenceMapWorkOrder(
+      work_orders.push_back(make_unique<BuildAggregationExistenceMapWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
@@ -107,7 +117,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
           query_context->getAggregationState(
               proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
-          storage_manager);
+          storage_manager));
+      break;
     }
     case serialization::BUILD_LIP_FILTER: {
       LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << proto.query_id()
@@ -116,7 +127,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       const QueryContext::lip_deployment_id lip_deployment_index =
           proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
 
-      return new BuildLIPFilterWorkOrder(
+      work_orders.push_back(make_unique<BuildLIPFilterWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id)),
@@ -125,7 +136,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index)),
           storage_manager,
           CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context),
-          CreateLIPFilterBuilderHelper(lip_deployment_index, query_context));
+          CreateLIPFilterBuilderHelper(lip_deployment_index, query_context)));
+      break;
     }
     case serialization::BUILD_HASH: {
       LOG(INFO) << "Creating BuildHashWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
@@ -138,7 +150,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       const partition_id part_id =
           proto.GetExtension(serialization::BuildHashWorkOrder::partition_id);
 
-      return new BuildHashWorkOrder(
+      work_orders.push_back(make_unique<BuildHashWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildHashWorkOrder::relation_id)),
@@ -150,11 +162,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index),
part_id),
           storage_manager,
           CreateLIPFilterBuilderHelper(
-              proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index),
query_context));
+              proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index),
query_context)));
+      break;
     }
     case serialization::DELETE: {
       LOG(INFO) << "Creating DeleteWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
-      return new DeleteWorkOrder(
+      work_orders.push_back(make_unique<DeleteWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::DeleteWorkOrder::relation_id)),
@@ -164,27 +177,30 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager,
           proto.GetExtension(serialization::DeleteWorkOrder::operator_index),
           shiftboss_client_id,
-          bus);
+          bus));
+      break;
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
       LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " <<
proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new DestroyAggregationStateWorkOrder(
+      work_orders.push_back(make_unique<DestroyAggregationStateWorkOrder>(
           proto.query_id(),
           proto.GetExtension(
               serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
-          query_context);
+          query_context));
+      break;
     }
     case serialization::DESTROY_HASH: {
       LOG(INFO) << "Creating DestroyHashWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new DestroyHashWorkOrder(
+      work_orders.push_back(make_unique<DestroyHashWorkOrder>(
           proto.query_id(),
           proto.GetExtension(
               serialization::DestroyHashWorkOrder::join_hash_table_index),
           proto.GetExtension(
               serialization::DestroyHashWorkOrder::partition_id),
-          query_context);
+          query_context));
+      break;
     }
     case serialization::DROP_TABLE: {
       LOG(INFO) << "Creating DropTableWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
@@ -194,28 +210,30 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
             proto.GetExtension(serialization::DropTableWorkOrder::block_ids, i));
       }
 
-      return new DropTableWorkOrder(
+      work_orders.push_back(make_unique<DropTableWorkOrder>(
           proto.query_id(),
           move(blocks),
           storage_manager,
           proto.HasExtension(serialization::DropTableWorkOrder::relation_id)
               ? proto.GetExtension(serialization::DropTableWorkOrder::relation_id)
               : kInvalidCatalogId,
-          catalog_database);
+          catalog_database));
+      break;
     }
     case serialization::FINALIZE_AGGREGATION: {
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
       // TODO(quickstep-team): Handle inner-table partitioning in the distributed
       // setting.
-      return new FinalizeAggregationWorkOrder(
+      work_orders.push_back(make_unique<FinalizeAggregationWorkOrder>(
           proto.query_id(),
           0uL /* partition_id */,
           query_context->getAggregationState(proto.GetExtension(
               serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::FinalizeAggregationWorkOrder::
-                                     insert_destination_index)));
+                                     insert_destination_index))));
+      break;
     }
     case serialization::HASH_JOIN: {
       const auto hash_join_work_order_type =
@@ -267,7 +285,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
         case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: {
           LOG(INFO) << "Creating HashAntiJoinWorkOrder for Query " << proto.query_id()
                     << " in Shiftboss " << shiftboss_index;
-          return new HashAntiJoinWorkOrder(
+      work_orders.push_back(make_unique<HashAntiJoinWorkOrder>(
               proto.query_id(),
               build_relation,
               probe_relation,
@@ -280,12 +298,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               hash_table,
               output_destination,
               storage_manager,
-              lip_filter_adaptive_prober);
+              lip_filter_adaptive_prober));
+      break;
         }
         case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: {
           LOG(INFO) << "Creating HashInnerJoinWorkOrder for Query " << proto.query_id()
                     << " in Shiftboss " << shiftboss_index;
-          return new HashInnerJoinWorkOrder(
+      work_orders.push_back(make_unique<HashInnerJoinWorkOrder>(
               proto.query_id(),
               build_relation,
               probe_relation,
@@ -298,7 +317,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               hash_table,
               output_destination,
               storage_manager,
-              lip_filter_adaptive_prober);
+              lip_filter_adaptive_prober));
+      break;
         }
         case serialization::HashJoinWorkOrder::HASH_OUTER_JOIN: {
           vector<bool> is_selection_on_build;
@@ -311,7 +331,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 
           LOG(INFO) << "Creating HashOuterJoinWorkOrder for Query " << proto.query_id()
                     << " in Shiftboss " << shiftboss_index;
-          return new HashOuterJoinWorkOrder(
+      work_orders.push_back(make_unique<HashOuterJoinWorkOrder>(
               proto.query_id(),
               build_relation,
               probe_relation,
@@ -324,12 +344,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               hash_table,
               output_destination,
               storage_manager,
-              lip_filter_adaptive_prober);
+              lip_filter_adaptive_prober));
+      break;
         }
         case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: {
           LOG(INFO) << "Creating HashSemiJoinWorkOrder for Query " << proto.query_id()
                     << " in Shiftboss " << shiftboss_index;
-          return new HashSemiJoinWorkOrder(
+      work_orders.push_back(make_unique<HashSemiJoinWorkOrder>(
               proto.query_id(),
               build_relation,
               probe_relation,
@@ -342,25 +363,28 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               hash_table,
               output_destination,
               storage_manager,
-              lip_filter_adaptive_prober);
+              lip_filter_adaptive_prober));
+      break;
         }
         default:
           LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto";
       }
+      break;
     }
     case serialization::INSERT: {
       LOG(INFO) << "Creating InsertWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
-      return new InsertWorkOrder(
+      work_orders.push_back(make_unique<InsertWorkOrder>(
           proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
           query_context->releaseTuple(
-              proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
+              proto.GetExtension(serialization::InsertWorkOrder::tuple_index))));
+      break;
     }
     case serialization::NESTED_LOOP_JOIN: {
       LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new NestedLoopsJoinWorkOrder(
+      work_orders.push_back(make_unique<NestedLoopsJoinWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id)),
@@ -374,11 +398,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index)),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index)),
-          storage_manager);
+          storage_manager));
+      break;
     }
     case serialization::SAMPLE: {
       LOG(INFO) << "Creating SampleWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
-      return new SampleWorkOrder(
+      work_orders.push_back(make_unique<SampleWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SampleWorkOrder::relation_id)),
@@ -387,16 +412,18 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::SampleWorkOrder::percentage),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SampleWorkOrder::insert_destination_index)),
-          storage_manager);
+          storage_manager));
+      break;
     }
     case serialization::SAVE_BLOCKS: {
       LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new SaveBlocksWorkOrder(
+      work_orders.push_back(make_unique<SaveBlocksWorkOrder>(
           proto.query_id(),
           proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
           proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
-          storage_manager);
+          storage_manager));
+      break;
     }
     case serialization::SELECT: {
       LOG(INFO) << "Creating SelectWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
@@ -408,7 +435,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
             proto.GetExtension(serialization::SelectWorkOrder::simple_selection, i));
       }
 
-      return new SelectWorkOrder(
+      work_orders.push_back(make_unique<SelectWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SelectWorkOrder::relation_id)),
@@ -424,7 +451,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)),
           storage_manager,
           CreateLIPFilterAdaptiveProberHelper(
-              proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context));
+              proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context)));
+      break;
     }
     case serialization::SORT_MERGE_RUN: {
       LOG(INFO) << "Creating SortMergeRunWorkOrder for Query " << proto.query_id()
@@ -440,7 +468,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
         runs.push_back(move(run));
       }
 
-      return new SortMergeRunWorkOrder(
+      work_orders.push_back(make_unique<SortMergeRunWorkOrder>(
           proto.query_id(),
           query_context->getSortConfig(
               proto.GetExtension(serialization::SortMergeRunWorkOrder::sort_config_index)),
@@ -454,12 +482,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager,
           proto.GetExtension(serialization::SortMergeRunWorkOrder::operator_index),
           shiftboss_client_id,
-          bus);
+          bus));
+      break;
     }
     case serialization::SORT_RUN_GENERATION: {
       LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new SortRunGenerationWorkOrder(
+      work_orders.push_back(make_unique<SortRunGenerationWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SortRunGenerationWorkOrder::relation_id)),
@@ -468,21 +497,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index)),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index)),
-          storage_manager);
+          storage_manager));
+      break;
     }
     case serialization::TABLE_GENERATOR: {
       LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id()
                 << " in Shiftboss " << shiftboss_index;
-      return new TableGeneratorWorkOrder(
+      work_orders.push_back(make_unique<TableGeneratorWorkOrder>(
           proto.query_id(),
           query_context->getGeneratorFunctionHandle(
               proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index))));
+      break;
     }
     case serialization::TEXT_SCAN: {
       LOG(INFO) << "Creating TextScanWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
-      return new TextScanWorkOrder(
+      work_orders.push_back(make_unique<TextScanWorkOrder>(
           proto.query_id(),
           proto.GetExtension(serialization::TextScanWorkOrder::filename),
           proto.GetExtension(serialization::TextScanWorkOrder::text_offset),
@@ -491,7 +522,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
-          hdfs);
+          hdfs));
+      break;
     }
     case serialization::UNION_ALL: {
       LOG(INFO) << "Creating UnionAllWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
@@ -500,7 +532,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
         select_attribute_id.push_back(
             proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i));
       }
-      return new UnionAllWorkOrder(
+      work_orders.push_back(make_unique<UnionAllWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::UnionAllWorkOrder::relation_id)),
@@ -508,11 +540,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           select_attribute_id,
           query_context->getInsertDestination(
               proto.GetExtension(serialization::UnionAllWorkOrder::insert_destination_index)),
-          storage_manager);
+          storage_manager));
+      break;
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id()
<< " in Shiftboss " << shiftboss_index;
-      return new UpdateWorkOrder(
+      work_orders.push_back(make_unique<UpdateWorkOrder>(
           proto.query_id(),
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::UpdateWorkOrder::relation_id)),
@@ -526,7 +559,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager,
           proto.GetExtension(serialization::UpdateWorkOrder::operator_index),
           shiftboss_client_id,
-          bus);
+          bus));
+      break;
     }
     case serialization::WINDOW_AGGREGATION: {
       LOG(INFO) << "Creating WindowAggregationWorkOrder for Query " << proto.query_id()
@@ -537,17 +571,20 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
             proto.GetExtension(serialization::WindowAggregationWorkOrder::block_ids, i));
       }
 
-      return new WindowAggregationWorkOrder(
+      work_orders.push_back(make_unique<WindowAggregationWorkOrder>(
           proto.query_id(),
           query_context->getWindowAggregationState(
               proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
           move(blocks),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index))));
+      break;
     }
     default:
       LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto
in Shiftboss" << shiftboss_index;
   }
+
+  return work_orders;
 }
 
 bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/WorkOrderFactory.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
index ece687b..07b7fa8 100644
--- a/relational_operators/WorkOrderFactory.hpp
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -21,6 +21,8 @@
 #define QUICKSTEP_RELATIONAL_OPERATORS_WORK_ORDER_FACTORY_HPP_
 
 #include <cstddef>
+#include <memory>
+#include <vector>
 
 #include "utility/Macros.hpp"
 
@@ -63,7 +65,7 @@ class WorkOrderFactory {
    *
    * @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
    **/
-  static WorkOrder* ReconstructFromProto(const serialization::WorkOrder &proto,
+  static std::vector<std::unique_ptr<WorkOrder>> ReconstructFromProto(const serialization::WorkOrder
&proto,
                                          const std::size_t shiftboss_index,
                                          CatalogDatabaseLite *catalog_database,
                                          QueryContext *query_context,


Mime
View raw message