From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Sat, 11 Jun 2016 05:42:11 -0000
Subject: [08/11] incubator-quickstep git commit: QUICKSTEP-10: Serialized WorkOrders as proto.

QUICKSTEP-10: Serialized WorkOrders as proto.
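A note on the pattern this commit repeats in AggregationOperator, BuildHashOperator and DeleteOperator. Each operator's getAllWorkOrderProtos() does one of two things:

- If the input relation is already stored, it enqueues one proto per input block, exactly once.
- If the input is streamed, it enqueues protos only for blocks fed since the previous call, and reports completion through done_feeding_input_relation_.

The sketch below models that control flow against a container shaped like the new WorkOrderProtosContainer. It is illustrative only and is not code from the commit. WorkOrderProtoStub, ProtosContainerSketch, BlockOperatorSketch and markDoneFeeding() are hypothetical names. The stub stands in for the protoc-generated serialization::WorkOrder, so the example needs only the standard library.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for the protoc-generated serialization::WorkOrder;
// it models only the block id that the real protos carry as an extension.
struct WorkOrderProtoStub {
  int block_id;
};

// Simplified mirror of WorkOrderProtosContainer: one FIFO of owned protos per
// operator index. The DCHECKs and the destructor warning are omitted.
class ProtosContainerSketch {
 public:
  explicit ProtosContainerSketch(const std::size_t num_operators)
      : operator_containers_(num_operators) {}

  // Takes ownership of `proto`.
  void addWorkOrderProto(WorkOrderProtoStub *proto, const std::size_t operator_index) {
    operator_containers_[operator_index].emplace(proto);
  }

  bool hasWorkOrderProto(const std::size_t operator_index) const {
    return !operator_containers_[operator_index].empty();
  }

  std::size_t getNumWorkOrderProtos(const std::size_t operator_index) const {
    return operator_containers_[operator_index].size();
  }

  // Hands the oldest pending proto to the caller, or returns nullptr if none.
  WorkOrderProtoStub* getWorkOrderProto(const std::size_t operator_index) {
    std::queue<std::unique_ptr<WorkOrderProtoStub>> &pending =
        operator_containers_[operator_index];
    if (pending.empty()) {
      return nullptr;
    }
    WorkOrderProtoStub *proto = pending.front().release();
    pending.pop();
    return proto;
  }

 private:
  std::vector<std::queue<std::unique_ptr<WorkOrderProtoStub>>> operator_containers_;
};

// Hypothetical operator reproducing the two branches of getAllWorkOrderProtos()
// shared by AggregationOperator, BuildHashOperator and DeleteOperator.
class BlockOperatorSketch {
 public:
  BlockOperatorSketch(const std::size_t op_index, const bool input_is_stored,
                      std::vector<int> input_blocks)
      : op_index_(op_index),
        input_is_stored_(input_is_stored),
        input_blocks_(std::move(input_blocks)) {}

  void feedInputBlock(const int block) { input_blocks_.push_back(block); }

  void markDoneFeeding() { done_feeding_ = true; }

  // Returns true once this operator will generate no further protos.
  bool getAllWorkOrderProtos(ProtosContainerSketch *container) {
    if (input_is_stored_) {
      // Stored input: every block is known up front, so emit them once.
      if (!started_) {
        for (const int block : input_blocks_) {
          container->addWorkOrderProto(new WorkOrderProtoStub{block}, op_index_);
        }
        started_ = true;
      }
      return true;
    }
    // Streamed input: emit protos only for blocks fed since the last call.
    while (num_generated_ < input_blocks_.size()) {
      container->addWorkOrderProto(
          new WorkOrderProtoStub{input_blocks_[num_generated_]}, op_index_);
      ++num_generated_;
    }
    return done_feeding_;
  }

 private:
  const std::size_t op_index_;
  const bool input_is_stored_;
  std::vector<int> input_blocks_;
  bool started_ = false;
  bool done_feeding_ = false;
  std::size_t num_generated_ = 0;
};
```

Ownership in the sketch follows the same hand-off as the real container. addWorkOrderProto() takes a raw pointer and wraps it in std::unique_ptr. getWorkOrderProto() releases the pointer back to the caller, who then becomes responsible for deleting it.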
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c9214ecb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c9214ecb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c9214ecb

Branch: refs/heads/build_script
Commit: c9214ecb1d481b3d1b02db0ffdf53852b11b540f
Parents: ccd11c0
Author: Zuyu Zhang
Authored: Tue Apr 12 16:55:48 2016 -0700
Committer: Zuyu Zhang
Committed: Thu Jun 9 17:06:02 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |   6 +
 query_execution/WorkOrderProtosContainer.hpp    | 146 +++++++++++++++++++
 query_execution/tests/Foreman_unittest.cpp      |   6 +
 query_execution/tests/QueryManager_unittest.cpp |   6 +
 relational_operators/AggregationOperator.cpp    |  34 +++++
 relational_operators/AggregationOperator.hpp    |  12 ++
 relational_operators/BuildHashOperator.cpp      |  40 +++++
 relational_operators/BuildHashOperator.hpp      |  12 ++
 relational_operators/CMakeLists.txt             |  36 ++++-
 relational_operators/CreateIndexOperator.hpp    |   9 ++
 relational_operators/CreateTableOperator.hpp    |   8 +
 relational_operators/DeleteOperator.cpp         |  38 ++++-
 relational_operators/DeleteOperator.hpp         |  14 +-
 relational_operators/DestroyHashOperator.cpp    |  17 +++
 relational_operators/DestroyHashOperator.hpp    |   3 +
 relational_operators/DropTableOperator.cpp      |  23 +++
 relational_operators/DropTableOperator.hpp      |   3 +
 .../FinalizeAggregationOperator.cpp             |  20 +++
 .../FinalizeAggregationOperator.hpp             |   3 +
 relational_operators/HashJoinOperator.cpp       | 124 ++++++++++++++++
 relational_operators/HashJoinOperator.hpp       |  21 +++
 relational_operators/InsertOperator.cpp         |  19 +++
 relational_operators/InsertOperator.hpp         |   3 +
 .../NestedLoopsJoinOperator.cpp                 | 142 ++++++++++++++++++
 .../NestedLoopsJoinOperator.hpp                 |  51 +++++++
 relational_operators/RelationalOperator.hpp     |  22 +++
 relational_operators/SampleOperator.cpp         | 101 ++++++++++---
 relational_operators/SampleOperator.hpp         |  12 ++
 relational_operators/SaveBlocksOperator.cpp     |  18 +++
 relational_operators/SaveBlocksOperator.hpp     |   3 +
 relational_operators/SelectOperator.cpp         |  43 ++++++
 relational_operators/SelectOperator.hpp         |  12 ++
 relational_operators/SortMergeRunOperator.cpp   |  68 +++++++++
 relational_operators/SortMergeRunOperator.hpp   |  12 ++
 .../SortRunGenerationOperator.cpp               |  39 +++++
 .../SortRunGenerationOperator.hpp               |  12 ++
 relational_operators/TableGeneratorOperator.cpp |  20 ++-
 relational_operators/TableGeneratorOperator.hpp |   5 +-
 relational_operators/TextScanOperator.cpp       | 125 ++++++++++++----
 relational_operators/TextScanOperator.hpp       |  24 ++-
 relational_operators/TextScanOperator.proto     |  22 ---
 relational_operators/UpdateOperator.cpp         |  23 +++
 relational_operators/UpdateOperator.hpp         |   3 +
 relational_operators/WorkOrder.proto            |  21 +--
 relational_operators/WorkOrderFactory.cpp       |   7 +-
 45 files changed, 1291 insertions(+), 97 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 8306f78..95bc0d6 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -42,6 +42,7 @@ add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryE
 add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp)
 add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp)
 add_library(quickstep_queryexecution_QueryManager QueryManager.cpp QueryManager.hpp)
+add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp)
 add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp)
 add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
 add_library(quickstep_queryexecution_WorkerDirectory ../empty_src.cpp WorkerDirectory.hpp)
@@ -157,6 +158,10 @@ target_link_libraries(quickstep_queryexecution_QueryManager
                       quickstep_utility_DAG
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer
+                      glog
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_WorkOrdersContainer
                       glog
                       quickstep_relationaloperators_WorkOrder
@@ -193,6 +198,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_QueryManager
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_queryexecution_Worker
                       quickstep_queryexecution_WorkerDirectory

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/query_execution/WorkOrderProtosContainer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/WorkOrderProtosContainer.hpp b/query_execution/WorkOrderProtosContainer.hpp
new file mode 100644
index 0000000..5043755
--- /dev/null
+++ b/query_execution/WorkOrderProtosContainer.hpp
@@ -0,0 +1,146 @@
+/**
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "relational_operators/WorkOrder.pb.h"  // IWYU pragma: keep
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+ /**
+ * @brief A container used in the distributed version to hold the normal
+ *        (non-rebuild) WorkOrder protos for a given query.
+ *
+ * @note This container stays alive during the lifetime of the query.
+ **/
+class WorkOrderProtosContainer {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param num_operators Number of operators in the query DAG.
+   **/
+  explicit WorkOrderProtosContainer(const std::size_t num_operators)
+      : num_operators_(num_operators),
+        operator_containers_(num_operators_) {
+    DCHECK_NE(num_operators_, 0u);
+  }
+
+  /**
+   * @brief Destructor.
+   *
+   * @note If the query is executed normally, we should never encounter a
+   *       situation where at the time of deletion the WorkOrderProtosContainer has
+   *       pending WorkOrders.
+   **/
+  ~WorkOrderProtosContainer() {
+    for (std::size_t op = 0; op < num_operators_; ++op) {
+      if (hasWorkOrderProto(op)) {
+        LOG(WARNING) << "Destroying a WorkOrderProtosContainer that still has pending WorkOrder protos.";
+        break;
+      }
+    }
+  }
+
+  /**
+   * @brief Check if there are some pending WorkOrders for the given operator.
+   *
+   * @param operator_index Index of the operator.
+   *
+   * @return If there are pending WorkOrders.
+   **/
+  bool hasWorkOrderProto(const std::size_t operator_index) const {
+    DCHECK_LT(operator_index, num_operators_);
+    return !operator_containers_[operator_index].empty();
+  }
+
+  /**
+   * @brief Get a WorkOrder for a given operator.
+   *
+   * @param operator_index The index of the operator.
+   *
+   * @return Release a WorkOrder proto. If no WorkOrder proto is available,
+   *         return nullptr.
+   **/
+  serialization::WorkOrder* getWorkOrderProto(const std::size_t operator_index) {
+    DCHECK_LT(operator_index, num_operators_);
+
+    if (operator_containers_[operator_index].empty()) {
+      return nullptr;
+    }
+
+    serialization::WorkOrder *proto =
+        operator_containers_[operator_index].front().release();
+    operator_containers_[operator_index].pop();
+
+    return proto;
+  }
+
+  /**
+   * @brief Add a WorkOrder generated from a given
+   *        operator.
+   *
+   * @param workorder A pointer to the WorkOrder to be added.
+   * @param operator_index The index of the operator in the query DAG.
+   **/
+  void addWorkOrderProto(serialization::WorkOrder *proto,
+                         const std::size_t operator_index) {
+    DCHECK(proto != nullptr);
+    DCHECK_LT(operator_index, num_operators_);
+
+    operator_containers_[operator_index].emplace(
+        std::unique_ptr<serialization::WorkOrder>(proto));
+  }
+
+  /**
+   * @brief Get the number of all pending WorkOrders
+   *        for a given operator.
+   *
+   * @param operator_index The index of the operator.
+   *
+   * @return The number of pending WorkOrders.
+   **/
+  std::size_t getNumWorkOrderProtos(const std::size_t operator_index) const {
+    DCHECK_LT(operator_index, num_operators_);
+    return operator_containers_[operator_index].size();
+  }
+
+ private:
+  const std::size_t num_operators_;
+
+  std::vector<std::queue<std::unique_ptr<serialization::WorkOrder>>> operator_containers_;
+
+  DISALLOW_COPY_AND_ASSIGN(WorkOrderProtosContainer);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_WORKORDER_PROTOS_CONTAINER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/query_execution/tests/Foreman_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Foreman_unittest.cpp b/query_execution/tests/Foreman_unittest.cpp
index 79f8f4a..cbe5088 100644
--- a/query_execution/tests/Foreman_unittest.cpp
+++ b/query_execution/tests/Foreman_unittest.cpp
@@ -58,6 +58,8 @@ using tmb::client_id;

 namespace quickstep {

+class WorkOrderProtosContainer;
+
 class MockWorkOrder : public WorkOrder {
  public:
   explicit MockWorkOrder(const int op_index)
@@ -168,6 +170,10 @@ class MockOperator: public RelationalOperator {
     return num_calls_get_workorders_ == max_getworkorder_iters_;
   }

+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
+    return true;
+  }
+
   void feedInputBlock(const block_id input_block_id,
                       const relation_id input_relation_id) override {
     ++num_calls_feedblock_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 308d5ca..9ba5978 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -59,6 +59,8 @@ using tmb::client_id;

 namespace quickstep {

+class WorkOrderProtosContainer;
+
 class MockWorkOrder : public WorkOrder {
  public:
   explicit MockWorkOrder(const int op_index)
@@ -169,6 +171,10 @@ class MockOperator: public RelationalOperator {
     return num_calls_get_workorders_ == max_getworkorder_iters_;
   }

+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
+    return true;
+  }
+
   void feedInputBlock(const block_id input_block_id,
                       const relation_id input_relation_id) override {
     ++num_calls_feedblock_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index 7252541..6d16930 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -20,7 +20,9 @@
 #include

 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/AggregationOperationState.hpp"
 #include "storage/StorageBlockInfo.hpp"

@@ -61,6 +63,38 @@ bool AggregationOperator::getAllWorkOrders(
   }
 }

+bool AggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_id block) {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::AGGREGATION);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
+  proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+
+  return proto;
+}
+
+
 void AggregationWorkOrder::execute() {
   state_->aggregateBlock(input_block_id_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index f340d4e..4bcbcf6 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -38,8 +38,11 @@ namespace quickstep {

 class AggregationOperationState;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;

+namespace serialization { class WorkOrder; }
+
 /** \addtogroup RelationalOperators
  *  @{
  */
@@ -80,6 +83,8 @@ class AggregationOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;

+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
     input_relation_block_ids_.push_back(input_block_id);
   }
@@ -91,6 +96,13 @@ class AggregationOperator : public RelationalOperator {
   }

  private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param block The block id used in the Work Order.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
   const bool input_relation_is_stored_;
   std::vector<block_id> input_relation_block_ids_;
   const QueryContext::aggregation_state_id aggr_state_index_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 9dc4afe..1c2ff05 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -22,7 +22,9 @@

 #include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/HashTable.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -99,6 +101,44 @@ bool BuildHashOperator::getAllWorkOrders(
   }
 }

+bool BuildHashOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id block) {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::BUILD_HASH);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::BuildHashWorkOrder::relation_id, input_relation_.getID());
+  for (const attribute_id attr_id : join_key_attributes_) {
+    proto->AddExtension(serialization::BuildHashWorkOrder::join_key_attributes, attr_id);
+  }
+  proto->SetExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable,
+                      any_join_key_attributes_nullable_);
+  proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
+  proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+
+  return proto;
+}
+
+
 void BuildHashWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(build_block_id_, input_relation_));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 50dd7d6..464bbf8 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -39,6 +39,7 @@ namespace quickstep {

 class CatalogRelationSchema;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;

 struct TupleReference;
@@ -46,6 +47,8 @@ struct TupleReference;
 template class HashTable;
 typedef HashTable JoinHashTable;

+namespace serialization { class WorkOrder; }
+
 /** \addtogroup RelationalOperators
  *  @{
  */
@@ -96,6 +99,8 @@ class BuildHashOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;

+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id,
                       const relation_id input_relation_id) override {
     input_relation_block_ids_.push_back(input_block_id);
@@ -109,6 +114,13 @@ class BuildHashOperator : public RelationalOperator {
   }

  private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param block The block id used in the Work Order.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
   const CatalogRelation &input_relation_;
   const bool input_relation_is_stored_;
   const std::vector<attribute_id> join_key_attributes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index eb73c07..91d1097 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -73,9 +73,11 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
@@ -85,9 +87,11 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_HashTable
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
@@ -120,9 +124,11 @@ target_link_libraries(quickstep_relationaloperators_DeleteOperator
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -132,9 +138,11 @@ target_link_libraries(quickstep_relationaloperators_DeleteOperator
 target_link_libraries(quickstep_relationaloperators_DestroyHashOperator
                       glog
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_relationaloperators_DropTableOperator
@@ -143,9 +151,11 @@ target_link_libraries(quickstep_relationaloperators_DropTableOperator
                       quickstep_catalog_CatalogDatabaseLite
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_utility_Macros
@@ -155,9 +165,11 @@ target_link_libraries(quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_AggregationOperationState
                       quickstep_utility_Macros
                       tmb)
@@ -170,9 +182,11 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_HashTable
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
@@ -194,9 +208,11 @@ target_link_libraries(quickstep_relationaloperators_InsertOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
@@ -209,9 +225,11 @@ target_link_libraries(quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
@@ -245,9 +263,11 @@ target_link_libraries(quickstep_relationaloperators_SampleOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
-                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
@@ -257,9 +277,11 @@ target_link_libraries(quickstep_relationaloperators_SampleOperator
 target_link_libraries(quickstep_relationaloperators_SaveBlocksOperator
                       glog
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_utility_Macros
@@ -270,9 +292,11 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_PartitionSchemeHeader
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
@@ -289,11 +313,13 @@ target_link_libraries(quickstep_relationaloperators_SortMergeRunOperator
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionTypedefs
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_SortMergeRunOperatorHelpers
                       quickstep_relationaloperators_SortMergeRunOperator_proto
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_threading_ThreadIDBasedMap
                       quickstep_utility_Macros
@@ -326,9 +352,11 @@ target_link_libraries(quickstep_relationaloperators_SortRunGenerationOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
@@ -343,9 +371,11 @@ target_link_libraries(quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_tablegenerator_GeneratorFunctionHandle
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlockInfo
                       quickstep_types_containers_ColumnVectorsValueAccessor
@@ -358,9 +388,11 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -379,9 +411,11 @@ target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryExecutionUtil
+                      quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/CreateIndexOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index 11a01ae..18ca656 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -1,6 +1,7 @@
 /**
  * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
  *   University of Wisconsin—Madison.
+ * Copyright 2016 Pivotal Software, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,6 +38,7 @@ namespace quickstep {
 class CatalogRelation;
 class QueryContext;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;

 /** \addtogroup RelationalOperators
@@ -76,6 +78,13 @@ class CreateIndexOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;

+  /**
+   * @note no WorkOrder proto generated for this operator.
+ **/ + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override { + return true; + } + void updateCatalogOnCompletion() override; private: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/CreateTableOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp index 60bcef4..6d91142 100644 --- a/relational_operators/CreateTableOperator.hpp +++ b/relational_operators/CreateTableOperator.hpp @@ -36,6 +36,7 @@ namespace quickstep { class CatalogDatabase; class QueryContext; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -74,6 +75,13 @@ class CreateTableOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + /** + * @note no WorkOrder proto generated for this operator. + **/ + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override { + return true; + } + void updateCatalogOnCompletion() override; private: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/DeleteOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp index 15dc9e3..47e36e9 100644 --- a/relational_operators/DeleteOperator.cpp +++ b/relational_operators/DeleteOperator.cpp @@ -26,13 +26,14 @@ #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" 
#include "threading/ThreadIDBasedMap.hpp" - #include "glog/logging.h" #include "tmb/id_typedefs.h" @@ -85,6 +86,41 @@ bool DeleteOperator::getAllWorkOrders( } } +bool DeleteOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (relation_is_stored_) { + // If relation_ is stored, iterate over the list of blocks in relation_. + if (!started_) { + for (const block_id input_block_id : relation_block_ids_) { + container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); + } + started_ = true; + } + return true; + } else { + while (num_workorders_generated_ < relation_block_ids_.size()) { + container->addWorkOrderProto( + createWorkOrderProto(relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + return done_feeding_input_relation_; + } +} + +serialization::WorkOrder* DeleteOperator::createWorkOrderProto(const block_id block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::DELETE); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::DeleteWorkOrder::operator_index, op_index_); + proto->SetExtension(serialization::DeleteWorkOrder::relation_id, relation_.getID()); + proto->SetExtension(serialization::DeleteWorkOrder::predicate_index, predicate_index_); + proto->SetExtension(serialization::DeleteWorkOrder::block_id, block); + + return proto; +} + + void DeleteWorkOrder::execute() { MutableBlockReference block( storage_manager_->getBlockMutable(input_block_id_, input_relation_)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/DeleteOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp index c55f585..aa8a688 100644 --- a/relational_operators/DeleteOperator.hpp +++ b/relational_operators/DeleteOperator.hpp @@ -41,8 +41,11 @@ namespace 
quickstep { class CatalogRelationSchema; class Predicate; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** \addtogroup RelationalOperators * @{ */ @@ -67,7 +70,7 @@ class DeleteOperator : public RelationalOperator { const CatalogRelation &relation, const QueryContext::predicate_id predicate_index, const bool relation_is_stored) - : RelationalOperator(query_id), + : RelationalOperator(query_id), relation_(relation), predicate_index_(predicate_index), relation_is_stored_(relation_is_stored), @@ -84,6 +87,8 @@ class DeleteOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + const relation_id getOutputRelationID() const override { return relation_.getID(); } @@ -101,6 +106,13 @@ class DeleteOperator : public RelationalOperator { } private: + /** + * @brief Create Work Order proto. + * + * @param block The block id used in the Work Order. 
+ **/ + serialization::WorkOrder* createWorkOrderProto(const block_id block); + const CatalogRelation &relation_; const QueryContext::predicate_id predicate_index_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/DestroyHashOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp index 8aa40b4..e748470 100644 --- a/relational_operators/DestroyHashOperator.cpp +++ b/relational_operators/DestroyHashOperator.cpp @@ -18,7 +18,9 @@ #include "relational_operators/DestroyHashOperator.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "tmb/id_typedefs.h" @@ -39,6 +41,21 @@ bool DestroyHashOperator::getAllWorkOrders( return work_generated_; } +bool DestroyHashOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::DESTROY_HASH); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index, hash_table_index_); + + container->addWorkOrderProto(proto, op_index_); + } + return work_generated_; +} + + void DestroyHashWorkOrder::execute() { query_context_->destroyJoinHashTable(hash_table_index_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/DestroyHashOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp index 7d8acb7..181386f 100644 --- a/relational_operators/DestroyHashOperator.hpp +++ 
b/relational_operators/DestroyHashOperator.hpp @@ -32,6 +32,7 @@ namespace tmb { class MessageBus; } namespace quickstep { class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -63,6 +64,8 @@ class DestroyHashOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + private: const QueryContext::join_hash_table_id hash_table_index_; bool work_generated_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/DropTableOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp index 256f6a1..727aa46 100644 --- a/relational_operators/DropTableOperator.cpp +++ b/relational_operators/DropTableOperator.cpp @@ -24,7 +24,9 @@ #include "catalog/CatalogDatabaseLite.hpp" #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" @@ -55,6 +57,27 @@ bool DropTableOperator::getAllWorkOrders( return work_generated_; } +bool DropTableOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::DROP_TABLE); + proto->set_query_id(query_id_); + + std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot()); + for (const block_id relation_block : relation_blocks) { + proto->AddExtension(serialization::DropTableWorkOrder::block_ids, relation_block); + } +
container->addWorkOrderProto(proto, op_index_); + + database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions); + } + + return work_generated_; +} + void DropTableOperator::updateCatalogOnCompletion() { const relation_id rel_id = relation_.getID(); if (only_drop_blocks_) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/DropTableOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp index a0a8d6e..6c7fca3 100644 --- a/relational_operators/DropTableOperator.hpp +++ b/relational_operators/DropTableOperator.hpp @@ -41,6 +41,7 @@ class CatalogDatabaseLite; class CatalogRelation; class QueryContext; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -79,6 +80,8 @@ class DropTableOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void updateCatalogOnCompletion() override; private: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 1dc4188..20d0ee5 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -18,7 +18,9 @@ #include "relational_operators/FinalizeAggregationOperator.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/AggregationOperationState.hpp" #include 
"glog/logging.h" @@ -47,6 +49,24 @@ bool FinalizeAggregationOperator::getAllWorkOrders( return started_; } +bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !started_) { + started_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::FINALIZE_AGGREGATION); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index, + aggr_state_index_); + proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index, + output_destination_index_); + + container->addWorkOrderProto(proto, op_index_); + } + return started_; +} + + void FinalizeAggregationWorkOrder::execute() { state_->finalizeAggregate(output_destination_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index e8a403f..158a637 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -39,6 +39,7 @@ namespace quickstep { class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -79,6 +80,8 @@ class FinalizeAggregationOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + QueryContext::insert_destination_id getInsertDestinationID() const override { return output_destination_index_; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/HashJoinOperator.cpp 
---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp index b89cfb3..5a47b50 100644 --- a/relational_operators/HashJoinOperator.cpp +++ b/relational_operators/HashJoinOperator.cpp @@ -31,7 +31,9 @@ #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/HashTable.hpp" #include "storage/InsertDestination.hpp" #include "storage/StorageBlock.hpp" @@ -391,6 +393,128 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders( return false; } +bool HashJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + switch (join_type_) { + case JoinType::kInnerJoin: + return getAllNonOuterJoinWorkOrderProtos(container, serialization::HashJoinWorkOrder::HASH_INNER_JOIN); + case JoinType::kLeftSemiJoin: + return getAllNonOuterJoinWorkOrderProtos(container, serialization::HashJoinWorkOrder::HASH_SEMI_JOIN); + case JoinType::kLeftAntiJoin: + return getAllNonOuterJoinWorkOrderProtos(container, serialization::HashJoinWorkOrder::HASH_ANTI_JOIN); + case JoinType::kLeftOuterJoin: + return getAllOuterJoinWorkOrderProtos(container); + default: + LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrderProtos()"; + } +} + +bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos( + WorkOrderProtosContainer *container, + const serialization::HashJoinWorkOrder::HashJoinWorkOrderType hash_join_type) { + // We wait until the building of global hash table is complete. 
+ if (!blocking_dependencies_met_) { + return false; + } + + if (probe_relation_is_stored_) { + if (!started_) { + for (const block_id probe_block_id : probe_relation_block_ids_) { + container->addWorkOrderProto( + createNonOuterJoinWorkOrderProto(hash_join_type, probe_block_id), + op_index_); + } + started_ = true; + } + return true; + } else { + while (num_workorders_generated_ < probe_relation_block_ids_.size()) { + container->addWorkOrderProto( + createNonOuterJoinWorkOrderProto(hash_join_type, + probe_relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + + return done_feeding_input_relation_; + } +} + +serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto( + const serialization::HashJoinWorkOrder::HashJoinWorkOrderType hash_join_type, + const block_id block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::HASH_JOIN); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::HashJoinWorkOrder::hash_join_work_order_type, hash_join_type); + proto->SetExtension(serialization::HashJoinWorkOrder::build_relation_id, build_relation_.getID()); + proto->SetExtension(serialization::HashJoinWorkOrder::probe_relation_id, probe_relation_.getID()); + for (const attribute_id attr_id : join_key_attributes_) { + proto->AddExtension(serialization::HashJoinWorkOrder::join_key_attributes, attr_id); + } + proto->SetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable, + any_join_key_attributes_nullable_); + proto->SetExtension(serialization::HashJoinWorkOrder::insert_destination_index, output_destination_index_); + proto->SetExtension(serialization::HashJoinWorkOrder::join_hash_table_index, hash_table_index_); + proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_); + proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block); + 
proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_); + + return proto; +} + +bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *container) { + // We wait until the building of global hash table is complete. + if (!blocking_dependencies_met_) { + return false; + } + + if (probe_relation_is_stored_) { + if (!started_) { + for (const block_id probe_block_id : probe_relation_block_ids_) { + container->addWorkOrderProto(createOuterJoinWorkOrderProto(probe_block_id), op_index_); + } + started_ = true; + } + return true; + } else { + while (num_workorders_generated_ < probe_relation_block_ids_.size()) { + container->addWorkOrderProto( + createOuterJoinWorkOrderProto(probe_relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + + return done_feeding_input_relation_; + } +} + +serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const block_id block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::HASH_JOIN); + + proto->SetExtension(serialization::HashJoinWorkOrder::hash_join_work_order_type, + serialization::HashJoinWorkOrder::HASH_OUTER_JOIN); + proto->SetExtension(serialization::HashJoinWorkOrder::build_relation_id, build_relation_.getID()); + proto->SetExtension(serialization::HashJoinWorkOrder::probe_relation_id, probe_relation_.getID()); + for (const attribute_id attr_id : join_key_attributes_) { + proto->AddExtension(serialization::HashJoinWorkOrder::join_key_attributes, attr_id); + } + proto->SetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable, + any_join_key_attributes_nullable_); + proto->SetExtension(serialization::HashJoinWorkOrder::insert_destination_index, output_destination_index_); + proto->SetExtension(serialization::HashJoinWorkOrder::join_hash_table_index, hash_table_index_); + 
proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_); + proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block); + + for (const bool is_attribute_on_build : is_selection_on_build_) { + proto->AddExtension(serialization::HashJoinWorkOrder::is_selection_on_build, is_attribute_on_build); + } + + return proto; +} + + void HashInnerJoinWorkOrder::execute() { if (FLAGS_vector_based_joined_tuple_collector) { executeWithCollectorType(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index 1d5d4e3..6f4271d 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -30,6 +30,7 @@ #include "query_execution/QueryContext.hpp" #include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/HashTable.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" @@ -47,6 +48,7 @@ class InsertDestination; class Predicate; class Scalar; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -161,6 +163,8 @@ class HashJoinOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { DCHECK(input_relation_id == probe_relation_.getID()); @@ -202,6 +206,23 @@ class HashJoinOperator : public RelationalOperator { QueryContext *query_context, StorageManager *storage_manager); + bool getAllNonOuterJoinWorkOrderProtos( + WorkOrderProtosContainer *container, + 
const serialization::HashJoinWorkOrder::HashJoinWorkOrderType hash_join_type); + + serialization::WorkOrder* createNonOuterJoinWorkOrderProto( + const serialization::HashJoinWorkOrder::HashJoinWorkOrderType hash_join_type, + const block_id block); + + bool getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *container); + + /** + * @brief Create HashOuterJoinWorkOrder proto. + * + * @param block The block id used in the Work Order. + **/ + serialization::WorkOrder* createOuterJoinWorkOrderProto(const block_id block); + const CatalogRelation &build_relation_; const CatalogRelation &probe_relation_; const bool probe_relation_is_stored_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/InsertOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp index 3ec9933..963cdcd 100644 --- a/relational_operators/InsertOperator.cpp +++ b/relational_operators/InsertOperator.cpp @@ -20,7 +20,9 @@ #include #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "glog/logging.h" @@ -49,6 +51,23 @@ bool InsertOperator::getAllWorkOrders( return work_generated_; } +bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (blocking_dependencies_met_ && !work_generated_) { + work_generated_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::INSERT); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_); + proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_); + + container->addWorkOrderProto(proto, op_index_); + } + + 
return work_generated_; +} + + void InsertWorkOrder::execute() { output_destination_->insertTuple(*tuple_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/InsertOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp index 51c606d..78f5199 100644 --- a/relational_operators/InsertOperator.hpp +++ b/relational_operators/InsertOperator.hpp @@ -39,6 +39,7 @@ namespace quickstep { class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -78,6 +79,8 @@ class InsertOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + QueryContext::insert_destination_id getInsertDestinationID() const override { return output_destination_index_; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/NestedLoopsJoinOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp index 5a47fca..43588ee 100644 --- a/relational_operators/NestedLoopsJoinOperator.cpp +++ b/relational_operators/NestedLoopsJoinOperator.cpp @@ -26,7 +26,9 @@ #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" @@ -150,6 +152,72 @@ bool NestedLoopsJoinOperator::getAllWorkOrders( } } +bool 
NestedLoopsJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (left_relation_is_stored_ && right_relation_is_stored_) { + // Make sure we generate workorders only once. + if (!all_workorders_generated_) { + for (const block_id left_block_id : left_relation_block_ids_) { + for (const block_id right_block_id : right_relation_block_ids_) { + container->addWorkOrderProto(createWorkOrderProto(left_block_id, right_block_id), + op_index_); + } + } + all_workorders_generated_ = true; + } + return true; + } else if (!(left_relation_is_stored_ || right_relation_is_stored_)) { + // Both relations are not stored. + const std::vector<block_id>::size_type new_left_blocks + = left_relation_block_ids_.size() - num_left_workorders_generated_; + const std::vector<block_id>::size_type new_right_blocks + = right_relation_block_ids_.size() - num_right_workorders_generated_; + + std::size_t new_workorders = 0; + if (new_left_blocks > 0 && new_right_blocks > 0) { + // Blocks added to both left and right relations. + // First generate (left + new_left_blocks) * (new_right_blocks). + new_workorders = + getAllWorkOrderProtosHelperBothNotStored(container, + 0, + left_relation_block_ids_.size(), + num_right_workorders_generated_, + right_relation_block_ids_.size()); + + // Now generate new_left_blocks * (right). + new_workorders += + getAllWorkOrderProtosHelperBothNotStored(container, + num_left_workorders_generated_, + left_relation_block_ids_.size(), + 0, + num_right_workorders_generated_); + } else if (new_left_blocks == 0 && new_right_blocks > 0) { + // Only new right blocks are added. Generate left * new_right_blocks.
+ new_workorders = + getAllWorkOrderProtosHelperBothNotStored(container, + 0, + left_relation_block_ids_.size(), + num_right_workorders_generated_, + right_relation_block_ids_.size()); + } else if (new_left_blocks > 0 && new_right_blocks == 0) { + // Generate new_left_blocks * right + new_workorders = + getAllWorkOrderProtosHelperBothNotStored(container, + num_left_workorders_generated_, + left_relation_block_ids_.size(), + 0, + right_relation_block_ids_.size()); + } + if (new_workorders > 0) { + num_left_workorders_generated_ = left_relation_block_ids_.size(); + num_right_workorders_generated_ = right_relation_block_ids_.size(); + } + return done_feeding_left_relation_ && done_feeding_right_relation_; + } else { + // Only one relation is a stored relation. + return getAllWorkOrderProtosHelperOneStored(container); + } +} + std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container, QueryContext *query_context, StorageManager *storage_manager, @@ -241,6 +309,80 @@ bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContaine } } +std::size_t NestedLoopsJoinOperator::getAllWorkOrderProtosHelperBothNotStored( + WorkOrderProtosContainer *container, + const std::vector<block_id>::size_type left_min, + const std::vector<block_id>::size_type left_max, + const std::vector<block_id>::size_type right_min, + const std::vector<block_id>::size_type right_max) { + DCHECK(!(left_relation_is_stored_ || right_relation_is_stored_)); + DCHECK_LE(left_min, left_max); + DCHECK_LE(right_min, right_max); + + for (std::vector<block_id>::size_type left_index = left_min; + left_index < left_max; + ++left_index) { + for (std::vector<block_id>::size_type right_index = right_min; + right_index < right_max; + ++right_index) { + container->addWorkOrderProto( + createWorkOrderProto(left_relation_block_ids_[left_index], right_relation_block_ids_[right_index]), + op_index_); + } + } + // Return the number of workorders produced.
+ return (left_max - left_min) * (right_max - right_min); +} + +bool NestedLoopsJoinOperator::getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container) { + DCHECK(left_relation_is_stored_ ^ right_relation_is_stored_); + + if (left_relation_is_stored_) { + for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_; + right_index < right_relation_block_ids_.size(); + ++right_index) { + for (const block_id left_block_id : left_relation_block_ids_) { + container->addWorkOrderProto( + createWorkOrderProto(left_block_id, right_relation_block_ids_[right_index]), + op_index_); + } + } + num_right_workorders_generated_ = right_relation_block_ids_.size(); + return done_feeding_right_relation_; + } else { + for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_; + left_index < left_relation_block_ids_.size(); + ++left_index) { + for (const block_id right_block_id : right_relation_block_ids_) { + container->addWorkOrderProto( + createWorkOrderProto(left_relation_block_ids_[left_index], right_block_id), + op_index_); + } + } + num_left_workorders_generated_ = left_relation_block_ids_.size(); + return done_feeding_left_relation_; + } +} + +serialization::WorkOrder* NestedLoopsJoinOperator::createWorkOrderProto(const block_id left_block, + const block_id right_block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::NESTED_LOOP_JOIN); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id, left_input_relation_.getID()); + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id, right_input_relation_.getID()); + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id, left_block); + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id, right_block); + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index,
output_destination_index_); + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index, join_predicate_index_); + proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index, selection_index_); + + return proto; +} + + template void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_store, const TupleStorageSubBlock &right_store) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/NestedLoopsJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp index 0b13842..992e76d 100644 --- a/relational_operators/NestedLoopsJoinOperator.hpp +++ b/relational_operators/NestedLoopsJoinOperator.hpp @@ -44,8 +44,11 @@ class Predicate; class Scalar; class StorageManager; class TupleStorageSubBlock; +class WorkOrderProtosContainer; class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** \addtogroup RelationalOperators * @{ */ @@ -119,6 +122,8 @@ class NestedLoopsJoinOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void doneFeedingInputBlocks(const relation_id rel_id) override { if (rel_id == left_input_relation_.getID()) { done_feeding_left_relation_ = true; @@ -187,6 +192,52 @@ class NestedLoopsJoinOperator : public RelationalOperator { QueryContext *query_context, StorageManager *storage_manager); + /** + * @brief Pairs block IDs from left and right relation block IDs and generates + * NestedLoopsJoinWorkOrder protos and pushes them to the + * WorkOrderProtosContainer when both relations are not stored + * relations. + * + * @param container A pointer to the WorkOrderProtosContainer to store the + * resulting WorkOrder protos. 
+ * @param left_min The starting index in left_relation_block_ids_ from where + * we begin generating NestedLoopsJoinWorkOrders. + * @param left_max The index in left_relation_block_ids_ until which we + * generate NestedLoopsJoinWorkOrders (excluding left_max). + * @param right_min The starting index in right_relation_block_ids_ from where + * we begin generating NestedLoopsJoinWorkOrders. + * @param right_max The index in right_relation_block_ids_ until which we + * generate NestedLoopsJoinWorkOrders (excluding right_max). + * + * @return The number of work order protos generated during the execution of this + * function. + **/ + std::size_t getAllWorkOrderProtosHelperBothNotStored(WorkOrderProtosContainer *container, + const std::vector<block_id>::size_type left_min, + const std::vector<block_id>::size_type left_max, + const std::vector<block_id>::size_type right_min, + const std::vector<block_id>::size_type right_max); + + /** + * @brief Pairs block IDs from left and right relation block IDs and generates + * NestedLoopsJoinWorkOrder protos and pushes them to the + * WorkOrderProtosContainer when only one relation is a stored relation. + * + * @param container A pointer to the WorkOrderProtosContainer to store the + * resulting WorkOrder protos. + * + * @return Whether all work orders have been generated. + **/ + bool getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container); + + /** + * @brief Create Work Order proto. + * @param left_block The block id from the left relation used in the Work Order. + * @param right_block The block id from the right relation used in the Work Order.
+ **/ + serialization::WorkOrder* createWorkOrderProto(const block_id left_block, + const block_id right_block); + const CatalogRelation &left_input_relation_; const CatalogRelation &right_input_relation_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/RelationalOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp index c173a0a..116727b 100644 --- a/relational_operators/RelationalOperator.hpp +++ b/relational_operators/RelationalOperator.hpp @@ -36,6 +36,7 @@ namespace tmb { class MessageBus; } namespace quickstep { class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -82,6 +83,27 @@ class RelationalOperator { tmb::MessageBus *bus) = 0; /** + * @brief For the distributed version, generate all the next WorkOrder protos + * for this RelationalOperator. + * + * @note If a RelationalOperator has blocking dependencies, it should not + * generate workorders unless all of the blocking dependencies have been + * met. + * + * @note If a RelationalOperator is not parallelizable on a block-level, then + * only one WorkOrder consisting of all the work for this + * RelationalOperator should be generated. + * + * @param container A pointer to a WorkOrderProtosContainer to be used to + * store the generated WorkOrder protos. + * + * @return Whether the operator has finished generating work order protos. If + * \c false, the execution engine will invoke this method after at + * least one pending work order has finished executing. + **/ + virtual bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) = 0; + + /** * @brief Update Catalog upon the completion of this RelationalOperator, if * necessary.
* http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SampleOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SampleOperator.cpp b/relational_operators/SampleOperator.cpp index 8d5fade..5e5a417 100644 --- a/relational_operators/SampleOperator.cpp +++ b/relational_operators/SampleOperator.cpp @@ -1,6 +1,7 @@ /** * Copyright 2016, Quickstep Research Group, Computer Sciences Department, * University of Wisconsin—Madison. + * Copyright 2016 Pivotal Software, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,10 +19,13 @@ #include "relational_operators/SampleOperator.hpp" #include +#include #include #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" @@ -82,24 +86,9 @@ bool SampleOperator::getAllWorkOrders( } return started_; } else { - if (is_block_sample_) { - while (num_workorders_generated_ < input_relation_block_ids_.size()) { - if (distribution(generator) <= probability) { - container->addNormalWorkOrder( - new SampleWorkOrder( - query_id_, - input_relation_, - input_relation_block_ids_[num_workorders_generated_], - is_block_sample_, - percentage_, - output_destination, - storage_manager), - op_index_); - ++num_workorders_generated_; - } - } - } else { - while (num_workorders_generated_ < input_relation_block_ids_.size()) { + if (is_block_sample_) { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + if (distribution(generator) <= probability) { container->addNormalWorkOrder( new SampleWorkOrder( query_id_, @@ -113,10 +102,86 @@ bool SampleOperator::getAllWorkOrders( 
+ ++num_workorders_generated_; } } + } else { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + container->addNormalWorkOrder( + new SampleWorkOrder( + query_id_, + input_relation_, + input_relation_block_ids_[num_workorders_generated_], + is_block_sample_, + percentage_, + output_destination, + storage_manager), + op_index_); + ++num_workorders_generated_; + } + } return done_feeding_input_relation_; } } +bool SampleOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + std::random_device random_device; + std::mt19937 generator(random_device()); + std::uniform_real_distribution<> distribution(0, 1); + const double probability = static_cast<double>(percentage_) / 100; + + if (input_relation_is_stored_) { + if (!started_) { + // If the sampling is by block choose blocks randomly + if (is_block_sample_) { + for (const block_id input_block_id : input_relation_block_ids_) { + if (distribution(generator) <= probability) { + container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); + } + } + } else { + // Add all the blocks for tuple sampling which would handle + // the sampling from each block + for (const block_id input_block_id : input_relation_block_ids_) { + container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); + } + } + started_ = true; + } + return true; + } else { + if (is_block_sample_) { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + if (distribution(generator) <= probability) { + container->addWorkOrderProto( + createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + } + } else { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + container->addWorkOrderProto( + createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + } + return done_feeding_input_relation_; + } +} + +serialization::WorkOrder* 
SampleOperator::createWorkOrderProto(const block_id block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::SAMPLE); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::SampleWorkOrder::relation_id, input_relation_.getID()); + proto->SetExtension(serialization::SampleWorkOrder::block_id, block); + proto->SetExtension(serialization::SampleWorkOrder::is_block_sample, is_block_sample_); + proto->SetExtension(serialization::SampleWorkOrder::percentage, percentage_); + proto->SetExtension(serialization::SampleWorkOrder::insert_destination_index, output_destination_index_); + + return proto; +} + void SampleWorkOrder::execute() { BlockReference block( storage_manager_->getBlock(input_block_id_, input_relation_)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SampleOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp index 505daa2..f8fe5f6 100644 --- a/relational_operators/SampleOperator.hpp +++ b/relational_operators/SampleOperator.hpp @@ -42,8 +42,11 @@ class CatalogDatabase; class CatalogRelationSchema; class InsertDestination; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** \addtogroup RelationalOperators * @{ */ @@ -96,6 +99,8 @@ class SampleOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { input_relation_block_ids_.push_back(input_block_id); } @@ -115,6 +120,13 @@ class SampleOperator : public RelationalOperator { } private: + /** + * @brief Create Work Order proto. 
+ * + * @param block The block id used in the Work Order. + **/ + serialization::WorkOrder* createWorkOrderProto(const block_id block); + const CatalogRelation &input_relation_; const CatalogRelationSchema &output_relation_; const QueryContext::insert_destination_id output_destination_index_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SaveBlocksOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SaveBlocksOperator.cpp b/relational_operators/SaveBlocksOperator.cpp index 8127d88..5e0f33d 100644 --- a/relational_operators/SaveBlocksOperator.cpp +++ b/relational_operators/SaveBlocksOperator.cpp @@ -19,7 +19,9 @@ #include +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" @@ -46,6 +48,22 @@ bool SaveBlocksOperator::getAllWorkOrders( return done_feeding_input_relation_; } +bool SaveBlocksOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + while (num_workorders_generated_ < destination_block_ids_.size()) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::SAVE_BLOCKS); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::SaveBlocksWorkOrder::block_id, + destination_block_ids_[num_workorders_generated_]); + proto->SetExtension(serialization::SaveBlocksWorkOrder::force, force_); + + container->addWorkOrderProto(proto, op_index_); + + ++num_workorders_generated_; + } + return done_feeding_input_relation_; +} + void SaveBlocksOperator::feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) { destination_block_ids_.push_back(input_block_id); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SaveBlocksOperator.hpp 
---------------------------------------------------------------------- diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp index 6e2c72b..50032b6 100644 --- a/relational_operators/SaveBlocksOperator.hpp +++ b/relational_operators/SaveBlocksOperator.hpp @@ -37,6 +37,7 @@ namespace quickstep { class QueryContext; class StorageManager; +class WorkOrderProtosContainer; class WorkOrdersContainer; /** \addtogroup RelationalOperators @@ -69,6 +70,8 @@ class SaveBlocksOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override; void feedInputBlocks(const relation_id rel_id, std::vector *partially_filled_blocks) override { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SelectOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp index e9a96f3..eb6277e 100644 --- a/relational_operators/SelectOperator.cpp +++ b/relational_operators/SelectOperator.cpp @@ -21,7 +21,9 @@ #include #include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" #include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "storage/InsertDestination.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" @@ -170,6 +172,47 @@ bool SelectOperator::getAllWorkOrders( } } +bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (input_relation_is_stored_) { + if (!started_) { + for (const block_id input_block_id : input_relation_block_ids_) { + container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); + } 
+ started_ = true; + } + return true; + } else { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + container->addWorkOrderProto( + createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]), + op_index_); + ++num_workorders_generated_; + } + return done_feeding_input_relation_; + } +} + +serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id block) { + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::SELECT); + proto->set_query_id(query_id_); + + proto->SetExtension(serialization::SelectWorkOrder::relation_id, input_relation_.getID()); + proto->SetExtension(serialization::SelectWorkOrder::insert_destination_index, output_destination_index_); + proto->SetExtension(serialization::SelectWorkOrder::predicate_index, predicate_index_); + proto->SetExtension(serialization::SelectWorkOrder::block_id, block); + proto->SetExtension(serialization::SelectWorkOrder::simple_projection, simple_projection_); + if (simple_projection_) { + for (const attribute_id attr_id : simple_selection_) { + proto->AddExtension(serialization::SelectWorkOrder::simple_selection, attr_id); + } + } + proto->SetExtension(serialization::SelectWorkOrder::selection_index, selection_index_); + + return proto; +} + + void SelectWorkOrder::execute() { BlockReference block( storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0])); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SelectOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp index ac7b038..0c10686 100644 --- a/relational_operators/SelectOperator.hpp +++ b/relational_operators/SelectOperator.hpp @@ -49,8 +49,11 @@ class InsertDestination; class Predicate; class Scalar; class StorageManager; +class WorkOrderProtosContainer; 
class WorkOrdersContainer; +namespace serialization { class WorkOrder; } + /** \addtogroup RelationalOperators * @{ */ @@ -192,6 +195,8 @@ class SelectOperator : public RelationalOperator { const tmb::client_id scheduler_client_id, tmb::MessageBus *bus) override; + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override { if (input_relation_.hasPartitionScheme()) { const partition_id part_id = @@ -243,6 +248,13 @@ class SelectOperator : public RelationalOperator { InsertDestination *output_destination); private: + /** + * @brief Create Work Order proto. + * + * @param block The block id used in the Work Order. + **/ + serialization::WorkOrder* createWorkOrderProto(const block_id block); + const CatalogRelation &input_relation_; const CatalogRelation &output_relation_; const QueryContext::insert_destination_id output_destination_index_;
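The non-stored branches of the new `getAllWorkOrderProtos` overrides (for example in `SelectOperator` and `SaveBlocksOperator`) share one pattern. A cursor such as `num_workorders_generated_` records how many fed blocks already have protos. Each call emits protos only for blocks fed since the previous call and returns `done_feeding_input_relation_`. The following is a minimal sketch of that contract. It assumes hypothetical stand-ins: `FakeWorkOrderProto`, a `std::vector` in place of `WorkOrderProtosContainer`, and `int` for `block_id`. None of these are the real Quickstep types.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-ins; the real code uses quickstep::block_id,
// serialization::WorkOrder, and WorkOrderProtosContainer.
using block_id = int;

struct FakeWorkOrderProto {
  block_id block;
};

// Mirrors the non-stored branch of SelectOperator::getAllWorkOrderProtos:
// each call emits protos only for blocks fed since the previous call.
class IncrementalOperatorSketch {
 public:
  void feedInputBlock(const block_id block) { input_block_ids_.push_back(block); }

  void doneFeedingInputBlocks() { done_feeding_input_relation_ = true; }

  // Appends one proto per block not yet covered. Returns true only after the
  // producer has signalled that no more blocks will arrive.
  bool getAllWorkOrderProtos(std::vector<FakeWorkOrderProto> *container) {
    while (num_workorders_generated_ < input_block_ids_.size()) {
      container->push_back(FakeWorkOrderProto{input_block_ids_[num_workorders_generated_]});
      ++num_workorders_generated_;
    }
    return done_feeding_input_relation_;
  }

 private:
  std::vector<block_id> input_block_ids_;
  std::size_t num_workorders_generated_ = 0;
  bool done_feeding_input_relation_ = false;
};
```

A return value of `false` tells the caller to invoke the method again later. A repeated call with no newly fed blocks emits nothing, so no block is turned into two protos.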
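`NestedLoopsJoinOperator::getAllWorkOrderProtosHelperOneStored` handles a join where one side is stored, so all of its block ids are known up front, and the other side streams in. When the left relation is stored, each right block fed since the last call is paired with every left block, and then `num_right_workorders_generated_` jumps to the end. The sketch below models only that pairing logic for the left-stored case. `NestedLoopsPairingSketch`, `generate`, and the `std::pair` output are hypothetical stand-ins for the operator, the helper, and the work order protos.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

using block_id = int;  // hypothetical stand-in for quickstep::block_id

// Left-stored case: left block ids are complete, right block ids grow over time.
struct NestedLoopsPairingSketch {
  std::vector<block_id> left_block_ids;   // stored relation: fully known
  std::vector<block_id> right_block_ids;  // streamed relation: appended as fed
  std::size_t num_right_generated = 0;    // like num_right_workorders_generated_
  bool done_feeding_right = false;        // like done_feeding_right_relation_

  // Pairs every right block fed since the last call with every left block.
  bool generate(std::vector<std::pair<block_id, block_id>> *pairs) {
    for (std::size_t r = num_right_generated; r < right_block_ids.size(); ++r) {
      for (const block_id left : left_block_ids) {
        pairs->emplace_back(left, right_block_ids[r]);
      }
    }
    num_right_generated = right_block_ids.size();
    return done_feeding_right;
  }
};
```

Each newly fed right block yields `left_block_ids.size()` pairs. Once feeding is done, the total is the product of the two block counts, which matches the `(left_max - left_min) * (right_max - right_min)` count that the both-not-stored helper returns for its index ranges.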
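For block-level sampling, `SampleOperator::getAllWorkOrderProtos` keeps a block when a uniform draw in [0, 1) is at most `percentage_ / 100`. The following is a minimal sketch that gives each newly fed block exactly one Bernoulli trial. It assumes a hypothetical free function `SampleNewBlocks` and swaps in `std::bernoulli_distribution` for the uniform-draw comparison; the two give the same keep probability apart from the measure-zero case of drawing exactly the threshold.

```cpp
#include <cstddef>
#include <random>
#include <vector>

using block_id = int;  // hypothetical stand-in for quickstep::block_id

// Gives every block fed since the last call exactly one Bernoulli trial and
// appends the kept blocks to `sampled`. `*num_examined` plays the role of
// num_workorders_generated_ and advances whether or not a block is kept.
// Returns how many blocks were kept in this call.
std::size_t SampleNewBlocks(const std::vector<block_id> &input_block_ids,
                            std::size_t *num_examined,
                            const double probability,
                            std::mt19937 *generator,
                            std::vector<block_id> *sampled) {
  // Precondition of std::bernoulli_distribution: 0 <= probability <= 1.
  std::bernoulli_distribution keep(probability);
  std::size_t kept = 0;
  while (*num_examined < input_block_ids.size()) {
    if (keep(*generator)) {
      sampled->push_back(input_block_ids[*num_examined]);
      ++kept;
    }
    ++*num_examined;  // one trial per block, kept or not
  }
  return kept;
}
```

The cursor advances on every block, kept or not, so each block gets exactly one trial, as in the stored-relation branch's `for` loop. In the non-stored block-sample branch of the diff, `++num_workorders_generated_` sits inside the `if`. A block that fails the draw is therefore drawn again, and for any nonzero percentage every fed block is eventually kept.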