Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D7CD200C85 for ; Tue, 16 May 2017 04:30:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1BE8C160BCE; Tue, 16 May 2017 02:30:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 93926160BC2 for ; Tue, 16 May 2017 04:29:59 +0200 (CEST) Received: (qmail 80241 invoked by uid 500); 16 May 2017 02:29:58 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 80232 invoked by uid 99); 16 May 2017 02:29:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 May 2017 02:29:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id D8800C0B3C for ; Tue, 16 May 2017 02:29:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id I75g3bOHmzSz for ; Tue, 16 May 2017 02:29:53 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 8CC165F5C6 for ; Tue, 16 May 2017 02:29:51 +0000 (UTC) Received: (qmail 79981 invoked by uid 99); 16 May 2017 02:29:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 May 2017 02:29:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E2FEDFFAC; Tue, 16 May 2017 02:29:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Message-Id: <1e9890e9ca774444a928f0d86aae77a8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-quickstep git commit: Batched Aggr WorkOrders in the distributed version. Date: Tue, 16 May 2017 02:29:50 +0000 (UTC) archived-at: Tue, 16 May 2017 02:30:01 -0000 Repository: incubator-quickstep Updated Branches: refs/heads/batched-aggr-wo [created] e56527dd5 Batched Aggr WorkOrders in the distributed version. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e56527dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e56527dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e56527dd Branch: refs/heads/batched-aggr-wo Commit: e56527dd5d74d60ab9dd20771e01d0ed308e0665 Parents: ed72e24 Author: Zuyu Zhang Authored: Mon May 15 19:29:43 2017 -0700 Committer: Zuyu Zhang Committed: Mon May 15 19:29:43 2017 -0700 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 2 +- query_execution/Shiftboss.cpp | 38 +++--- relational_operators/AggregationOperator.cpp | 13 +- relational_operators/WorkOrder.proto | 2 +- relational_operators/WorkOrderFactory.cpp | 149 ++++++++++++++-------- relational_operators/WorkOrderFactory.hpp | 4 +- 6 files changed, 129 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index e5e0eee..ca80a79 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -248,7 +248,7 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage switch (work_order_proto.work_order_type()) { case S::AGGREGATION: aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index); - block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id); + block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id, 0); break; case S::FINALIZE_AGGREGATION: aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index 21e7858..ce3eab6 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -185,28 +185,30 @@ void Shiftboss::run() { const std::size_t query_id = proto.query_id(); DCHECK_EQ(1u, query_contexts_.count(query_id)); - unique_ptr work_order( + vector> work_orders( WorkOrderFactory::ReconstructFromProto(proto.work_order(), shiftboss_index_, &database_cache_, query_contexts_[query_id].get(), storage_manager_, shiftboss_client_id_local_, bus_local_, hdfs_)); - unique_ptr worker_message( - WorkerMessage::WorkOrderMessage(work_order.release(), proto.operator_index())); - - TaggedMessage worker_tagged_message(worker_message.get(), - sizeof(*worker_message), - kWorkOrderMessage); - - const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_local_ - << " forwarded WorkOrderMessage from Foreman to Worker " << worker_index; - - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_local_, - shiftboss_client_id_local_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); + for (int i = 0; i < work_orders.size(); ++i) { + unique_ptr worker_message( + WorkerMessage::WorkOrderMessage(work_orders[i].release(), proto.operator_index())); + + TaggedMessage worker_tagged_message(worker_message.get(), + sizeof(*worker_message), + kWorkOrderMessage); + + const size_t worker_index = getSchedulableWorker(); + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_local_ + << " forwarded WorkOrderMessage from Foreman to Worker " << worker_index; + + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_local_, + shiftboss_client_id_local_, + workers_->getClientID(worker_index), + move(worker_tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + } break; } case kInitiateRebuildMessage: { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/AggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp index e111f5b..416783b 100644 --- a/relational_operators/AggregationOperator.cpp +++ b/relational_operators/AggregationOperator.cpp @@ -72,9 +72,18 @@ bool AggregationOperator::getAllWorkOrders( bool AggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { if (input_relation_is_stored_) { if (!started_) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::AGGREGATION); + proto->set_query_id(query_id_); + for (const block_id input_block_id : input_relation_block_ids_) { - container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); + proto->AddExtension(serialization::AggregationWorkOrder::block_id, input_block_id); } + + proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_); + proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_); + + container->addWorkOrderProto(proto, op_index_); started_ = true; } return true; @@ -94,7 +103,7 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_ proto->set_work_order_type(serialization::AGGREGATION); proto->set_query_id(query_id_); - proto->SetExtension(serialization::AggregationWorkOrder::block_id, block); + proto->AddExtension(serialization::AggregationWorkOrder::block_id, block); proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_); proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 7231c84..15305f6 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -62,7 +62,7 @@ message AggregationWorkOrder { extend WorkOrder { // All required. optional uint32 aggr_state_index = 16; - optional fixed64 block_id = 17; + repeated fixed64 block_id = 17; optional int32 lip_deployment_index = 18; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index d63bb62..c34a40c 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -60,8 +60,10 @@ #include "tmb/id_typedefs.h" +using std::make_unique; using std::move; using std::vector; +using std::unique_ptr; namespace quickstep { @@ -70,7 +72,7 @@ class LIPFilterAdaptiveProber; class Predicate; class Scalar; -WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder &proto, +vector> WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder &proto, const std::size_t shiftboss_index, CatalogDatabaseLite *catalog_database, QueryContext *query_context, @@ -83,23 +85,31 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder << "Attempted to create WorkOrder from an invalid proto description:\n" << proto.DebugString(); + vector> work_orders; + switch (proto.work_order_type()) { case serialization::AGGREGATION: { LOG(INFO) << "Creating AggregationWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new AggregationWorkOrder( - proto.query_id(), - proto.GetExtension(serialization::AggregationWorkOrder::block_id), - query_context->getAggregationState( - proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)), - CreateLIPFilterAdaptiveProberHelper( - proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context)); + AggregationOperationState *aggr_state = + query_context->getAggregationState( + proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)); + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index); + for (int i = 0; i < proto.ExtensionSize(serialization::AggregationWorkOrder::block_id); ++i) { + work_orders.push_back(make_unique( + proto.query_id(), + proto.GetExtension(serialization::AggregationWorkOrder::block_id, i), + aggr_state, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context))); + } + break; } case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: { LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new BuildAggregationExistenceMapWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)), @@ -107,7 +117,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute), query_context->getAggregationState( proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)), - storage_manager); + storage_manager)); + break; } case serialization::BUILD_LIP_FILTER: { LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << proto.query_id() @@ -116,7 +127,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder const QueryContext::lip_deployment_id lip_deployment_index = proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index); - return new BuildLIPFilterWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id)), @@ -125,7 +136,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index)), storage_manager, CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context), - CreateLIPFilterBuilderHelper(lip_deployment_index, query_context)); + CreateLIPFilterBuilderHelper(lip_deployment_index, query_context))); + break; } case serialization::BUILD_HASH: { LOG(INFO) << "Creating BuildHashWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; @@ -138,7 +150,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder const partition_id part_id = proto.GetExtension(serialization::BuildHashWorkOrder::partition_id); - return new BuildHashWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::BuildHashWorkOrder::relation_id)), @@ -150,11 +162,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index), part_id), storage_manager, CreateLIPFilterBuilderHelper( - proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context)); + proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context))); + break; } case serialization::DELETE: { LOG(INFO) << "Creating DeleteWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new DeleteWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::DeleteWorkOrder::relation_id)), @@ -164,27 +177,30 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager, proto.GetExtension(serialization::DeleteWorkOrder::operator_index), shiftboss_client_id, - bus); + bus)); + break; } case serialization::DESTROY_AGGREGATION_STATE: { LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new DestroyAggregationStateWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), proto.GetExtension( serialization::DestroyAggregationStateWorkOrder::aggr_state_index), - query_context); + query_context)); + break; } case serialization::DESTROY_HASH: { LOG(INFO) << "Creating DestroyHashWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new DestroyHashWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), proto.GetExtension( serialization::DestroyHashWorkOrder::join_hash_table_index), proto.GetExtension( serialization::DestroyHashWorkOrder::partition_id), - query_context); + query_context)); + break; } case serialization::DROP_TABLE: { LOG(INFO) << "Creating DropTableWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; @@ -194,28 +210,30 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::DropTableWorkOrder::block_ids, i)); } - return new DropTableWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), move(blocks), storage_manager, proto.HasExtension(serialization::DropTableWorkOrder::relation_id) ? proto.GetExtension(serialization::DropTableWorkOrder::relation_id) : kInvalidCatalogId, - catalog_database); + catalog_database)); + break; } case serialization::FINALIZE_AGGREGATION: { LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; // TODO(quickstep-team): Handle inner-table partitioning in the distributed // setting. - return new FinalizeAggregationWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), 0uL /* partition_id */, query_context->getAggregationState(proto.GetExtension( serialization::FinalizeAggregationWorkOrder::aggr_state_index)), query_context->getInsertDestination( proto.GetExtension(serialization::FinalizeAggregationWorkOrder:: - insert_destination_index))); + insert_destination_index)))); + break; } case serialization::HASH_JOIN: { const auto hash_join_work_order_type = @@ -267,7 +285,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: { LOG(INFO) << "Creating HashAntiJoinWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new HashAntiJoinWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), build_relation, probe_relation, @@ -280,12 +298,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder hash_table, output_destination, storage_manager, - lip_filter_adaptive_prober); + lip_filter_adaptive_prober)); + break; } case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: { LOG(INFO) << "Creating HashInnerJoinWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new HashInnerJoinWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), build_relation, probe_relation, @@ -298,7 +317,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder hash_table, output_destination, storage_manager, - lip_filter_adaptive_prober); + lip_filter_adaptive_prober)); + break; } case serialization::HashJoinWorkOrder::HASH_OUTER_JOIN: { vector is_selection_on_build; @@ -311,7 +331,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder LOG(INFO) << "Creating HashOuterJoinWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new HashOuterJoinWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), build_relation, probe_relation, @@ -324,12 +344,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder hash_table, output_destination, storage_manager, - lip_filter_adaptive_prober); + lip_filter_adaptive_prober)); + break; } case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: { LOG(INFO) << "Creating HashSemiJoinWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new HashSemiJoinWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), build_relation, probe_relation, @@ -342,25 +363,28 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder hash_table, output_destination, storage_manager, - lip_filter_adaptive_prober); + lip_filter_adaptive_prober)); + break; } default: LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto"; } + break; } case serialization::INSERT: { LOG(INFO) << "Creating InsertWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new InsertWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), query_context->getInsertDestination( proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)), query_context->releaseTuple( - proto.GetExtension(serialization::InsertWorkOrder::tuple_index))); + proto.GetExtension(serialization::InsertWorkOrder::tuple_index)))); + break; } case serialization::NESTED_LOOP_JOIN: { LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new NestedLoopsJoinWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id)), @@ -374,11 +398,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index)), query_context->getInsertDestination( proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index)), - storage_manager); + storage_manager)); + break; } case serialization::SAMPLE: { LOG(INFO) << "Creating SampleWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new SampleWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::SampleWorkOrder::relation_id)), @@ -387,16 +412,18 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::SampleWorkOrder::percentage), query_context->getInsertDestination( proto.GetExtension(serialization::SampleWorkOrder::insert_destination_index)), - storage_manager); + storage_manager)); + break; } case serialization::SAVE_BLOCKS: { LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new SaveBlocksWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id), proto.GetExtension(serialization::SaveBlocksWorkOrder::force), - storage_manager); + storage_manager)); + break; } case serialization::SELECT: { LOG(INFO) << "Creating SelectWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; @@ -408,7 +435,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::SelectWorkOrder::simple_selection, i)); } - return new SelectWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::SelectWorkOrder::relation_id)), @@ -424,7 +451,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)), storage_manager, CreateLIPFilterAdaptiveProberHelper( - proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context)); + proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context))); + break; } case serialization::SORT_MERGE_RUN: { LOG(INFO) << "Creating SortMergeRunWorkOrder for Query " << proto.query_id() @@ -440,7 +468,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder runs.push_back(move(run)); } - return new SortMergeRunWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), query_context->getSortConfig( proto.GetExtension(serialization::SortMergeRunWorkOrder::sort_config_index)), @@ -454,12 +482,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager, proto.GetExtension(serialization::SortMergeRunWorkOrder::operator_index), shiftboss_client_id, - bus); + bus)); + break; } case serialization::SORT_RUN_GENERATION: { LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new SortRunGenerationWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::SortRunGenerationWorkOrder::relation_id)), @@ -468,21 +497,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index)), query_context->getInsertDestination( proto.GetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index)), - storage_manager); + storage_manager)); + break; } case serialization::TABLE_GENERATOR: { LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new TableGeneratorWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), query_context->getGeneratorFunctionHandle( proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)), query_context->getInsertDestination( - proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index))); + proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)))); + break; } case serialization::TEXT_SCAN: { LOG(INFO) << "Creating TextScanWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new TextScanWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), proto.GetExtension(serialization::TextScanWorkOrder::filename), proto.GetExtension(serialization::TextScanWorkOrder::text_offset), @@ -491,7 +522,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences), query_context->getInsertDestination( proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)), - hdfs); + hdfs)); + break; } case serialization::UNION_ALL: { LOG(INFO) << "Creating UnionAllWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; @@ -500,7 +532,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder select_attribute_id.push_back( proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i)); } - return new UnionAllWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::UnionAllWorkOrder::relation_id)), @@ -508,11 +540,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder select_attribute_id, query_context->getInsertDestination( proto.GetExtension(serialization::UnionAllWorkOrder::insert_destination_index)), - storage_manager); + storage_manager)); + break; } case serialization::UPDATE: { LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index; - return new UpdateWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), catalog_database->getRelationSchemaById( proto.GetExtension(serialization::UpdateWorkOrder::relation_id)), @@ -526,7 +559,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder storage_manager, proto.GetExtension(serialization::UpdateWorkOrder::operator_index), shiftboss_client_id, - bus); + bus)); + break; } case serialization::WINDOW_AGGREGATION: { LOG(INFO) << "Creating WindowAggregationWorkOrder for Query " << proto.query_id() @@ -537,17 +571,20 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::WindowAggregationWorkOrder::block_ids, i)); } - return new WindowAggregationWorkOrder( + work_orders.push_back(make_unique( proto.query_id(), query_context->getWindowAggregationState( proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)), move(blocks), query_context->getInsertDestination( - proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index))); + proto.GetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index)))); + break; } default: LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto in Shiftboss" << shiftboss_index; } + + return work_orders; } bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e56527dd/relational_operators/WorkOrderFactory.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp index ece687b..07b7fa8 100644 --- a/relational_operators/WorkOrderFactory.hpp +++ b/relational_operators/WorkOrderFactory.hpp @@ -21,6 +21,8 @@ #define QUICKSTEP_RELATIONAL_OPERATORS_WORK_ORDER_FACTORY_HPP_ #include +#include +#include #include "utility/Macros.hpp" @@ -63,7 +65,7 @@ class WorkOrderFactory { * * @return A new WorkOrder reconstructed from the supplied Protocol Buffer. **/ - static WorkOrder* ReconstructFromProto(const serialization::WorkOrder &proto, + static std::vector> ReconstructFromProto(const serialization::WorkOrder &proto, const std::size_t shiftboss_index, CatalogDatabaseLite *catalog_database, QueryContext *query_context,