From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Message-Id: <95159e0bfe15436c8342161b105dce7a@git.apache.org>
Subject: incubator-quickstep git commit: Using PartitionSchemeHeader in Physical Plan node.
Date: Thu, 15 Jun 2017 01:47:39 +0000 (UTC)
archived-at: Thu, 15 Jun 2017 01:47:55 -0000

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 13c16b9cb -> 5fbfd2111

Using PartitionSchemeHeader in Physical Plan node.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5fbfd211
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5fbfd211
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5fbfd211

Branch: refs/heads/master
Commit: 5fbfd21110091cfaf262daa8d9d2620a4e3f03bb
Parents: 13c16b9
Author: Zuyu Zhang
Authored: Tue Jun 13 21:50:41 2017 -0500
Committer: Zuyu Zhang
Committed: Wed Jun 14 20:42:37 2017 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt              |   3 +-
 query_optimizer/ExecutionGenerator.cpp      | 100 ++++++++++++++++-------
 query_optimizer/physical/BinaryJoin.cpp     |   7 ++
 query_optimizer/physical/BinaryJoin.hpp     |   8 +-
 query_optimizer/physical/CMakeLists.txt     |   8 ++
 query_optimizer/physical/HashJoin.cpp       |   4 +-
 query_optimizer/physical/HashJoin.hpp       |  24 ++++--
 query_optimizer/physical/Join.hpp           |   9 +-
 query_optimizer/physical/Physical.hpp       |  52 +++++++++++-
 query_optimizer/physical/Selection.cpp      |  40 ++++++++-
 query_optimizer/physical/Selection.hpp      |  27 ++++--
 query_optimizer/physical/TableGenerator.hpp |  18 +++-
 query_optimizer/physical/TableReference.cpp |  55 +++++++++++++
 query_optimizer/physical/TableReference.hpp |  21 ++---
 query_optimizer/rules/CMakeLists.txt        |   4 +-
 relational_operators/HashJoinOperator.cpp   |   6 ++
 relational_operators/HashJoinOperator.hpp   |   4 +-
 relational_operators/SelectOperator.cpp     |  13 +--
 relational_operators/SelectOperator.hpp     |  10 ++-
 relational_operators/WorkOrder.proto        |   2 +
 relational_operators/WorkOrderFactory.cpp   |   9 +-
 storage/CMakeLists.txt                      |   1 +
 storage/InsertDestination.cpp               |  34 ++------
 storage/InsertDestination.hpp               |  34 +++++++-
 utility/CMakeLists.txt                      |   1 +
 utility/PlanVisualizer.cpp                  |   6 ++
 26 files changed, 394 insertions(+), 106 deletions(-)
----------------------------------------------------------------------

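
Reader's note: the diffs below thread an optional, owned PartitionSchemeHeader through the physical plan nodes. The following is only a minimal sketch of that ownership pattern, not the Quickstep code itself. Node, SelectionNode, copy() and numPartitionsOf are hypothetical names, the header is reduced to a flat list of integer expression ids, and the fallback to one partition follows what convertHashJoin appears to do with probe_num_partitions.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the optimizer's PartitionSchemeHeader.
struct PartitionSchemeHeader {
  enum class PartitionType { kHash, kRandom, kRange };

  PartitionSchemeHeader(PartitionType type, std::size_t partitions,
                        std::vector<int> expr_ids)
      : partition_type(type),
        num_partitions(partitions),
        partition_expr_ids(std::move(expr_ids)) {
    assert(num_partitions > 0);
  }

  PartitionType partition_type;
  std::size_t num_partitions;
  std::vector<int> partition_expr_ids;  // Assumption: plain ints, not ExprId sets.
};

class Node;
using NodePtr = std::shared_ptr<const Node>;

// Hypothetical base class playing the role of Physical.
class Node {
 public:
  virtual ~Node() = default;

  // Non-owning view; nullptr means the node's output is not partitioned.
  const PartitionSchemeHeader* getOutputPartitionSchemeHeader() const {
    return header_.get();
  }

  // Returns a copy of this node that takes ownership of 'header'.
  virtual NodePtr copyWithNewOutputPartitionSchemeHeader(
      PartitionSchemeHeader *header) const = 0;

  // Returns a copy of this node that carries its own clone of the header.
  virtual NodePtr copy() const = 0;

 protected:
  // Takes ownership of 'header', which may be nullptr.
  explicit Node(PartitionSchemeHeader *header) : header_(header) {}

  // The caller owns the returned object (nullptr if there is no header).
  PartitionSchemeHeader* cloneOutputPartitionSchemeHeader() const {
    return header_ ? new PartitionSchemeHeader(*header_) : nullptr;
  }

 private:
  std::unique_ptr<PartitionSchemeHeader> header_;
};

// Hypothetical node playing the role of Selection.
class SelectionNode : public Node {
 public:
  static NodePtr Create(std::vector<int> project_expr_ids,
                        PartitionSchemeHeader *header = nullptr) {
    return NodePtr(new SelectionNode(std::move(project_expr_ids), header));
  }

  NodePtr copyWithNewOutputPartitionSchemeHeader(
      PartitionSchemeHeader *header) const override {
    return Create(project_expr_ids_, header);
  }

  NodePtr copy() const override {
    return Create(project_expr_ids_, cloneOutputPartitionSchemeHeader());
  }

 private:
  SelectionNode(std::vector<int> project_expr_ids, PartitionSchemeHeader *header)
      : Node(header), project_expr_ids_(std::move(project_expr_ids)) {}

  const std::vector<int> project_expr_ids_;
};

// A node without a header is treated as having a single partition.
std::size_t numPartitionsOf(const Node &node) {
  const PartitionSchemeHeader *header = node.getOutputPartitionSchemeHeader();
  return header ? header->num_partitions : 1u;
}
```

In this sketch, a node created without a header reports one partition, and copying a node always clones the header, so two nodes never share one header object. That appears to be the reason the diffs pass cloneOutputPartitionSchemeHeader() wherever an existing node is re-created through Create.
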
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index c969f16..04af02c 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -103,6 +103,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_queryoptimizer_physical_InsertTuple quickstep_queryoptimizer_physical_LIPFilterConfiguration quickstep_queryoptimizer_physical_NestedLoopsJoin + quickstep_queryoptimizer_physical_PartitionSchemeHeader quickstep_queryoptimizer_physical_PatternMatcher quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_physical_PhysicalType @@ -211,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_OptimizerTree target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator ${GFLAGS_LIB_NAME} quickstep_queryoptimizer_LogicalToPhysicalMapper + quickstep_queryoptimizer_Validator quickstep_queryoptimizer_logical_Logical quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_rules_AttachLIPFilters @@ -230,7 +232,6 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator quickstep_queryoptimizer_strategy_OneToOne quickstep_queryoptimizer_strategy_Selection quickstep_queryoptimizer_strategy_Strategy - quickstep_queryoptimizer_Validator quickstep_utility_Macros quickstep_utility_PlanVisualizer) target_link_libraries(quickstep_queryoptimizer_QueryHandle http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 3b2fe08..a7c7328 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -84,6 +84,7 @@ #include 
"query_optimizer/physical/InsertTuple.hpp" #include "query_optimizer/physical/LIPFilterConfiguration.hpp" #include "query_optimizer/physical/NestedLoopsJoin.hpp" +#include "query_optimizer/physical/PartitionSchemeHeader.hpp" #include "query_optimizer/physical/PatternMatcher.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" @@ -140,6 +141,7 @@ #include "gflags/gflags.h" #include "glog/logging.h" +using std::make_unique; using std::move; using std::static_pointer_cast; using std::unique_ptr; @@ -363,16 +365,52 @@ void ExecutionGenerator::createTemporaryCatalogRelation( ++aid; } + const P::PartitionSchemeHeader *partition_scheme_header = physical->getOutputPartitionSchemeHeader(); + if (partition_scheme_header) { + PartitionSchemeHeader::PartitionAttributeIds output_partition_attr_ids; + for (const auto &partition_equivalent_expr_ids : partition_scheme_header->partition_expr_ids) { + DCHECK(!partition_equivalent_expr_ids.empty()); + const E::ExprId partition_expr_id = *partition_equivalent_expr_ids.begin(); + DCHECK(attribute_substitution_map_.find(partition_expr_id) != attribute_substitution_map_.end()); + output_partition_attr_ids.push_back(attribute_substitution_map_[partition_expr_id]->getID()); + } + + const size_t num_partition = partition_scheme_header->num_partitions; + unique_ptr output_partition_scheme_header; + switch (partition_scheme_header->partition_type) { + case P::PartitionSchemeHeader::PartitionType::kHash: + output_partition_scheme_header = + make_unique(num_partition, move(output_partition_attr_ids)); + break; + case P::PartitionSchemeHeader::PartitionType::kRandom: + output_partition_scheme_header = + make_unique(num_partition); + break; + case P::PartitionSchemeHeader::PartitionType::kRange: + LOG(FATAL) << "Unimplemented"; + default: + LOG(FATAL) << "Unknown partition type"; + } + auto output_partition_scheme = make_unique(output_partition_scheme_header.release()); + + 
insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); + insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) + ->MergeFrom(output_partition_scheme->getProto()); + + catalog_relation->setPartitionScheme(output_partition_scheme.release()); + } else { + insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); + } + *catalog_relation_output = catalog_relation.get(); const relation_id output_rel_id = catalog_database_->addRelation( catalog_relation.release()); + insert_destination_proto->set_relation_id(output_rel_id); + #ifdef QUICKSTEP_DISTRIBUTED referenced_relation_ids_.insert(output_rel_id); #endif - - insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); - insert_destination_proto->set_relation_id(output_rel_id); } void ExecutionGenerator::dropAllTemporaryRelations() { @@ -499,13 +537,28 @@ bool ExecutionGenerator::convertSimpleProjection( void ExecutionGenerator::convertSelection( const P::SelectionPtr &physical_selection) { + const P::PhysicalPtr input = physical_selection->input(); + const CatalogRelationInfo *input_relation_info = + findRelationInfoOutputByPhysical(input); + DCHECK(input_relation_info != nullptr); + const CatalogRelation &input_relation = *input_relation_info->relation; + // Check if the Selection is only for renaming columns. 
if (physical_selection->filter_predicate() == nullptr) { - const std::vector input_attributes = - physical_selection->input()->getOutputAttributes(); + const P::PartitionSchemeHeader *physical_select_partition_scheme_header = + physical_selection->getOutputPartitionSchemeHeader(); + const P::PartitionSchemeHeader *physical_input_partition_scheme_header = input->getOutputPartitionSchemeHeader(); + + const bool are_same_physical_partition_scheme_headers = + (!physical_select_partition_scheme_header && !physical_input_partition_scheme_header) || + (physical_select_partition_scheme_header && physical_input_partition_scheme_header && + physical_select_partition_scheme_header->equal(*physical_input_partition_scheme_header)); + + const std::vector input_attributes = input->getOutputAttributes(); + const std::vector &project_expressions = physical_selection->project_expressions(); - if (project_expressions.size() == input_attributes.size()) { + if (project_expressions.size() == input_attributes.size() && are_same_physical_partition_scheme_headers) { bool has_different_attrs = false; for (std::size_t attr_idx = 0; attr_idx < input_attributes.size(); ++attr_idx) { if (project_expressions[attr_idx]->id() != input_attributes[attr_idx]->id()) { @@ -514,12 +567,9 @@ void ExecutionGenerator::convertSelection( } } if (!has_different_attrs) { - const std::unordered_map::const_iterator input_catalog_rel_it = - physical_to_output_relation_map_.find(physical_selection->input()); - DCHECK(input_catalog_rel_it != physical_to_output_relation_map_.end()); - if (!input_catalog_rel_it->second.isStoredRelation()) { + if (!input_relation_info->isStoredRelation()) { CatalogRelation *catalog_relation = - const_cast(input_catalog_rel_it->second.relation); + const_cast(input_relation_info->relation); for (std::size_t attr_idx = 0; attr_idx < project_expressions.size(); ++attr_idx) { CatalogAttribute *catalog_attribute = catalog_relation->getAttributeByIdMutable(attr_idx); @@ -528,7 +578,7 @@ void 
ExecutionGenerator::convertSelection( project_expressions[attr_idx]->attribute_alias()); } physical_to_output_relation_map_.emplace(physical_selection, - input_catalog_rel_it->second); + *input_relation_info); return; } } @@ -560,10 +610,6 @@ void ExecutionGenerator::convertSelection( insert_destination_proto); // Create and add a Select operator. - const CatalogRelationInfo *input_relation_info = - findRelationInfoOutputByPhysical(physical_selection->input()); - DCHECK(input_relation_info != nullptr); - const CatalogRelation &input_relation = *input_relation_info->relation; const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme(); const std::size_t num_partitions = @@ -799,17 +845,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { S::QueryContext::HashTableContext *hash_table_context_proto = query_context_proto_->add_join_hash_tables(); - // No partition. - std::size_t num_partitions = 1; - if (build_relation->hasPartitionScheme() && - build_attribute_ids.size() == 1) { - const PartitionSchemeHeader &partition_scheme_header = - build_relation->getPartitionScheme()->getPartitionSchemeHeader(); - if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeIds().front()) { - // TODO(zuyu): add optimizer support for partitioned hash joins. - hash_table_context_proto->set_num_partitions(num_partitions); - } - } + const P::PartitionSchemeHeader *probe_partition_scheme_header = probe_physical->getOutputPartitionSchemeHeader(); + const std::size_t probe_num_partitions = + probe_partition_scheme_header ? 
probe_partition_scheme_header->num_partitions : 1u; + hash_table_context_proto->set_num_partitions(probe_num_partitions); + S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table(); @@ -837,7 +877,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { build_relation_info->isStoredRelation(), build_attribute_ids, any_build_attributes_nullable, - num_partitions, + probe_num_partitions, join_hash_table_index)); // Create InsertDestination proto. @@ -880,7 +920,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { probe_operator_info->isStoredRelation(), probe_attribute_ids, any_probe_attributes_nullable, - num_partitions, + probe_num_partitions, *output_relation, insert_destination_index, join_hash_table_index, @@ -892,7 +932,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { const QueryPlan::DAGNodeIndex destroy_operator_index = execution_plan_->addRelationalOperator(new DestroyHashOperator( - query_handle_->query_id(), num_partitions, join_hash_table_index)); + query_handle_->query_id(), probe_num_partitions, join_hash_table_index)); if (!build_relation_info->isStoredRelation()) { execution_plan_->addDirectDependency(build_operator_index, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/BinaryJoin.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/BinaryJoin.cpp b/query_optimizer/physical/BinaryJoin.cpp index 30e2e8d..1928198 100644 --- a/query_optimizer/physical/BinaryJoin.cpp +++ b/query_optimizer/physical/BinaryJoin.cpp @@ -23,6 +23,8 @@ #include #include "query_optimizer/OptimizerTree.hpp" +#include "query_optimizer/physical/PartitionSchemeHeader.hpp" +#include "query_optimizer/physical/Physical.hpp" #include "utility/Cast.hpp" namespace quickstep { @@ -38,6 +40,11 @@ void BinaryJoin::getFieldStringItems( std::vector 
*non_container_child_fields, std::vector *container_child_field_names, std::vector> *container_child_fields) const { + if (partition_scheme_header_) { + inline_field_names->push_back("output_partition_scheme_header"); + inline_field_values->push_back(partition_scheme_header_->toString()); + } + non_container_child_field_names->push_back("left"); non_container_child_field_names->push_back("right"); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/BinaryJoin.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/BinaryJoin.hpp b/query_optimizer/physical/BinaryJoin.hpp index 4b5671d..d1f652f 100644 --- a/query_optimizer/physical/BinaryJoin.hpp +++ b/query_optimizer/physical/BinaryJoin.hpp @@ -41,6 +41,8 @@ namespace physical { class BinaryJoin; typedef std::shared_ptr BinaryJoinPtr; +struct PartitionSchemeHeader; + /** * @brief Base class for binary join nodes. */ @@ -68,11 +70,13 @@ class BinaryJoin : public Join { * @param left The left operand. * @param right The right operand. * @param project_expressions The project expressions. + * @param partition_scheme_header The optional output partition scheme header. 
*/ BinaryJoin(const PhysicalPtr &left, const PhysicalPtr &right, - const std::vector &project_expressions) - : Join(project_expressions), + const std::vector &project_expressions, + PartitionSchemeHeader *partition_scheme_header = nullptr) + : Join(project_expressions, partition_scheme_header), left_(left), right_(right) { addChild(left_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt index 1e777b1..e0b1d25 100644 --- a/query_optimizer/physical/CMakeLists.txt +++ b/query_optimizer/physical/CMakeLists.txt @@ -67,6 +67,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_BinaryJoin quickstep_queryoptimizer_OptimizerTree quickstep_queryoptimizer_expressions_NamedExpression quickstep_queryoptimizer_physical_Join + quickstep_queryoptimizer_physical_PartitionSchemeHeader quickstep_queryoptimizer_physical_Physical quickstep_utility_Cast quickstep_utility_Macros) @@ -202,9 +203,11 @@ target_link_libraries(quickstep_queryoptimizer_physical_PartitionSchemeHeader target_link_libraries(quickstep_queryoptimizer_physical_PatternMatcher quickstep_queryoptimizer_physical_PhysicalType) target_link_libraries(quickstep_queryoptimizer_physical_Physical + glog quickstep_queryoptimizer_OptimizerTree quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_expressions_ExpressionUtil + quickstep_queryoptimizer_physical_PartitionSchemeHeader quickstep_queryoptimizer_physical_PhysicalType quickstep_utility_Macros) target_link_libraries(quickstep_queryoptimizer_physical_Sample @@ -225,6 +228,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_Selection quickstep_queryoptimizer_expressions_LogicalAnd quickstep_queryoptimizer_expressions_NamedExpression quickstep_queryoptimizer_expressions_Predicate + 
quickstep_queryoptimizer_physical_PartitionSchemeHeader quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_physical_PhysicalType quickstep_utility_Cast @@ -254,6 +258,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_TableGenerator quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_expressions_ExprId quickstep_queryoptimizer_expressions_ExpressionUtil + quickstep_queryoptimizer_physical_PartitionSchemeHeader quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_physical_PhysicalType quickstep_utility_Cast @@ -261,10 +266,13 @@ target_link_libraries(quickstep_queryoptimizer_physical_TableGenerator target_link_libraries(quickstep_queryoptimizer_physical_TableReference glog quickstep_catalog_CatalogRelation + quickstep_catalog_PartitionScheme + quickstep_catalog_PartitionSchemeHeader quickstep_queryoptimizer_OptimizerTree quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_expressions_ExpressionUtil quickstep_queryoptimizer_expressions_NamedExpression + quickstep_queryoptimizer_physical_PartitionSchemeHeader quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_physical_PhysicalType quickstep_utility_Cast http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/HashJoin.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp index e186072..b0ee913 100644 --- a/query_optimizer/physical/HashJoin.cpp +++ b/query_optimizer/physical/HashJoin.cpp @@ -27,6 +27,7 @@ #include "query_optimizer/expressions/ExpressionUtil.hpp" #include "query_optimizer/expressions/NamedExpression.hpp" #include "query_optimizer/expressions/Predicate.hpp" +#include "query_optimizer/physical/Physical.hpp" #include "utility/Cast.hpp" namespace quickstep { @@ -79,7 +80,8 @@ bool HashJoin::maybeCopyWithPrunedExpressions( 
right_join_attributes_, residual_predicate_, new_project_expressions, - join_type_); + join_type_, + cloneOutputPartitionSchemeHeader()); return true; } return false; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/HashJoin.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp index c513f77..1804d4b 100644 --- a/query_optimizer/physical/HashJoin.hpp +++ b/query_optimizer/physical/HashJoin.hpp @@ -49,6 +49,8 @@ namespace physical { class HashJoin; typedef std::shared_ptr HashJoinPtr; +struct PartitionSchemeHeader; + /** * @brief Physical hash join node. */ @@ -116,11 +118,18 @@ class HashJoin : public BinaryJoin { right_join_attributes_, residual_predicate_, project_expressions(), - join_type_); + join_type_, + cloneOutputPartitionSchemeHeader()); } std::vector getReferencedAttributes() const override; + PhysicalPtr copyWithNewOutputPartitionSchemeHeader( + PartitionSchemeHeader *partition_scheme_header) const override { + return Create(left(), right(), left_join_attributes_, right_join_attributes_, + residual_predicate_, project_expressions(), join_type_, partition_scheme_header); + } + bool maybeCopyWithPrunedExpressions( const expressions::UnorderedNamedExpressionSet &referenced_expressions, PhysicalPtr *output) const override; @@ -136,6 +145,8 @@ class HashJoin : public BinaryJoin { * @param residual_predicate Optional filtering predicate evaluated after join. * @param project_expressions The project expressions. * @param Join type of this hash join. + * @param partition_scheme_header The optional output partition scheme header. + * * @return An immutable physical HashJoin. 
*/ static HashJoinPtr Create( @@ -145,7 +156,8 @@ class HashJoin : public BinaryJoin { const std::vector &right_join_attributes, const expressions::PredicatePtr &residual_predicate, const std::vector &project_expressions, - const JoinType join_type) { + const JoinType join_type, + PartitionSchemeHeader *partition_scheme_header = nullptr) { return HashJoinPtr( new HashJoin(left, right, @@ -153,7 +165,8 @@ class HashJoin : public BinaryJoin { right_join_attributes, residual_predicate, project_expressions, - join_type)); + join_type, + partition_scheme_header)); } protected: @@ -173,8 +186,9 @@ class HashJoin : public BinaryJoin { const std::vector &right_join_attributes, const expressions::PredicatePtr &residual_predicate, const std::vector &project_expressions, - const JoinType join_type) - : BinaryJoin(left, right, project_expressions), + const JoinType join_type, + PartitionSchemeHeader *partition_scheme_header) + : BinaryJoin(left, right, project_expressions, partition_scheme_header), left_join_attributes_(left_join_attributes), right_join_attributes_(right_join_attributes), residual_predicate_(residual_predicate), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Join.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Join.hpp b/query_optimizer/physical/Join.hpp index 305aa52..d72d072 100644 --- a/query_optimizer/physical/Join.hpp +++ b/query_optimizer/physical/Join.hpp @@ -40,6 +40,8 @@ namespace physical { class Join; typedef std::shared_ptr JoinPtr; +struct PartitionSchemeHeader; + /** * @brief Base class for physical join nodes. */ @@ -68,10 +70,13 @@ class Join : public Physical { * @brief Constructor. * * @param project_expressions The project expressions. + * @param partition_scheme_header The optional output partition scheme header. 
*/ explicit Join( - const std::vector& project_expressions) - : project_expressions_(project_expressions) {} + const std::vector& project_expressions, + PartitionSchemeHeader *partition_scheme_header = nullptr) + : Physical(partition_scheme_header), + project_expressions_(project_expressions) {} private: std::vector project_expressions_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Physical.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp index 4bed593..2279a84 100644 --- a/query_optimizer/physical/Physical.hpp +++ b/query_optimizer/physical/Physical.hpp @@ -26,10 +26,12 @@ #include "query_optimizer/OptimizerTree.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/expressions/ExpressionUtil.hpp" +#include "query_optimizer/physical/PartitionSchemeHeader.hpp" #include "query_optimizer/physical/PhysicalType.hpp" - #include "utility/Macros.hpp" +#include "glog/logging.h" + namespace quickstep { namespace optimizer { namespace physical { @@ -86,11 +88,57 @@ class Physical : public OptimizerTree { const expressions::UnorderedNamedExpressionSet &referenced_expressions, PhysicalPtr *output) const = 0; + /** + * @brief Creates a copy with the partition scheme header replaced by \p + * partition_scheme_header. + * + * @param partition_scheme_header The partition scheme header to be + * substituted for the existing one, if any. It takes ownership of + * 'partition_scheme_header'. + * + * @return A copy with \p partition_scheme_header as the partition scheme + * header. 
+ */ + virtual PhysicalPtr copyWithNewOutputPartitionSchemeHeader( + PartitionSchemeHeader *partition_scheme_header) const { + std::unique_ptr new_partition_scheme_header(partition_scheme_header); + LOG(FATAL) << "copyWithNewOutputPartitionSchemeHeader is not implemented for " << getName(); + } + + /** + * @brief Get the partition scheme of the physical plan node. + * + * @return A const pointer to the partition scheme of the node. + **/ + const PartitionSchemeHeader* getOutputPartitionSchemeHeader() const { + return partition_scheme_header_.get(); + } + protected: /** * @brief Constructor. + * + * @param partition_scheme_header The partition scheme header of the relation. + * The constructor takes ownership of 'partition_scheme_header'. */ - Physical() {} + explicit Physical(PartitionSchemeHeader *partition_scheme_header = nullptr) + : partition_scheme_header_(partition_scheme_header) {} + + /** + * @brief Clone a copy of the partition scheme header. + * + * @return A copy of the partition scheme header. Caller should take ownership + * of the returned object. 
+ **/ + PartitionSchemeHeader* cloneOutputPartitionSchemeHeader() const { + if (partition_scheme_header_) { + return new PartitionSchemeHeader(*partition_scheme_header_); + } + + return nullptr; + } + + std::unique_ptr partition_scheme_header_; private: DISALLOW_COPY_AND_ASSIGN(Physical); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Selection.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp index 36ade04..e2fa69e 100644 --- a/query_optimizer/physical/Selection.cpp +++ b/query_optimizer/physical/Selection.cpp @@ -19,27 +19,58 @@ #include "query_optimizer/physical/Selection.hpp" +#include #include +#include #include #include "query_optimizer/OptimizerTree.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/expressions/ExpressionUtil.hpp" #include "query_optimizer/expressions/NamedExpression.hpp" +#include "query_optimizer/physical/PartitionSchemeHeader.hpp" +#include "query_optimizer/physical/Physical.hpp" #include "utility/Cast.hpp" #include "glog/logging.h" +using std::unordered_set; + namespace quickstep { namespace optimizer { namespace physical { namespace E = ::quickstep::optimizer::expressions; +SelectionPtr Selection::Create( + const PhysicalPtr &input, + const std::vector &project_expressions, + const E::PredicatePtr &filter_predicate, + PartitionSchemeHeader *output_partition_scheme_header) { + std::unique_ptr partition_scheme_header(output_partition_scheme_header); + + if (!partition_scheme_header) { + const PartitionSchemeHeader *input_partition_scheme_header = input->getOutputPartitionSchemeHeader(); + if (input_partition_scheme_header) { + unordered_set project_expr_ids; + for (const E::NamedExpressionPtr &project_expression : project_expressions) { + project_expr_ids.insert(project_expression->id()); + } + + if 
(input_partition_scheme_header->reusablePartitionScheme(project_expr_ids)) { + partition_scheme_header = std::make_unique(*input_partition_scheme_header); + } + } + } + + return SelectionPtr( + new Selection(input, project_expressions, filter_predicate, partition_scheme_header.release())); +} + PhysicalPtr Selection::copyWithNewChildren( const std::vector &new_children) const { DCHECK_EQ(children().size(), new_children.size()); - return Create(new_children[0], project_expressions_, filter_predicate_); + return Create(new_children[0], project_expressions_, filter_predicate_, cloneOutputPartitionSchemeHeader()); } std::vector Selection::getOutputAttributes() const { @@ -76,7 +107,7 @@ bool Selection::maybeCopyWithPrunedExpressions( } } if (new_project_expressions.size() != project_expressions_.size()) { - *output = Create(input(), new_project_expressions, filter_predicate_); + *output = Create(input(), new_project_expressions, filter_predicate_, cloneOutputPartitionSchemeHeader()); return true; } return false; @@ -89,6 +120,11 @@ void Selection::getFieldStringItems( std::vector *non_container_child_fields, std::vector *container_child_field_names, std::vector> *container_child_fields) const { + if (partition_scheme_header_) { + inline_field_names->push_back("output_partition_scheme_header"); + inline_field_values->push_back(partition_scheme_header_->toString()); + } + non_container_child_field_names->emplace_back("input"); non_container_child_fields->emplace_back(input()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Selection.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp index b6874a1..204eb2f 100644 --- a/query_optimizer/physical/Selection.hpp +++ b/query_optimizer/physical/Selection.hpp @@ -42,6 +42,8 @@ namespace physical { * @{ */ +struct PartitionSchemeHeader; + class Selection; typedef 
std::shared_ptr SelectionPtr; @@ -82,6 +84,11 @@ class Selection : public Physical { std::vector getReferencedAttributes() const override; + PhysicalPtr copyWithNewOutputPartitionSchemeHeader( + PartitionSchemeHeader *partition_scheme_header) const override { + return Create(input(), project_expressions_, filter_predicate_, partition_scheme_header); + } + bool maybeCopyWithPrunedExpressions( const expressions::UnorderedNamedExpressionSet &referenced_attributes, PhysicalPtr *output) const override; @@ -92,15 +99,17 @@ class Selection : public Physical { * @param input The input node. * @param project_expressions The project expressions. * @param filter_predicate The filter predicate. Can be NULL. + * @param output_partition_scheme_header The partition scheme header that + * overwrites that from input, if not NULL. It takes ownership of + * 'output_partition_scheme_header'. + * * @return An immutable Selection. */ static SelectionPtr Create( const PhysicalPtr &input, const std::vector &project_expressions, - const expressions::PredicatePtr &filter_predicate) { - return SelectionPtr( - new Selection(input, project_expressions, filter_predicate)); - } + const expressions::PredicatePtr &filter_predicate, + PartitionSchemeHeader *output_partition_scheme_header = nullptr); /** * @brief Creates a conjunctive predicate with \p filter_predicates @@ -140,15 +149,17 @@ class Selection : public Physical { Selection( const PhysicalPtr &input, const std::vector &project_expressions, - const expressions::PredicatePtr &filter_predicate) - : project_expressions_(project_expressions), + const expressions::PredicatePtr &filter_predicate, + PartitionSchemeHeader *partition_scheme_header) + : Physical(partition_scheme_header), + project_expressions_(project_expressions), filter_predicate_(filter_predicate) { addChild(input); } - std::vector project_expressions_; + const std::vector project_expressions_; // Can be NULL. If NULL, the filter predicate is treated as the literal true. 
-  expressions::PredicatePtr filter_predicate_;
+  const expressions::PredicatePtr filter_predicate_;
   DISALLOW_COPY_AND_ASSIGN(Selection);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableGenerator.hpp b/query_optimizer/physical/TableGenerator.hpp
index c9ff8a8..4d6419f 100644
--- a/query_optimizer/physical/TableGenerator.hpp
+++ b/query_optimizer/physical/TableGenerator.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "utility/Cast.hpp"
@@ -100,6 +101,12 @@ class TableGenerator : public Physical {
     return {};
   }
+  PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+      PartitionSchemeHeader *partition_scheme_header) const override {
+    return TableGeneratorPtr(
+        new TableGenerator(generator_function_handle_, table_alias_, attribute_list_, partition_scheme_header));
+  }
+
   void getFieldStringItems(
       std::vector *inline_field_names,
       std::vector *inline_field_values,
@@ -116,6 +123,11 @@ class TableGenerator : public Physical {
       inline_field_values->push_back(table_alias_);
     }
+    if (partition_scheme_header_) {
+      inline_field_names->push_back("output_partition_scheme_header");
+      inline_field_values->push_back(partition_scheme_header_->toString());
+    }
+
     container_child_field_names->push_back("");
     container_child_fields->push_back(CastSharedPtrVector(attribute_list_));
   }
@@ -139,8 +151,10 @@ class TableGenerator : public Physical {
  private:
   TableGenerator(const GeneratorFunctionHandlePtr &generator_function_handle,
                  const std::string &table_alias,
-                 const std::vector &attribute_list)
-      :
generator_function_handle_(generator_function_handle),
+                 const std::vector &attribute_list,
+                 PartitionSchemeHeader *partition_scheme_header = nullptr)
+      : Physical(partition_scheme_header),
+        generator_function_handle_(generator_function_handle),
         table_alias_(table_alias),
         attribute_list_(attribute_list) {
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableReference.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.cpp b/query_optimizer/physical/TableReference.cpp
index bfd6464..cd6bba4 100644
--- a/query_optimizer/physical/TableReference.cpp
+++ b/query_optimizer/physical/TableReference.cpp
@@ -19,11 +19,17 @@
 #include "query_optimizer/physical/TableReference.hpp"
+#include
 #include
+#include
 #include
 #include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
 #include "utility/Cast.hpp"
 namespace quickstep {
@@ -32,6 +38,50 @@ namespace physical {
 namespace E = ::quickstep::optimizer::expressions;
+TableReferencePtr TableReference::Create(
+    const CatalogRelation *relation,
+    const std::string &alias,
+    const std::vector &attribute_list) {
+  std::unique_ptr output_partition_scheme_header;
+  const quickstep::PartitionScheme *partition_scheme = relation->getPartitionScheme();
+  if (partition_scheme) {
+    const quickstep::PartitionSchemeHeader &partition_scheme_header = partition_scheme->getPartitionSchemeHeader();
+
+    PartitionSchemeHeader::PartitionType physical_partition_type;
+    switch (partition_scheme_header.getPartitionType()) {
+      case quickstep::PartitionSchemeHeader::PartitionType::kHash:
+        physical_partition_type = PartitionSchemeHeader::PartitionType::kHash;
+        break;
+      case
quickstep::PartitionSchemeHeader::PartitionType::kRange:
+        physical_partition_type = PartitionSchemeHeader::PartitionType::kRange;
+        break;
+      default:
+        return nullptr;
+    }
+
+    PartitionSchemeHeader::PartitionExprIds partition_expr_ids;
+    for (const attribute_id part_attr : partition_scheme_header.getPartitionAttributeIds()) {
+      partition_expr_ids.push_back({ attribute_list[part_attr]->id() });
+    }
+
+    output_partition_scheme_header =
+        std::make_unique(physical_partition_type,
+                         partition_scheme_header.getNumPartitions(),
+                         std::move(partition_expr_ids));
+  }
+
+  return TableReferencePtr(new TableReference(relation, alias, attribute_list,
+                                              output_partition_scheme_header.release()));
+}
+
+PhysicalPtr TableReference::copyWithNewChildren(
+    const std::vector &new_children) const {
+  DCHECK_EQ(new_children.size(), children().size());
+  std::unique_ptr output_partition_scheme_header;
+  return TableReferencePtr(new TableReference(relation_, alias_, attribute_list_,
+                                              cloneOutputPartitionSchemeHeader()));
+}
+
 void TableReference::getFieldStringItems(
     std::vector *inline_field_names,
     std::vector *inline_field_values,
@@ -47,6 +97,11 @@ void TableReference::getFieldStringItems(
     inline_field_values->push_back(alias_);
   }
+  if (partition_scheme_header_) {
+    inline_field_names->push_back("output_partition_scheme_header");
+    inline_field_values->push_back(partition_scheme_header_->toString());
+  }
+
   container_child_field_names->push_back("");
   container_child_fields->push_back(CastSharedPtrVector(attribute_list_));
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableReference.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.hpp b/query_optimizer/physical/TableReference.hpp
index 638d73b..bdc0578 100644
--- a/query_optimizer/physical/TableReference.hpp
+++ b/query_optimizer/physical/TableReference.hpp
@@ -20,7 +20,6 @@
 #ifndef
QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_TABLE_REFERENCE_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_TABLE_REFERENCE_HPP_
-#include
 #include
 #include
@@ -45,6 +44,7 @@ namespace physical {
  *  @{
  */
+struct PartitionSchemeHeader;
 class TableReference;
 typedef std::shared_ptr TableReferencePtr;
@@ -70,10 +70,7 @@ class TableReference : public Physical {
   }
   PhysicalPtr copyWithNewChildren(
-      const std::vector &new_children) const override {
-    DCHECK_EQ(new_children.size(), children().size());
-    return TableReferencePtr(new TableReference(relation_, alias_, attribute_list_));
-  }
+      const std::vector &new_children) const override;
   std::vector getOutputAttributes() const override {
     return attribute_list_;
@@ -101,9 +98,7 @@ class TableReference : public Physical {
   static TableReferencePtr Create(
       const CatalogRelation *relation,
       const std::string &alias,
-      const std::vector &attribute_list) {
-    return TableReferencePtr(new TableReference(relation, alias, attribute_list));
-  }
+      const std::vector &attribute_list);
  protected:
   void getFieldStringItems(
@@ -117,14 +112,16 @@ class TableReference : public Physical {
  private:
   TableReference(
       const CatalogRelation *relation,
       const std::string &alias,
-      const std::vector &attribute_list)
-      : relation_(relation),
+      const std::vector &attribute_list,
+      PartitionSchemeHeader *partition_scheme_header)
+      : Physical(partition_scheme_header),
+        relation_(relation),
         alias_(alias),
         attribute_list_(attribute_list) {}
   const CatalogRelation *relation_;
-  std::string alias_;
-  std::vector attribute_list_;
+  const std::string alias_;
+  const std::vector attribute_list_;
   DISALLOW_COPY_AND_ASSIGN(TableReference);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 70302be..1ad8f40 100644
---
a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -391,5 +391,5 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_rules_TopDownRule
-                      quickstep_queryoptimizer_rules_UpdateExpression
-                      quickstep_queryoptimizer_rules_UnnestSubqueries)
+                      quickstep_queryoptimizer_rules_UnnestSubqueries
+                      quickstep_queryoptimizer_rules_UpdateExpression)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 70bb185..d3e7e08 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -463,6 +463,8 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
 void HashInnerJoinWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -694,6 +696,8 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
 }
 void HashSemiJoinWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   if (residual_predicate_ == nullptr) {
     executeWithoutResidualPredicate();
   } else {
@@ -1018,6 +1022,8 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
 }
 void HashOuterJoinWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 6391847..23267f8 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/HashTable.hpp"
+#include "storage/InsertDestination.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
@@ -48,7 +49,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 class CatalogRelationSchema;
-class InsertDestination;
 class Predicate;
 class Scalar;
 class StorageManager;
@@ -712,6 +712,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   ~HashAntiJoinWorkOrder() override {}
   void execute() override {
+    output_destination_->setInputPartitionId(part_id_);
+
     if (residual_predicate_ == nullptr) {
       executeWithoutResidualPredicate();
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 935b104..845c563 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -75,7 +75,7 @@ bool SelectOperator::getAllWorkOrders(
         }
 #endif  // QUICKSTEP_HAVE_LIBNUMA
         container->addNormalWorkOrder(
-            new SelectWorkOrder(query_id_, input_relation_, input_block_id, predicate, simple_projection_,
+            new SelectWorkOrder(query_id_, input_relation_, part_id, input_block_id, predicate, simple_projection_,
                                 simple_selection_, selection, output_destination, storage_manager,
                                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node),
             op_index_);
@@ -95,7 +95,7 @@ bool
SelectOperator::getAllWorkOrders(
         }
 #endif  // QUICKSTEP_HAVE_LIBNUMA
         container->addNormalWorkOrder(
-            new SelectWorkOrder(query_id_, input_relation_, block, predicate, simple_projection_,
+            new SelectWorkOrder(query_id_, input_relation_, part_id, block, predicate, simple_projection_,
                                 simple_selection_, selection, output_destination, storage_manager,
                                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node),
             op_index_);
@@ -114,7 +114,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
     for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
-        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+        container->addWorkOrderProto(createWorkOrderProto(part_id, input_block_id), op_index_);
       }
     }
     started_ = true;
@@ -123,7 +123,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
     for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
         container->addWorkOrderProto(
-            createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+            createWorkOrderProto(part_id, input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
             op_index_);
         ++num_workorders_generated_[part_id];
       }
@@ -132,7 +132,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
   }
 }
-serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* SelectOperator::createWorkOrderProto(const partition_id part_id, const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::SELECT);
   proto->set_query_id(query_id_);
@@ -140,6 +140,7 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
   proto->SetExtension(serialization::SelectWorkOrder::relation_id, input_relation_.getID());
   proto->SetExtension(serialization::SelectWorkOrder::insert_destination_index, output_destination_index_);
   proto->SetExtension(serialization::SelectWorkOrder::predicate_index, predicate_index_);
+  proto->SetExtension(serialization::SelectWorkOrder::partition_id, part_id);
   proto->SetExtension(serialization::SelectWorkOrder::block_id, block);
   proto->SetExtension(serialization::SelectWorkOrder::simple_projection, simple_projection_);
   if (simple_projection_) {
@@ -158,6 +159,8 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
 }
 void SelectWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   BlockReference block(
       storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index b8161b7..c11d681 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -226,9 +226,10 @@ class SelectOperator : public RelationalOperator {
   /**
    * @brief Create Work Order proto.
    *
+   * @param part_id The partition id.
    * @param block The block id used in the Work Order.
    **/
-  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+  serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
   const CatalogRelation &input_relation_;
   const CatalogRelation &output_relation_;
@@ -268,6 +269,7 @@ class SelectWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param input_relation The relation to perform selection over.
+   * @param part_id The partition id.
    * @param input_block_id The block id.
    * @param predicate All tuples matching \c predicate will be selected (or NULL
    *        to select all tuples).
@@ -283,6 +285,7 @@ class SelectWorkOrder : public WorkOrder {
    **/
   SelectWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
+                  const partition_id part_id,
                   const block_id input_block_id,
                   const Predicate *predicate,
                   const bool simple_projection,
@@ -294,6 +297,7 @@ class SelectWorkOrder : public WorkOrder {
                   const numa_node_id numa_node = 0)
       : WorkOrder(query_id),
         input_relation_(input_relation),
+        part_id_(part_id),
         input_block_id_(input_block_id),
         predicate_(predicate),
         simple_projection_(simple_projection),
@@ -313,6 +317,7 @@ class SelectWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param input_relation The relation to perform selection over.
+   * @param part_id The partition id.
    * @param input_block_id The block id.
    * @param predicate All tuples matching \c predicate will be selected (or NULL
    *        to select all tuples).
@@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder {
    **/
   SelectWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
+                  const partition_id part_id,
                   const block_id input_block_id,
                   const Predicate *predicate,
                   const bool simple_projection,
@@ -339,6 +345,7 @@ class SelectWorkOrder : public WorkOrder {
                   const numa_node_id numa_node = 0)
       : WorkOrder(query_id),
         input_relation_(input_relation),
+        part_id_(part_id),
         input_block_id_(input_block_id),
         predicate_(predicate),
         simple_projection_(simple_projection),
@@ -364,6 +371,7 @@ class SelectWorkOrder : public WorkOrder {
  private:
   const CatalogRelationSchema &input_relation_;
+  const partition_id part_id_;
   const block_id input_block_id_;
   const Predicate *predicate_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 18f0589..42a0e7d 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -238,12 +238,14 @@ message SaveBlocksWorkOrder {
   }
 }
+// Next tag: 250.
 message SelectWorkOrder {
   extend WorkOrder {
     // All required.
     optional int32 relation_id = 240;
     optional int32 insert_destination_index = 241;
     optional int32 predicate_index = 242;
+    optional uint64 partition_id = 249;
     optional fixed64 block_id = 243;
     optional bool simple_projection = 244;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 48bf956..2b0bae4 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -434,7 +434,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SELECT: {
-      LOG(INFO) << "Creating SelectWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+      const partition_id part_id =
+          proto.GetExtension(serialization::SelectWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating SelectWorkOrder (Partition " << part_id << ") for Query " << query_id
+                << " in Shiftboss " << shiftboss_index;
+
       const bool simple_projection =
           proto.GetExtension(serialization::SelectWorkOrder::simple_projection);
       vector simple_selection;
@@ -447,6 +452,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SelectWorkOrder::relation_id)),
+          part_id,
           proto.GetExtension(serialization::SelectWorkOrder::block_id),
           query_context->getPredicate(
               proto.GetExtension(serialization::SelectWorkOrder::predicate_index)),
@@ -913,6 +919,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              proto.HasExtension(serialization::SelectWorkOrder::predicate_index) &&
              query_context.isValidPredicate(
                  proto.GetExtension(serialization::SelectWorkOrder::predicate_index)) &&
+             proto.HasExtension(serialization::SelectWorkOrder::partition_id) &&
             proto.HasExtension(serialization::SelectWorkOrder::block_id);
     }
     case serialization::SORT_MERGE_RUN: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 4296ba0..f33a4f4 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -776,6 +776,7 @@ target_link_libraries(quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlockLayout
                       quickstep_storage_StorageManager
                       quickstep_storage_TupleIdSequence
+                      quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_threading_SpinMutex
                       quickstep_threading_ThreadIDBasedMap

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index aeef08a..3caa80f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -40,6 +40,7 @@
 #include "storage/StorageBlockLayout.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "threading/SpinMutex.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
@@ -573,11 +574,7 @@ PartitionSchemeHeader::PartitionAttributeIds PartitionAwareInsertDestination::ge
 }
 void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
-  PartitionSchemeHeader::PartitionValues values;
-  for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-    values.push_back(tuple.getAttributeValue(attr_id));
-  }
-  const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+  const partition_id part_id = getPartitionId(tuple);
   MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
@@ -596,11 +593,7 @@ void
PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
 }
 void PartitionAwareInsertDestination::insertTupleInBatch(const Tuple &tuple) {
-  PartitionSchemeHeader::PartitionValues values;
-  for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-    values.push_back(tuple.getAttributeValue(attr_id));
-  }
-  const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+  const partition_id part_id = getPartitionId(tuple);
   MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
@@ -637,12 +630,7 @@ void PartitionAwareInsertDestination::bulkInsertTuples(ValueAccessor *accessor,
     // set a bit in the appropriate TupleIdSequence.
     accessor->beginIteration();
     while (accessor->next()) {
-      PartitionSchemeHeader::PartitionValues values;
-      for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-        values.push_back(accessor->getTypedValue(attr_id));
-      }
-      partition_membership[partition_scheme_header_->getPartitionId(values)]
-          ->set(accessor->getCurrentPosition(), true);
+      partition_membership[this->getPartitionId(accessor)]->set(accessor->getCurrentPosition(), true);
     }
     // For each partition, create an adapter around Value Accessor and
@@ -693,12 +681,7 @@ void PartitionAwareInsertDestination::bulkInsertTuplesWithRemappedAttributes(
     // set a bit in the appropriate TupleIdSequence.
     accessor->beginIteration();
     while (accessor->next()) {
-      PartitionSchemeHeader::PartitionValues values;
-      for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-        values.push_back(accessor->getTypedValue(attr_id));
-      }
-      partition_membership[partition_scheme_header_->getPartitionId(values)]
-          ->set(accessor->getCurrentPosition(), true);
+      partition_membership[this->getPartitionId(accessor)]->set(accessor->getCurrentPosition(), true);
     }
     // For each partition, create an adapter around Value Accessor and
@@ -735,12 +718,7 @@ void PartitionAwareInsertDestination::insertTuplesFromVector(std::vector:
   }
   for (; begin != end; ++begin) {
-    PartitionSchemeHeader::PartitionValues values;
-    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-      values.push_back(begin->getAttributeValue(attr_id));
-    }
-
-    const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+    const partition_id part_id = getPartitionId(*begin);
     MutableBlockReference dest_block = getBlockForInsertionInPartition(part_id);
     // FIXME(chasseur): Deal with TupleTooLargeForBlock exception.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index db58116..878ebb4 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -36,6 +36,7 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageBlockLayout.hpp"
+#include "storage/ValueAccessor.hpp"
 #include "threading/SpinMutex.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
 #include "types/containers/Tuple.hpp"
@@ -53,7 +54,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 class StorageManager;
-class ValueAccessor;
 namespace merge_run_operator {
 class RunCreator;
@@ -201,6 +201,14 @@ class InsertDestination : public InsertDestinationInterface {
   virtual void getPartiallyFilledBlocks(std::vector *partial_blocks,
                                         std::vector *part_ids) = 0;
+  /**
+   * @brief Set the input partition id. Used when the partition attributes are
+   *        empty.
+   *
+   * @param input_partition_id The input partition id.
+   **/
+  virtual void setInputPartitionId(const partition_id input_partition_id) {}
+
  protected:
   /**
    * @brief Get a block to use for insertion.
@@ -541,6 +549,10 @@ class PartitionAwareInsertDestination : public InsertDestination {
   void insertTuplesFromVector(std::vector::const_iterator begin,
                               std::vector::const_iterator end) override;
+  void setInputPartitionId(const partition_id input_partition_id) override {
+    input_partition_id_ = input_partition_id;
+  }
+
  protected:
   MutableBlockReference getBlockForInsertion() override {
     LOG(FATAL) << "PartitionAwareInsertDestination::getBlockForInsertion needs a partition id as an argument.";
@@ -599,6 +611,24 @@ class PartitionAwareInsertDestination : public InsertDestination {
     available_block_refs_[part_id].clear();
   }
+  partition_id getPartitionId(const Tuple &tuple) const {
+    PartitionSchemeHeader::PartitionValues values;
+    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+      values.push_back(tuple.getAttributeValue(attr_id));
+    }
+
+    return values.empty() ? input_partition_id_ : partition_scheme_header_->getPartitionId(values);
+  }
+
+  partition_id getPartitionId(ValueAccessor *accessor) const {
+    PartitionSchemeHeader::PartitionValues values;
+    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+      values.push_back(accessor->getTypedValueVirtual(attr_id));
+    }
+
+    return values.empty() ? input_partition_id_ : partition_scheme_header_->getPartitionId(values);
+  }
+
   std::unique_ptr partition_scheme_header_;
   // A vector of available block references for each partition.
@@ -612,6 +642,8 @@ class PartitionAwareInsertDestination : public InsertDestination {
   // Mutex for locking each partition separately.
   SpinMutex *mutexes_for_partition_;
+  partition_id input_partition_id_ = 0u;
+
   DISALLOW_COPY_AND_ASSIGN(PartitionAwareInsertDestination);
 };
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index e1fb770..16a83ee 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -284,6 +284,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TableReference

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index f8bf6f8..98de011 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -34,6 +34,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
@@ -189,6 +190,11 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     }
   }
+  const P::PartitionSchemeHeader *partition_scheme_header = input->getOutputPartitionSchemeHeader();
+  if (partition_scheme_header) {
+    node_info.labels.emplace_back(partition_scheme_header->toString());
+  }
+
   if (lip_filter_conf_ != nullptr) {
     const auto &build_filters = lip_filter_conf_->getBuildInfoMap();
const auto build_it = build_filters.find(input);