From: jianqiao@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Message-Id: <8f1342fa81c947e4963860c00e831266@git.apache.org>
Subject: incubator-quickstep git commit: Determine #Partitions for Aggr State Hash Table in the optimizer.
Date: Wed, 2 Aug 2017 20:34:20 +0000 (UTC)

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master beda9cb84 -> 302f2cb88

Determine #Partitions for Aggr State Hash Table in the optimizer.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/302f2cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/302f2cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/302f2cb8

Branch: refs/heads/master
Commit: 302f2cb88c11e06f87350117ad5b3a1398b828b6
Parents: beda9cb
Author: Zuyu Zhang
Authored: Thu Jul 20 15:33:54 2017 -0500
Committer: Zuyu Zhang
Committed: Fri Jul 21 12:53:22 2017 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt              |   1 +
 query_optimizer/ExecutionGenerator.cpp      | 183 ++++++++++++++-----
 .../FinalizeAggregationOperator.cpp         |   2 +-
 .../FinalizeAggregationOperator.hpp         |   4 +-
 .../tests/AggregationOperator_unittest.cpp  |   2 +
 storage/AggregationOperationState.cpp       |  96 ++--------
 storage/AggregationOperationState.hpp       |  24 +--
 storage/AggregationOperationState.proto     |   3 +
 storage/CMakeLists.txt                      |   1 -
 storage/CollisionFreeVectorTable.cpp        |   4 +-
 storage/CollisionFreeVectorTable.hpp        |  14 +-
 storage/HashTableFactory.hpp                |   7 +-
 12 files changed, 181 insertions(+), 160 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 564c5c8..69105e6 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -61,6 +61,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_PartitionScheme
                       quickstep_catalog_PartitionSchemeHeader
+                      quickstep_cli_Flags
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 4f6f807..88dc505 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -21,7 +21,9 @@ #include #include +#include #include +#include #include #include #include @@ -46,6 +48,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "catalog/PartitionScheme.hpp" #include "catalog/PartitionSchemeHeader.hpp" +#include "cli/Flags.hpp" #include "expressions/Expressions.pb.h" #include "expressions/aggregation/AggregateFunction.hpp" #include "expressions/aggregation/AggregateFunction.pb.h" @@ -167,10 +170,83 @@ static const volatile bool aggregate_hashtable_type_dummy DEFINE_bool(parallelize_load, true, "Parallelize loading data files."); +static bool ValidateNumAggregationPartitions(const char *flagname, int value) { + return value > 0; +} +DEFINE_int32(num_aggregation_partitions, + 41, + "The number of partitions in PartitionedHashTablePool used for " + "performing the aggregation"); +static const volatile bool num_aggregation_partitions_dummy + = gflags::RegisterFlagValidator(&FLAGS_num_aggregation_partitions, &ValidateNumAggregationPartitions); + +DEFINE_uint64(partition_aggregation_num_groups_threshold, + 100000, + "The threshold used for deciding whether the aggregation is done " + "in a partitioned way or not"); + namespace E = ::quickstep::optimizer::expressions; namespace P = ::quickstep::optimizer::physical; namespace S = ::quickstep::serialization; +namespace { + +size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) { + // Set finalization segment size as 4096 entries. + constexpr size_t kFinalizeSegmentSize = 4uL * 1024L; + + // At least 1 partition, at most (#workers * 2) partitions. 
+ return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, + static_cast(2 * FLAGS_num_workers))); +} + +bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions, + const std::vector &is_distincts, + const std::vector &group_by_attrs, + const std::size_t estimated_num_groups) { + // If there's no aggregation, return false. + if (num_aggregate_functions == 0) { + return false; + } + // If there is only only aggregate function, we allow distinct aggregation. + // Otherwise it can't be partitioned with distinct aggregation. + if (num_aggregate_functions > 1) { + for (const bool distinct : is_distincts) { + if (distinct) { + return false; + } + } + } + // There's no distinct aggregation involved, Check if there's at least one + // GROUP BY operation. + if (group_by_attrs.empty()) { + return false; + } + + // Currently we require that all the group-by keys are ScalarAttributes for + // the convenient of implementing copy elision. + // TODO(jianqiao): relax this requirement. + for (const attribute_id group_by_attr : group_by_attrs) { + if (group_by_attr == kInvalidAttributeID) { + return false; + } + } + + // Currently we always use partitioned aggregation to parallelize distinct + // aggregation. + const bool all_distinct = std::accumulate(is_distincts.begin(), is_distincts.end(), + !is_distincts.empty(), std::logical_and()); + if (all_distinct) { + return true; + } + + // There are GROUP BYs without DISTINCT. Check if the estimated number of + // groups is large enough to warrant a partitioned aggregation. 
+ return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold; +} + +} // namespace + constexpr QueryPlan::DAGNodeIndex ExecutionGenerator::CatalogRelationInfo::kInvalidOperatorIndex; void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) { @@ -1618,8 +1694,8 @@ void ExecutionGenerator::convertAggregate( const P::AggregatePtr &physical_plan) { const CatalogRelationInfo *input_relation_info = findRelationInfoOutputByPhysical(physical_plan->input()); - const CatalogRelation *input_relation = input_relation_info->relation; - const PartitionScheme *input_partition_scheme = input_relation->getPartitionScheme(); + const CatalogRelation &input_relation = *input_relation_info->relation; + const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme(); const size_t num_partitions = input_partition_scheme ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions() @@ -1634,11 +1710,12 @@ void ExecutionGenerator::convertAggregate( S::AggregationOperationState *aggr_state_proto = aggr_state_context_proto->mutable_aggregation_state(); - aggr_state_proto->set_relation_id(input_relation->getID()); + aggr_state_proto->set_relation_id(input_relation.getID()); bool use_parallel_initialization = false; std::vector group_by_types; + std::vector group_by_attrs; for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) { unique_ptr execution_group_by_expression; E::AliasPtr alias; @@ -1653,10 +1730,47 @@ void ExecutionGenerator::convertAggregate( execution_group_by_expression.reset( grouping_expression->concretize(attribute_substitution_map_)); } - aggr_state_proto->add_group_by_expressions()->CopyFrom(execution_group_by_expression->getProto()); + aggr_state_proto->add_group_by_expressions()->MergeFrom(execution_group_by_expression->getProto()); group_by_types.push_back(&execution_group_by_expression->getType()); + 
group_by_attrs.push_back(execution_group_by_expression->getAttributeIdForValueAccessor()); } + const auto &aggregate_expressions = physical_plan->aggregate_expressions(); + vector is_distincts; + for (const E::AliasPtr &named_aggregate_expression : aggregate_expressions) { + const E::AggregateFunctionPtr unnamed_aggregate_expression = + std::static_pointer_cast(named_aggregate_expression->expression()); + + // Add a new entry in 'aggregates'. + S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates(); + + // Set the AggregateFunction. + aggr_proto->mutable_function()->MergeFrom( + unnamed_aggregate_expression->getAggregate().getProto()); + + // Add each of the aggregate's arguments. + for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) { + unique_ptr concretized_argument(argument->concretize(attribute_substitution_map_)); + aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto()); + } + + // Set whether it is a DISTINCT aggregation. + const bool is_distinct = unnamed_aggregate_expression->is_distinct(); + aggr_proto->set_is_distinct(is_distinct); + is_distincts.push_back(is_distinct); + + // Add distinctify hash table impl type if it is a DISTINCT aggregation. + if (unnamed_aggregate_expression->is_distinct()) { + const std::vector &arguments = unnamed_aggregate_expression->getArguments(); + DCHECK_GE(arguments.size(), 1u); + // Right now only SeparateChaining implementation is supported. 
+ aggr_state_proto->add_distinctify_hash_table_impl_types( + serialization::HashTableImplType::SEPARATE_CHAINING); + } + } + + bool aggr_state_is_partitioned = false; + std::size_t aggr_state_num_partitions = 1u; if (!group_by_types.empty()) { const std::size_t estimated_num_groups = cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); @@ -1671,6 +1785,7 @@ void ExecutionGenerator::convertAggregate( serialization::HashTableImplType::COLLISION_FREE_VECTOR); aggr_state_proto->set_estimated_num_entries(max_num_groups); use_parallel_initialization = true; + aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups); } else { if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation( physical_plan, estimated_num_groups)) { @@ -1681,53 +1796,30 @@ void ExecutionGenerator::convertAggregate( // Otherwise, use SeparateChaining. aggr_state_proto->set_hash_table_impl_type( serialization::HashTableImplType::SEPARATE_CHAINING); + if (CheckAggregatePartitioned(aggregate_expressions.size(), is_distincts, group_by_attrs, + estimated_num_groups)) { + aggr_state_is_partitioned = true; + aggr_state_num_partitions = FLAGS_num_aggregation_partitions; + } } aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups)); } } else { aggr_state_proto->set_estimated_num_entries(1uL); } - - for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) { - const E::AggregateFunctionPtr unnamed_aggregate_expression = - std::static_pointer_cast(named_aggregate_expression->expression()); - - // Add a new entry in 'aggregates'. - S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates(); - - // Set the AggregateFunction. - aggr_proto->mutable_function()->CopyFrom( - unnamed_aggregate_expression->getAggregate().getProto()); - - // Add each of the aggregate's arguments. 
- for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) { - unique_ptr concretized_argument(argument->concretize(attribute_substitution_map_)); - aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto()); - } - - // Set whether it is a DISTINCT aggregation. - aggr_proto->set_is_distinct(unnamed_aggregate_expression->is_distinct()); - - // Add distinctify hash table impl type if it is a DISTINCT aggregation. - if (unnamed_aggregate_expression->is_distinct()) { - const std::vector &arguments = unnamed_aggregate_expression->getArguments(); - DCHECK_GE(arguments.size(), 1u); - // Right now only SeparateChaining implementation is supported. - aggr_state_proto->add_distinctify_hash_table_impl_types( - serialization::HashTableImplType::SEPARATE_CHAINING); - } - } + aggr_state_proto->set_is_partitioned(aggr_state_is_partitioned); + aggr_state_proto->set_num_partitions(aggr_state_num_partitions); if (physical_plan->filter_predicate() != nullptr) { unique_ptr predicate(convertPredicate(physical_plan->filter_predicate())); - aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto()); + aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto()); } const QueryPlan::DAGNodeIndex aggregation_operator_index = execution_plan_->addRelationalOperator( new AggregationOperator( query_handle_->query_id(), - *input_relation_info->relation, + input_relation, input_relation_info->isStoredRelation(), aggr_state_index, num_partitions)); @@ -1765,6 +1857,7 @@ void ExecutionGenerator::convertAggregate( new FinalizeAggregationOperator(query_handle_->query_id(), aggr_state_index, num_partitions, + aggr_state_num_partitions, *output_relation, insert_destination_index)); @@ -1827,18 +1920,23 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate( std::unique_ptr execution_group_by_expression( physical_plan->right_join_attributes().front()->concretize( attribute_substitution_map_)); - 
aggr_state_proto->add_group_by_expressions()->CopyFrom( + aggr_state_proto->add_group_by_expressions()->MergeFrom( execution_group_by_expression->getProto()); aggr_state_proto->set_hash_table_impl_type( serialization::HashTableImplType::COLLISION_FREE_VECTOR); - aggr_state_proto->set_estimated_num_entries( - physical_plan->group_by_key_value_range()); + + const size_t estimated_num_entries = physical_plan->group_by_key_value_range(); + aggr_state_proto->set_estimated_num_entries(estimated_num_entries); + + const size_t aggr_state_num_partitions = + CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(estimated_num_entries); + aggr_state_proto->set_num_partitions(aggr_state_num_partitions); if (physical_plan->right_filter_predicate() != nullptr) { std::unique_ptr predicate( convertPredicate(physical_plan->right_filter_predicate())); - aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto()); + aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto()); } for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) { @@ -1849,13 +1947,13 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate( S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates(); // Set the AggregateFunction. - aggr_proto->mutable_function()->CopyFrom( + aggr_proto->mutable_function()->MergeFrom( unnamed_aggregate_expression->getAggregate().getProto()); // Add each of the aggregate's arguments. for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) { unique_ptr concretized_argument(argument->concretize(attribute_substitution_map_)); - aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto()); + aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto()); } // Set whether it is a DISTINCT aggregation. 
@@ -1926,6 +2024,7 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate( new FinalizeAggregationOperator(query_handle_->query_id(), aggr_state_index, num_partitions, + aggr_state_num_partitions, *output_relation, insert_destination_index)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 14db825..8283437 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -50,7 +50,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders( query_context->getAggregationState(aggr_state_index_, part_id); DCHECK(agg_state != nullptr); for (std::size_t state_part_id = 0; - state_part_id < agg_state->getNumFinalizationPartitions(); + state_part_id < aggr_state_num_partitions_; ++state_part_id) { container->addNormalWorkOrder( new FinalizeAggregationWorkOrder( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index 5210de2..12433b9 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -69,11 +69,13 @@ class FinalizeAggregationOperator : public RelationalOperator { const std::size_t query_id, const QueryContext::aggregation_state_id aggr_state_index, const std::size_t num_partitions, + const std::size_t aggr_state_num_partitions, const CatalogRelation &output_relation, const QueryContext::insert_destination_id output_destination_index) : RelationalOperator(query_id), 
aggr_state_index_(aggr_state_index), num_partitions_(num_partitions), + aggr_state_num_partitions_(aggr_state_num_partitions), output_relation_(output_relation), output_destination_index_(output_destination_index), started_(false) {} @@ -106,7 +108,7 @@ class FinalizeAggregationOperator : public RelationalOperator { private: const QueryContext::aggregation_state_id aggr_state_index_; - const std::size_t num_partitions_; + const std::size_t num_partitions_, aggr_state_num_partitions_; const CatalogRelation &output_relation_; const QueryContext::insert_destination_id output_destination_index_; bool started_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/relational_operators/tests/AggregationOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp index 0690b6b..3b4a737 100644 --- a/relational_operators/tests/AggregationOperator_unittest.cpp +++ b/relational_operators/tests/AggregationOperator_unittest.cpp @@ -293,6 +293,7 @@ class AggregationOperatorTest : public ::testing::Test { new FinalizeAggregationOperator(kQueryId, aggr_state_index, kNumPartitions, + kNumPartitions, *result_table_, insert_destination_index)); @@ -387,6 +388,7 @@ class AggregationOperatorTest : public ::testing::Test { new FinalizeAggregationOperator(kQueryId, aggr_state_index, kNumPartitions, + kNumPartitions, *result_table_, insert_destination_index)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 0f4a105..3d1c14a 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -61,20 +61,10 @@ #include "utility/ColumnVectorCache.hpp" 
#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" -#include "gflags/gflags.h" - #include "glog/logging.h" namespace quickstep { -DEFINE_int32(num_aggregation_partitions, - 41, - "The number of partitions used for performing the aggregation"); -DEFINE_uint64(partition_aggregation_num_groups_threshold, - 100000, - "The threshold used for deciding whether the aggregation is done " - "in a partitioned way or not"); - AggregationOperationState::AggregationOperationState( const CatalogRelationSchema &input_relation, const std::vector &aggregate_functions, @@ -83,31 +73,21 @@ AggregationOperationState::AggregationOperationState( std::vector> &&group_by, const Predicate *predicate, const std::size_t estimated_num_entries, + const bool is_partitioned, + const std::size_t num_partitions, const HashTableImplType hash_table_impl_type, const std::vector &distinctify_hash_table_impl_types, StorageManager *storage_manager) : input_relation_(input_relation), - is_aggregate_collision_free_(false), - is_aggregate_partitioned_(false), + is_aggregate_collision_free_( + group_by.empty() ? false + : hash_table_impl_type == HashTableImplType::kCollisionFreeVector), + is_aggregate_partitioned_(is_partitioned), predicate_(predicate), is_distinct_(std::move(is_distinct)), all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(), !is_distinct_.empty(), std::logical_and())), storage_manager_(storage_manager) { - if (!group_by.empty()) { - switch (hash_table_impl_type) { - case HashTableImplType::kCollisionFreeVector: - is_aggregate_collision_free_ = true; - break; - case HashTableImplType::kThreadPrivateCompactKey: - is_aggregate_partitioned_ = false; - break; - default: - is_aggregate_partitioned_ = checkAggregatePartitioned( - estimated_num_entries, is_distinct_, group_by, aggregate_functions); - } - } - // Sanity checks: each aggregate has a corresponding list of arguments. 
DCHECK(aggregate_functions.size() == arguments.size()); @@ -195,7 +175,7 @@ AggregationOperationState::AggregationOperationState( DCHECK(partitioned_group_by_hashtable_pool_ == nullptr); partitioned_group_by_hashtable_pool_.reset( new PartitionedHashTablePool(estimated_num_entries, - FLAGS_num_aggregation_partitions, + num_partitions, *distinctify_hash_table_impl_types_it, key_types, {}, @@ -227,7 +207,8 @@ AggregationOperationState::AggregationOperationState( group_by_types_, estimated_num_entries, group_by_handles, - storage_manager)); + storage_manager, + num_partitions)); } else if (is_aggregate_partitioned_) { if (all_distinct_) { DCHECK_EQ(1u, group_by_handles.size()); @@ -241,7 +222,7 @@ AggregationOperationState::AggregationOperationState( } else { partitioned_group_by_hashtable_pool_.reset( new PartitionedHashTablePool(estimated_num_entries, - FLAGS_num_aggregation_partitions, + num_partitions, hash_table_impl_type, group_by_types_, group_by_handles, @@ -315,6 +296,8 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( std::move(group_by_expressions), predicate.release(), proto.estimated_num_entries(), + proto.is_partitioned(), + proto.num_partitions(), HashTableImplTypeFromProto(proto.hash_table_impl_type()), distinctify_hash_table_impl_types, storage_manager); @@ -385,50 +368,6 @@ bool AggregationOperationState::ProtoIsValid( return true; } -bool AggregationOperationState::checkAggregatePartitioned( - const std::size_t estimated_num_groups, - const std::vector &is_distinct, - const std::vector> &group_by, - const std::vector &aggregate_functions) const { - // If there's no aggregation, return false. - if (aggregate_functions.empty()) { - return false; - } - // If there is only only aggregate function, we allow distinct aggregation. - // Otherwise it can't be partitioned with distinct aggregation. 
- if (aggregate_functions.size() > 1) { - for (auto distinct : is_distinct) { - if (distinct) { - return false; - } - } - } - // There's no distinct aggregation involved, Check if there's at least one - // GROUP BY operation. - if (group_by.empty()) { - return false; - } - - // Currently we require that all the group-by keys are ScalarAttributes for - // the convenient of implementing copy elision. - // TODO(jianqiao): relax this requirement. - for (const auto &group_by_element : group_by) { - if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) { - return false; - } - } - - // Currently we always use partitioned aggregation to parallelize distinct - // aggregation. - if (all_distinct_) { - return true; - } - - // There are GROUP BYs without DISTINCT. Check if the estimated number of - // groups is large enough to warrant a partitioned aggregation. - return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold; -} - std::size_t AggregationOperationState::getNumInitializationPartitions() const { if (is_aggregate_collision_free_) { return static_cast( @@ -438,17 +377,6 @@ std::size_t AggregationOperationState::getNumInitializationPartitions() const { } } -std::size_t AggregationOperationState::getNumFinalizationPartitions() const { - if (is_aggregate_collision_free_) { - return static_cast( - collision_free_hashtable_.get())->getNumFinalizationPartitions(); - } else if (is_aggregate_partitioned_) { - return partitioned_group_by_hashtable_pool_->getNumPartitions(); - } else { - return 1u; - } -} - CollisionFreeVectorTable* AggregationOperationState ::getCollisionFreeVectorTable() const { return static_cast( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 207c4f0..6174478 100644 --- 
a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -98,6 +98,9 @@ class AggregationOperationState { * @param estimated_num_entries Estimated of number of entries in the hash * table. A good estimate would be a fraction of total number of tuples * in the input relation. + * @param is_partitioned Whether this aggregation state is partitioned. + * @param num_partitions The number of partitions of the aggregation state + * hash table. * @param hash_table_impl_type The HashTable implementation to use for * GROUP BY. Ignored if group_by is empty. * @param distinctify_hash_table_impl_type The HashTable implementation to use @@ -114,6 +117,8 @@ class AggregationOperationState { std::vector> &&group_by, const Predicate *predicate, const std::size_t estimated_num_entries, + const bool is_partitioned, + const std::size_t num_partitions, const HashTableImplType hash_table_impl_type, const std::vector &distinctify_hash_table_impl_types, StorageManager *storage_manager); @@ -161,14 +166,6 @@ class AggregationOperationState { std::size_t getNumInitializationPartitions() const; /** - * @brief Get the number of partitions to be used for finalizing the - * aggregation. - * - * @return The number of partitions to be used for finalizing the aggregation. - **/ - std::size_t getNumFinalizationPartitions() const; - - /** * @brief Initialize the specified partition of this aggregation. * * @param partition_id ID of the partition to be initialized. @@ -213,13 +210,6 @@ class AggregationOperationState { std::size_t getMemoryConsumptionBytes() const; private: - // Check whether partitioned aggregation can be applied. - bool checkAggregatePartitioned( - const std::size_t estimated_num_groups, - const std::vector &is_distinct, - const std::vector> &group_by, - const std::vector &aggregate_functions) const; - // Aggregate on input block. 
void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux); @@ -271,10 +261,10 @@ class AggregationOperationState { const CatalogRelationSchema &input_relation_; // Whether the aggregation is collision free or not. - bool is_aggregate_collision_free_; + const bool is_aggregate_collision_free_; // Whether the aggregation is partitioned or not. - bool is_aggregate_partitioned_; + const bool is_aggregate_partitioned_; std::unique_ptr predicate_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/AggregationOperationState.proto ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto index 7521d73..1a8a302 100644 --- a/storage/AggregationOperationState.proto +++ b/storage/AggregationOperationState.proto @@ -42,4 +42,7 @@ message AggregationOperationState { // Each DISTINCT aggregation has its distinctify hash table impl type. repeated HashTableImplType distinctify_hash_table_impl_types = 7; + + optional bool is_partitioned = 8; + optional uint64 num_partitions = 9 [default = 1]; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index f33a4f4..3d90bc5 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -267,7 +267,6 @@ add_library(quickstep_storage_WindowAggregationOperationState_proto ${storage_Wi # Link dependencies: target_link_libraries(quickstep_storage_AggregationOperationState - ${GFLAGS_LIB_NAME} glog quickstep_catalog_CatalogDatabaseLite quickstep_catalog_CatalogRelationSchema http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp 
b/storage/CollisionFreeVectorTable.cpp index d836014..679f77b 100644 --- a/storage/CollisionFreeVectorTable.cpp +++ b/storage/CollisionFreeVectorTable.cpp @@ -43,15 +43,17 @@ namespace quickstep { CollisionFreeVectorTable::CollisionFreeVectorTable( const Type *key_type, const std::size_t num_entries, + const std::size_t num_finalize_partitions, const std::vector &handles, StorageManager *storage_manager) : key_type_(key_type), num_entries_(num_entries), num_handles_(handles.size()), handles_(handles), - num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)), + num_finalize_partitions_(num_finalize_partitions), storage_manager_(storage_manager) { DCHECK_GT(num_entries, 0u); + DCHECK_GT(num_finalize_partitions_, 0u); std::size_t required_memory = 0; const std::size_t existence_map_offset = 0; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/CollisionFreeVectorTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp index 221a221..5aeb5cf 100644 --- a/storage/CollisionFreeVectorTable.hpp +++ b/storage/CollisionFreeVectorTable.hpp @@ -58,6 +58,8 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { * * @param key_type The group-by key type. * @param num_entries The estimated number of entries this table will hold. + * @param num_finalize_partitions The number of partitions to be used for + * finalizing the aggregation. * @param handles The aggregation handles. * @param storage_manager The StorageManager to use (a StorageBlob will be * allocated to hold this table's contents). 
@@ -65,6 +67,7 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { CollisionFreeVectorTable( const Type *key_type, const std::size_t num_entries, + const std::size_t num_finalize_partitions, const std::vector &handles, StorageManager *storage_manager); @@ -193,17 +196,6 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL)); } - inline static std::size_t CalculateNumFinalizationPartitions( - const std::size_t num_entries) { - // Set finalization segment size as 4096 entries. - constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L; - - // At least 1 partition, at most 80 partitions. - // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the - // hardcoded 80. - return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL)); - } - inline std::size_t calculatePartitionLength() const { const std::size_t partition_length = (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/302f2cb8/storage/HashTableFactory.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp index cb1f16f..d367160 100644 --- a/storage/HashTableFactory.hpp +++ b/storage/HashTableFactory.hpp @@ -356,6 +356,8 @@ class AggregationStateHashTableFactory { * @param storage_manager The StorageManager to use (a StorageBlob will be * allocated to hold the hash table's contents). Forwarded as-is to the * hash table constructor. + * @param num_partitions The number of partitions of this aggregation state + * hash table. * @return A new aggregation state hash table. 
    **/
   static AggregationStateHashTableBase* CreateResizable(
@@ -363,12 +365,13 @@ class AggregationStateHashTableFactory {
       const std::vector &key_types,
       const std::size_t num_entries,
       const std::vector &handles,
-      StorageManager *storage_manager) {
+      StorageManager *storage_manager,
+      const std::size_t num_partitions = 1u) {
     switch (hash_table_type) {
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
-            key_types.front(), num_entries, handles, storage_manager);
+            key_types.front(), num_entries, num_partitions, handles, storage_manager);
       case HashTableImplType::kSeparateChaining:
         return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);
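
Note for readers skimming the diff: the two optimizer-side decisions this commit moves into ExecutionGenerator.cpp can be approximated by the standalone sketch below. It is not the committed code. `PlannerFlags`, `NumFinalizationPartitions`, `ShouldPartitionAggregate`, and `kInvalidAttr` are hypothetical names; the gflags values are passed in as a plain struct; the `num_workers` default of 4 and the `-1` sentinel for a non-attribute group-by key are assumptions. The constants 4096, 41, and 100000 are taken from the diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the gflags the optimizer consults.
struct PlannerFlags {
  std::size_t num_workers = 4;                          // assumed value
  std::size_t num_aggregation_partitions = 41;          // default in the diff
  std::size_t partition_num_groups_threshold = 100000;  // default in the diff
};

// Assumed sentinel for a group-by key that is not a plain attribute.
constexpr int kInvalidAttr = -1;

// Finalization partitions for a collision-free vector table: one per
// 4096-entry segment, clamped to the range [1, 2 * num_workers].
std::size_t NumFinalizationPartitions(const std::size_t num_entries,
                                      const PlannerFlags &flags) {
  assert(flags.num_workers > 0);
  constexpr std::size_t kFinalizeSegmentSize = 4096;
  const std::size_t upper_bound = 2 * flags.num_workers;
  return std::max<std::size_t>(
      1, std::min(num_entries / kFinalizeSegmentSize, upper_bound));
}

// Decides whether a separate-chaining aggregation should be partitioned.
// is_distincts holds one entry per aggregate function.
bool ShouldPartitionAggregate(const std::vector<bool> &is_distincts,
                              const std::vector<int> &group_by_attrs,
                              const std::size_t estimated_num_groups,
                              const PlannerFlags &flags) {
  // No aggregate functions: nothing to partition.
  if (is_distincts.empty()) {
    return false;
  }
  const bool any_distinct =
      std::any_of(is_distincts.begin(), is_distincts.end(),
                  [](const bool distinct) { return distinct; });
  // DISTINCT is only allowed when there is exactly one aggregate.
  if (is_distincts.size() > 1 && any_distinct) {
    return false;
  }
  // At least one GROUP BY key is required.
  if (group_by_attrs.empty()) {
    return false;
  }
  // Every GROUP BY key must be a plain attribute.
  for (const int attr : group_by_attrs) {
    if (attr == kInvalidAttr) {
      return false;
    }
  }
  // A single DISTINCT aggregate is always partitioned.
  if (any_distinct) {
    return true;
  }
  // Otherwise partition only when enough groups are expected.
  return estimated_num_groups >= flags.partition_num_groups_threshold;
}
```

Under these assumptions, a table of 1,000,000 entries with 4 workers would be finalized in 8 partitions (244 segments, capped at twice the worker count), and a non-DISTINCT aggregate with one attribute key is partitioned only once the estimate reaches 100000 groups, in which case the state would use `num_aggregation_partitions` partitions.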