Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00666200C23 for ; Wed, 8 Feb 2017 01:40:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id F32F7160B3E; Wed, 8 Feb 2017 00:40:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7ADB5160B68 for ; Wed, 8 Feb 2017 01:40:16 +0100 (CET) Received: (qmail 66301 invoked by uid 500); 8 Feb 2017 00:40:15 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 66292 invoked by uid 99); 8 Feb 2017 00:40:15 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 00:40:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id EC41EC0688 for ; Wed, 8 Feb 2017 00:40:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id jFmmo5kHllin for ; Wed, 8 Feb 2017 00:40:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 97EAB5FCD2 for ; Wed, 8 Feb 2017 00:40:06 +0000 (UTC) Received: (qmail 65964 invoked by uid 99); 8 Feb 2017 00:40:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 00:40:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35BABDFBDB; Wed, 8 Feb 2017 00:40:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Date: Wed, 08 Feb 2017 00:40:10 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/12] incubator-quickstep git commit: - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key. - Supports copy elision for aggregation. archived-at: Wed, 08 Feb 2017 00:40:19 -0000 http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index b942c1b..0b34908 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -20,7 +20,7 @@ #include "storage/AggregationOperationState.hpp" #include -#include +#include #include #include #include @@ -34,29 +34,32 @@ #include "expressions/aggregation/AggregateFunction.hpp" #include "expressions/aggregation/AggregateFunctionFactory.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "expressions/aggregation/AggregationHandleDistinct.hpp" -#include "expressions/aggregation/AggregationID.hpp" #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "storage/AggregationOperationState.pb.h" -#include "storage/HashTable.hpp" -#include "storage/HashTableBase.hpp" +#include "storage/CollisionFreeVectorTable.hpp" #include "storage/HashTableFactory.hpp" +#include "storage/HashTableBase.hpp" #include "storage/InsertDestination.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/SubBlocksReference.hpp" #include "storage/TupleIdSequence.hpp" +#include "storage/TupleStorageSubBlock.hpp" #include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "storage/ValueAccessorUtil.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "types/containers/Tuple.hpp" #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" -#include "glog/logging.h" +#include "gflags/gflags.h" -using std::unique_ptr; +#include "glog/logging.h" namespace quickstep { @@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState( const std::vector &distinctify_hash_table_impl_types, StorageManager *storage_manager) : input_relation_(input_relation), - is_aggregate_partitioned_(checkAggregatePartitioned( - estimated_num_entries, is_distinct, group_by, aggregate_functions)), + is_aggregate_collision_free_(false), + is_aggregate_partitioned_(false), predicate_(predicate), - group_by_list_(std::move(group_by)), - arguments_(std::move(arguments)), is_distinct_(std::move(is_distinct)), storage_manager_(storage_manager) { + if (!group_by.empty()) { + if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) { + is_aggregate_collision_free_ = true; + } else { + is_aggregate_partitioned_ = checkAggregatePartitioned( + estimated_num_entries, is_distinct_, group_by, aggregate_functions); + } + } + // Sanity checks: each aggregate has a corresponding list of arguments. - DCHECK(aggregate_functions.size() == arguments_.size()); + DCHECK(aggregate_functions.size() == arguments.size()); // Get the types of GROUP BY expressions for creating HashTables below. - std::vector group_by_types; - for (const std::unique_ptr &group_by_element : group_by_list_) { - group_by_types.emplace_back(&group_by_element->getType()); + for (const std::unique_ptr &group_by_element : group_by) { + group_by_types_.emplace_back(&group_by_element->getType()); + } + + // Prepare group-by key ids and non-trivial expressions. + for (std::unique_ptr &group_by_element : group_by) { + const attribute_id attr_id = + group_by_element->getAttributeIdForValueAccessor(); + if (attr_id != kInvalidAttributeID) { + group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id); + } else { + group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived, + non_trivial_expressions_.size()); + non_trivial_expressions_.emplace_back(group_by_element.release()); + } } std::vector group_by_handles; - group_by_handles.clear(); - - if (aggregate_functions.size() == 0) { - // If there is no aggregation function, then it is a distinctify operation - // on the group-by expressions. - DCHECK_GT(group_by_list_.size(), 0u); - - handles_.emplace_back(new AggregationHandleDistinct()); - arguments_.push_back({}); - is_distinct_.emplace_back(false); - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - {1}, - handles_, - storage_manager)); - } else { - // Set up each individual aggregate in this operation. - std::vector::const_iterator agg_func_it = - aggregate_functions.begin(); - std::vector>>::const_iterator - args_it = arguments_.begin(); - std::vector::const_iterator is_distinct_it = is_distinct_.begin(); - std::vector::const_iterator - distinctify_hash_table_impl_types_it = - distinctify_hash_table_impl_types.begin(); - std::vector payload_sizes; - for (; agg_func_it != aggregate_functions.end(); - ++agg_func_it, ++args_it, ++is_distinct_it) { - // Get the Types of this aggregate's arguments so that we can create an - // AggregationHandle. - std::vector argument_types; - for (const std::unique_ptr &argument : *args_it) { - argument_types.emplace_back(&argument->getType()); - } - // Sanity checks: aggregate function exists and can apply to the specified - // arguments. - DCHECK(*agg_func_it != nullptr); - DCHECK((*agg_func_it)->canApplyToTypes(argument_types)); - - // Have the AggregateFunction create an AggregationHandle that we can use - // to do actual aggregate computation. - handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); - - if (!group_by_list_.empty()) { - // Aggregation with GROUP BY: combined payload is partially updated in - // the presence of DISTINCT. - if (*is_distinct_it) { - handles_.back()->blockUpdate(); - } - group_by_handles.emplace_back(handles_.back()); - payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); + // Set up each individual aggregate in this operation. + std::vector::const_iterator agg_func_it = + aggregate_functions.begin(); + std::vector>>::iterator + args_it = arguments.begin(); + std::vector::const_iterator is_distinct_it = is_distinct_.begin(); + std::vector::const_iterator + distinctify_hash_table_impl_types_it = + distinctify_hash_table_impl_types.begin(); + for (; agg_func_it != aggregate_functions.end(); + ++agg_func_it, ++args_it, ++is_distinct_it) { + // Get the Types of this aggregate's arguments so that we can create an + // AggregationHandle. + std::vector argument_types; + for (const std::unique_ptr &argument : *args_it) { + argument_types.emplace_back(&argument->getType()); + } + + // Prepare argument attribute ids and non-trivial expressions. + std::vector argument_ids; + for (std::unique_ptr &argument : *args_it) { + const attribute_id attr_id = + argument->getAttributeIdForValueAccessor(); + if (attr_id != kInvalidAttributeID) { + argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id); } else { - // Aggregation without GROUP BY: create a single global state. - single_states_.emplace_back(handles_.back()->createInitialState()); - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // See if all of this aggregate's arguments are attributes in the input - // relation. If so, remember the attribute IDs so that we can do copy - // elision when actually performing the aggregation. - std::vector local_arguments_as_attributes; - local_arguments_as_attributes.reserve(args_it->size()); - for (const std::unique_ptr &argument : *args_it) { - const attribute_id argument_id = - argument->getAttributeIdForValueAccessor(); - if (argument_id == -1) { - local_arguments_as_attributes.clear(); - break; - } else { - DCHECK_EQ(input_relation_.getID(), - argument->getRelationIdForValueAccessor()); - local_arguments_as_attributes.push_back(argument_id); - } - } - - arguments_as_attributes_.emplace_back( - std::move(local_arguments_as_attributes)); -#endif + argument_ids.emplace_back(ValueAccessorSource::kDerived, + non_trivial_expressions_.size()); + non_trivial_expressions_.emplace_back(argument.release()); } + } + argument_ids_.emplace_back(std::move(argument_ids)); + + // Sanity checks: aggregate function exists and can apply to the specified + // arguments. + DCHECK(*agg_func_it != nullptr); + DCHECK((*agg_func_it)->canApplyToTypes(argument_types)); - // Initialize the corresponding distinctify hash table if this is a - // DISTINCT aggregation. + // Have the AggregateFunction create an AggregationHandle that we can use + // to do actual aggregate computation. + handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); + + if (!group_by_key_ids_.empty()) { + // Aggregation with GROUP BY: combined payload is partially updated in + // the presence of DISTINCT. if (*is_distinct_it) { - std::vector key_types(group_by_types); - key_types.insert( - key_types.end(), argument_types.begin(), argument_types.end()); - // TODO(jianqiao): estimated_num_entries is quite inaccurate for - // estimating the number of entries in the distinctify hash table. - // We may estimate for each distinct aggregation an - // estimated_num_distinct_keys value during query optimization, if it's - // worth. - distinctify_hashtables_.emplace_back( - AggregationStateFastHashTableFactory::CreateResizable( - *distinctify_hash_table_impl_types_it, - key_types, - estimated_num_entries, - {0}, - {}, - storage_manager)); - ++distinctify_hash_table_impl_types_it; - } else { - distinctify_hashtables_.emplace_back(nullptr); + handles_.back()->blockUpdate(); } + group_by_handles.emplace_back(handles_.back().get()); + } else { + // Aggregation without GROUP BY: create a single global state. + single_states_.emplace_back(handles_.back()->createInitialState()); } - if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool. - if (!is_aggregate_partitioned_) { - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager)); - } else { - partitioned_group_by_hashtable_pool_.reset( - new PartitionedHashTablePool(estimated_num_entries, - FLAGS_num_aggregation_partitions, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager)); - } + // Initialize the corresponding distinctify hash table if this is a + // DISTINCT aggregation. + if (*is_distinct_it) { + std::vector key_types(group_by_types_); + key_types.insert( + key_types.end(), argument_types.begin(), argument_types.end()); + // TODO(jianqiao): estimated_num_entries is quite inaccurate for + // estimating the number of entries in the distinctify hash table. + // We need to estimate for each distinct aggregation an + // estimated_num_distinct_keys value during query optimization. + distinctify_hashtables_.emplace_back( + AggregationStateHashTableFactory::CreateResizable( + *distinctify_hash_table_impl_types_it, + key_types, + estimated_num_entries, + {} /* handles */, + storage_manager)); + ++distinctify_hash_table_impl_types_it; + } else { + distinctify_hashtables_.emplace_back(nullptr); + } + } + + if (!group_by_key_ids_.empty()) { + // Aggregation with GROUP BY: create the hash table (pool). + if (is_aggregate_collision_free_) { + collision_free_hashtable_.reset( + AggregationStateHashTableFactory::CreateResizable( + hash_table_impl_type, + group_by_types_, + estimated_num_entries, + group_by_handles, + storage_manager)); + } else if (is_aggregate_partitioned_) { + partitioned_group_by_hashtable_pool_.reset( + new PartitionedHashTablePool(estimated_num_entries, + FLAGS_num_aggregation_partitions, + hash_table_impl_type, + group_by_types_, + group_by_handles, + storage_manager)); + } else { + group_by_hashtable_pool_.reset( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types_, + group_by_handles, + storage_manager)); } } } @@ -269,7 +269,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( proto.group_by_expressions(group_by_idx), database)); } - unique_ptr predicate; + std::unique_ptr predicate; if (proto.has_predicate()) { predicate.reset( PredicateFactory::ReconstructFromProto(proto.predicate(), database)); @@ -353,153 +353,210 @@ bool AggregationOperationState::ProtoIsValid( return true; } -void AggregationOperationState::aggregateBlock(const block_id input_block, - LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { - if (group_by_list_.empty()) { - aggregateBlockSingleState(input_block); - } else { - aggregateBlockHashTable(input_block, lip_filter_adaptive_prober); +bool AggregationOperationState::checkAggregatePartitioned( + const std::size_t estimated_num_groups, + const std::vector &is_distinct, + const std::vector> &group_by, + const std::vector &aggregate_functions) const { + // If there's no aggregation, return false. + if (aggregate_functions.empty()) { + return false; + } + // Check if there's a distinct operation involved in any aggregate, if so + // the aggregate can't be partitioned. + for (auto distinct : is_distinct) { + if (distinct) { + return false; + } + } + // There's no distinct aggregation involved, Check if there's at least one + // GROUP BY operation. + if (group_by.empty()) { + return false; + } + + // Currently we require that all the group-by keys are ScalarAttributes for + // the convenient of implementing copy elision. + // TODO(jianqiao): relax this requirement. + for (const auto &group_by_element : group_by) { + if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) { + return false; + } } + + // There are GROUP BYs without DISTINCT. Check if the estimated number of + // groups is large enough to warrant a partitioned aggregation. + return estimated_num_groups > + static_cast( + FLAGS_partition_aggregation_num_groups_threshold); + return false; } -void AggregationOperationState::finalizeAggregate( - InsertDestination *output_destination) { - if (group_by_list_.empty()) { - finalizeSingleState(output_destination); +std::size_t AggregationOperationState::getNumInitializationPartitions() const { + if (is_aggregate_collision_free_) { + return static_cast( + collision_free_hashtable_.get())->getNumInitializationPartitions(); } else { - finalizeHashTable(output_destination); + return 0u; } } -void AggregationOperationState::mergeSingleState( - const std::vector> &local_state) { - DEBUG_ASSERT(local_state.size() == single_states_.size()); - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (!is_distinct_[agg_idx]) { - handles_[agg_idx]->mergeStates(*local_state[agg_idx], - single_states_[agg_idx].get()); - } +std::size_t AggregationOperationState::getNumFinalizationPartitions() const { + if (is_aggregate_collision_free_) { + return static_cast( + collision_free_hashtable_.get())->getNumFinalizationPartitions(); + } else if (is_aggregate_partitioned_) { + return partitioned_group_by_hashtable_pool_->getNumPartitions(); + } else { + return 1u; } } -void AggregationOperationState::aggregateBlockSingleState( - const block_id input_block) { - // Aggregate per-block state for each aggregate. - std::vector> local_state; +void AggregationOperationState::initialize(const std::size_t partition_id) { + if (is_aggregate_collision_free_) { + static_cast( + collision_free_hashtable_.get())->initialize(partition_id); + } else { + LOG(FATAL) << "AggregationOperationState::initializeState() " + << "is not supported by this aggregation"; + } +} +void AggregationOperationState::aggregateBlock(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); + const auto &tuple_store = block->getTupleStorageSubBlock(); + std::unique_ptr base_accessor(tuple_store.createValueAccessor()); + std::unique_ptr shared_accessor; + ValueAccessor *accessor = base_accessor.get(); + // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence + // as the existence map for the tuples. std::unique_ptr matches; if (predicate_ != nullptr) { - std::unique_ptr accessor( - block->getTupleStorageSubBlock().createValueAccessor()); - matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get())); + matches.reset(block->getMatchesForPredicate(predicate_.get())); + shared_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + accessor = shared_accessor.get(); + } + if (lip_filter_adaptive_prober != nullptr) { + matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor)); + shared_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + accessor = shared_accessor.get(); } - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - const std::vector *local_arguments_as_attributes = nullptr; -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // If all arguments are attributes of the input relation, elide a copy. - if (!arguments_as_attributes_[agg_idx].empty()) { - local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]); + std::unique_ptr non_trivial_results; + if (!non_trivial_expressions_.empty()) { + non_trivial_results.reset(new ColumnVectorsValueAccessor()); + SubBlocksReference sub_blocks_ref(tuple_store, + block->getIndices(), + block->getIndicesConsistent()); + for (const auto &expression : non_trivial_expressions_) { + non_trivial_results->addColumn( + expression->getAllValues(accessor, &sub_blocks_ref)); } -#endif + } + + accessor->beginIterationVirtual(); + + ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get()); + if (group_by_key_ids_.empty()) { + aggregateBlockSingleState(accessor_mux); + } else { + aggregateBlockHashTable(accessor_mux); + } +} + +void AggregationOperationState::aggregateBlockSingleState( + const ValueAccessorMultiplexer &accessor_mux) { + // Aggregate per-block state for each aggregate. + std::vector> local_state; + + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + const auto &argument_ids = argument_ids_[agg_idx]; + const auto &handle = handles_[agg_idx]; + + AggregationState *state = nullptr; if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to put the arguments as keys - // directly into the (threadsafe) shared global distinctify HashTable - // for this aggregate. - block->aggregateDistinct(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - {}, /* group_by */ - matches.get(), - distinctify_hashtables_[agg_idx].get(), - nullptr /* reuse_group_by_vectors */); - local_state.emplace_back(nullptr); + handle->insertValueAccessorIntoDistinctifyHashTable( + argument_ids, + {}, + accessor_mux, + distinctify_hashtables_[agg_idx].get()); } else { - // Call StorageBlock::aggregate() to actually do the aggregation. - local_state.emplace_back(block->aggregate(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - matches.get())); + if (argument_ids.empty()) { + // Special case. This is a nullary aggregate (i.e. COUNT(*)). + ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); + DCHECK(base_accessor != nullptr); + state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual()); + } else { + // Have the AggregationHandle actually do the aggregation. + state = handle->accumulateValueAccessor(argument_ids, accessor_mux); + } } + local_state.emplace_back(state); } // Merge per-block aggregation states back with global state. mergeSingleState(local_state); } -void AggregationOperationState::aggregateBlockHashTable( - const block_id input_block, - LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { - BlockReference block( - storage_manager_->getBlock(input_block, input_relation_)); - - // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence - // as the existence map for the tuples. - std::unique_ptr matches; - if (predicate_ != nullptr) { - matches.reset(block->getMatchesForPredicate(predicate_.get())); - } - if (lip_filter_adaptive_prober != nullptr) { - std::unique_ptr accessor( - block->getTupleStorageSubBlock().createValueAccessor(matches.get())); - matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get())); - } - - // This holds values of all the GROUP BY attributes so that the can be reused - // across multiple aggregates (i.e. we only pay the cost of evaluatin the - // GROUP BY expressions once). - std::vector> reuse_group_by_vectors; - +void AggregationOperationState::mergeSingleState( + const std::vector> &local_state) { + DCHECK_EQ(local_state.size(), single_states_.size()); for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to insert the GROUP BY - // expression - // values and the aggregation arguments together as keys directly into the - // (threadsafe) shared global distinctify HashTable for this aggregate. - block->aggregateDistinct(*handles_[agg_idx], - arguments_[agg_idx], - nullptr, /* arguments_as_attributes */ - group_by_list_, - matches.get(), - distinctify_hashtables_[agg_idx].get(), - &reuse_group_by_vectors); + if (!is_distinct_[agg_idx]) { + handles_[agg_idx]->mergeStates(*local_state[agg_idx], + single_states_[agg_idx].get()); } } +} + +void AggregationOperationState::mergeGroupByHashTables( + AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst) const { + HashTableMerger merger(static_cast(dst)); + static_cast(src)->forEachCompositeKey(&merger); +} - if (!is_aggregate_partitioned_) { - // Call StorageBlock::aggregateGroupBy() to aggregate this block's values - // directly into the (threadsafe) shared global HashTable for this - // aggregate. - DCHECK(group_by_hashtable_pool_ != nullptr); - AggregationStateHashTableBase *agg_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - DCHECK(agg_hash_table != nullptr); - block->aggregateGroupBy(arguments_, - group_by_list_, - matches.get(), - agg_hash_table, - &reuse_group_by_vectors); - group_by_hashtable_pool_->returnHashTable(agg_hash_table); +void AggregationOperationState::aggregateBlockHashTable( + const ValueAccessorMultiplexer &accessor_mux) { + if (is_aggregate_collision_free_) { + aggregateBlockHashTableImplCollisionFree(accessor_mux); + } else if (is_aggregate_partitioned_) { + aggregateBlockHashTableImplPartitioned(accessor_mux); } else { - ColumnVectorsValueAccessor temp_result; - // IDs of 'arguments' as attributes in the ValueAccessor we create below. - std::vector argument_ids; + aggregateBlockHashTableImplThreadPrivate(accessor_mux); + } +} + +void AggregationOperationState::aggregateBlockHashTableImplCollisionFree( + const ValueAccessorMultiplexer &accessor_mux) { + DCHECK(collision_free_hashtable_ != nullptr); + + collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + accessor_mux); +} + +void AggregationOperationState::aggregateBlockHashTableImplPartitioned( + const ValueAccessorMultiplexer &accessor_mux) { + DCHECK(partitioned_group_by_hashtable_pool_ != nullptr); + + std::vector group_by_key_ids; + for (const MultiSourceAttributeId &key_id : group_by_key_ids_) { + DCHECK(key_id.source == ValueAccessorSource::kBase); + group_by_key_ids.emplace_back(key_id.attr_id); + } - // IDs of GROUP BY key element(s) in the ValueAccessor we create below. - std::vector key_ids; + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + accessor_mux.getBaseAccessor(), + [&](auto *accessor) -> void { // NOLINT(build/c++11) + // TODO(jianqiao): handle the situation when keys in non_trivial_results const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions(); - block->aggregateGroupByPartitioned( - arguments_, - group_by_list_, - matches.get(), - num_partitions, - &temp_result, - &argument_ids, - &key_ids, - &reuse_group_by_vectors); + // Compute the partitions for the tuple formed by group by values. std::vector> partition_membership; partition_membership.resize(num_partitions); @@ -507,32 +564,74 @@ void AggregationOperationState::aggregateBlockHashTable( // Create a tuple-id sequence for each partition. for (std::size_t partition = 0; partition < num_partitions; ++partition) { partition_membership[partition].reset( - new TupleIdSequence(temp_result.getEndPosition())); + new TupleIdSequence(accessor->getEndPosition())); } // Iterate over ValueAccessor for each tuple, // set a bit in the appropriate TupleIdSequence. - temp_result.beginIteration(); - while (temp_result.next()) { + while (accessor->next()) { // We need a unique_ptr because getTupleWithAttributes() uses "new". - std::unique_ptr curr_tuple(temp_result.getTupleWithAttributes(key_ids)); + std::unique_ptr curr_tuple( + accessor->getTupleWithAttributes(group_by_key_ids)); const std::size_t curr_tuple_partition_id = curr_tuple->getTupleHash() % num_partitions; partition_membership[curr_tuple_partition_id]->set( - temp_result.getCurrentPosition(), true); + accessor->getCurrentPosition(), true); } - // For each partition, create an adapter around Value Accessor and - // TupleIdSequence. - std::vector>> adapter; - adapter.resize(num_partitions); + + // Aggregate each partition. for (std::size_t partition = 0; partition < num_partitions; ++partition) { - adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter( - *(partition_membership)[partition])); + std::unique_ptr base_adapter( + accessor->createSharedTupleIdSequenceAdapter( + *partition_membership[partition])); + + std::unique_ptr derived_adapter; + if (accessor_mux.getDerivedAccessor() != nullptr) { + derived_adapter.reset( + accessor_mux.getDerivedAccessor()->createSharedTupleIdSequenceAdapterVirtual( + *partition_membership[partition])); + } + + ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get()); partitioned_group_by_hashtable_pool_->getHashTable(partition) - ->upsertValueAccessorCompositeKeyFast( - argument_ids, adapter[partition].get(), key_ids, true); + ->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + local_mux); } + }); +} + +void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate( + const ValueAccessorMultiplexer &accessor_mux) { + DCHECK(group_by_hashtable_pool_ != nullptr); + + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + if (is_distinct_[agg_idx]) { + handles_[agg_idx]->insertValueAccessorIntoDistinctifyHashTable( + argument_ids_[agg_idx], + group_by_key_ids_, + accessor_mux, + distinctify_hashtables_[agg_idx].get()); + } + } + + AggregationStateHashTableBase *agg_hash_table = + group_by_hashtable_pool_->getHashTable(); + + agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + accessor_mux); + group_by_hashtable_pool_->returnHashTable(agg_hash_table); +} + +void AggregationOperationState::finalizeAggregate( + const std::size_t partition_id, + InsertDestination *output_destination) { + if (group_by_key_ids_.empty()) { + DCHECK_EQ(0u, partition_id); + finalizeSingleState(output_destination); + } else { + finalizeHashTable(partition_id, output_destination); } } @@ -556,80 +655,83 @@ void AggregationOperationState::finalizeSingleState( output_destination->insertTuple(Tuple(std::move(attribute_values))); } -void AggregationOperationState::mergeGroupByHashTables( - AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) { - HashTableMergerFast merger(dst); - (static_cast *>(src)) - ->forEachCompositeKeyFast(&merger); +void AggregationOperationState::finalizeHashTable( + const std::size_t partition_id, + InsertDestination *output_destination) { + if (is_aggregate_collision_free_) { + finalizeHashTableImplCollisionFree(partition_id, output_destination); + } else if (is_aggregate_partitioned_) { + finalizeHashTableImplPartitioned(partition_id, output_destination); + } else { + DCHECK_EQ(0u, partition_id); + finalizeHashTableImplThreadPrivate(output_destination); + } } -void AggregationOperationState::finalizeHashTable( +void AggregationOperationState::finalizeHashTableImplCollisionFree( + const std::size_t partition_id, + InsertDestination *output_destination) { + std::vector> final_values; + CollisionFreeVectorTable *hash_table = + static_cast(collision_free_hashtable_.get()); + + const std::size_t max_length = + hash_table->getNumTuplesInFinalizationPartition(partition_id); + ColumnVectorsValueAccessor complete_result; + + DCHECK_EQ(1u, group_by_types_.size()); + const Type *key_type = group_by_types_.front(); + DCHECK(NativeColumnVector::UsableForType(*key_type)); + + std::unique_ptr key_cv( + std::make_unique(*key_type, max_length)); + hash_table->finalizeKey(partition_id, key_cv.get()); + complete_result.addColumn(key_cv.release()); + + for (std::size_t i = 0; i < handles_.size(); ++i) { + const Type *result_type = handles_[i]->getResultType(); + DCHECK(NativeColumnVector::UsableForType(*result_type)); + + std::unique_ptr result_cv( + std::make_unique(*result_type, max_length)); + hash_table->finalizeState(partition_id, i, result_cv.get()); + complete_result.addColumn(result_cv.release()); + } + + // Bulk-insert the complete result. + output_destination->bulkInsertTuples(&complete_result); +} + +void AggregationOperationState::finalizeHashTableImplPartitioned( + const std::size_t partition_id, InsertDestination *output_destination) { + PackedPayloadHashTable *hash_table = + static_cast( + partitioned_group_by_hashtable_pool_->getHashTable(partition_id)); + // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector> group_by_keys; - // TODO(harshad) - The merge phase may be slower when each hash table contains - // large number of entries. We should find ways in which we can perform a - // parallel merge. + if (handles_.empty()) { + const auto keys_retriever = [&group_by_keys](std::vector &group_by_key, + const std::uint8_t *dumb_placeholder) -> void { + group_by_keys.emplace_back(std::move(group_by_key)); + }; - // TODO(harshad) - Find heuristics for faster merge, even in a single thread. - // e.g. Keep merging entries from smaller hash tables to larger. - - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - if (hash_tables->size() > 1) { - for (int hash_table_index = 0; - hash_table_index < static_cast(hash_tables->size() - 1); - ++hash_table_index) { - // Merge each hash table to the last hash table. - mergeGroupByHashTables((*hash_tables)[hash_table_index].get(), - hash_tables->back().get()); - } + hash_table->forEachCompositeKey(&keys_retriever); } // Collect per-aggregate finalized values. std::vector> final_values; for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - DCHECK(group_by_hashtable_pool_ != nullptr); - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - // We may have a case where hash_tables is empty, e.g. no input blocks. - // However for aggregateOnDistinctifyHashTableForGroupBy to work - // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - group_by_hashtable_pool_->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - DCHECK(hash_tables->back() != nullptr); - AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); - DCHECK(agg_hash_table != nullptr); - handles_[agg_idx]->allowUpdate(); - handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( - *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx); - } - - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - // We may have a case where hash_tables is empty, e.g. no input blocks. - // However for aggregateOnDistinctifyHashTableForGroupBy to work - // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - group_by_hashtable_pool_->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); - DCHECK(agg_hash_table != nullptr); ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *agg_hash_table, &group_by_keys, agg_idx); + *hash_table, agg_idx, &group_by_keys); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } } + hash_table->destroyPayload(); // Reorganize 'group_by_keys' in column-major order so that we can make a // ColumnVectorsValueAccessor to bulk-insert results. @@ -640,23 +742,20 @@ void AggregationOperationState::finalizeHashTable( // in a single HashTable. std::vector> group_by_cvs; std::size_t group_by_element_idx = 0; - for (const std::unique_ptr &group_by_element : group_by_list_) { - const Type &group_by_type = group_by_element->getType(); - if (NativeColumnVector::UsableForType(group_by_type)) { + for (const Type *group_by_type : group_by_types_) { + if (NativeColumnVector::UsableForType(*group_by_type)) { NativeColumnVector *element_cv = - new NativeColumnVector(group_by_type, group_by_keys.size()); + new NativeColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector &group_key : group_by_keys) { - element_cv->appendTypedValue( - std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); } } else { IndirectColumnVector *element_cv = - new IndirectColumnVector(group_by_type, group_by_keys.size()); + new IndirectColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector &group_key : group_by_keys) { - element_cv->appendTypedValue( - std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); } } ++group_by_element_idx; @@ -676,42 +775,64 @@ void AggregationOperationState::finalizeHashTable( output_destination->bulkInsertTuples(&complete_result); } -void AggregationOperationState::destroyAggregationHashTablePayload() { - std::vector> *all_hash_tables = - nullptr; - if (!is_aggregate_partitioned_) { - if (group_by_hashtable_pool_ != nullptr) { - all_hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - } else { - if (partitioned_group_by_hashtable_pool_ != nullptr) { - all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables(); - } +void AggregationOperationState::finalizeHashTableImplThreadPrivate( + InsertDestination *output_destination) { + // TODO(harshad) - The merge phase may be slower when each hash table contains + // large number of entries. We should find ways in which we can perform a + // parallel merge. + + // TODO(harshad) - Find heuristics for faster merge, even in a single thread. + // e.g. Keep merging entries from smaller hash tables to larger. + + auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); + DCHECK(hash_tables != nullptr); + if (hash_tables->empty()) { + return; } - if (all_hash_tables != nullptr) { - for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) { - (*all_hash_tables)[ht_index]->destroyPayload(); - } + + std::unique_ptr final_hash_table_ptr( + hash_tables->back().release()); + for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) { + std::unique_ptr hash_table( + hash_tables->at(i).release()); + mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get()); + hash_table->destroyPayload(); } -} -void AggregationOperationState::finalizeAggregatePartitioned( - const std::size_t partition_id, InsertDestination *output_destination) { + PackedPayloadHashTable *final_hash_table = + static_cast(final_hash_table_ptr.get()); + // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector> group_by_keys; + if (handles_.empty()) { + const auto keys_retriever = [&group_by_keys](std::vector &group_by_key, + const std::uint8_t *dumb_placeholder) -> void { + group_by_keys.emplace_back(std::move(group_by_key)); + }; + + final_hash_table->forEachCompositeKey(&keys_retriever); + } + + // Collect per-aggregate finalized values. std::vector> final_values; for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - AggregationStateHashTableBase *hash_table = - partitioned_group_by_hashtable_pool_->getHashTable(partition_id); - ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *hash_table, &group_by_keys, agg_idx); - if (agg_result_col != nullptr) { - final_values.emplace_back(agg_result_col); + if (is_distinct_[agg_idx]) { + handles_[agg_idx]->allowUpdate(); + handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( + *distinctify_hashtables_[agg_idx], agg_idx, final_hash_table); } + + ColumnVector *agg_result_col = + handles_[agg_idx]->finalizeHashTable( + *final_hash_table, agg_idx, &group_by_keys); + DCHECK(agg_result_col != nullptr); + + final_values.emplace_back(agg_result_col); } + final_hash_table->destroyPayload(); // Reorganize 'group_by_keys' in column-major order so that we can make a // ColumnVectorsValueAccessor to bulk-insert results. @@ -722,19 +843,22 @@ void AggregationOperationState::finalizeAggregatePartitioned( // in a single HashTable. std::vector> group_by_cvs; std::size_t group_by_element_idx = 0; - for (const std::unique_ptr &group_by_element : group_by_list_) { - const Type &group_by_type = group_by_element->getType(); - if (NativeColumnVector::UsableForType(group_by_type)) { - NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size()); + for (const Type *group_by_type : group_by_types_) { + if (NativeColumnVector::UsableForType(*group_by_type)) { + NativeColumnVector *element_cv = + new NativeColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } else { - IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size()); + IndirectColumnVector *element_cv = + new IndirectColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } ++group_by_element_idx; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 591e3a1..13ee377 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -24,31 +24,27 @@ #include #include -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "expressions/aggregation/AggregationID.hpp" #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" -#include "storage/AggregationOperationState.pb.h" #include "storage/HashTableBase.hpp" #include "storage/HashTablePool.hpp" #include "storage/PartitionedHashTablePool.hpp" #include "storage/StorageBlockInfo.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "utility/Macros.hpp" -#include "gflags/gflags.h" - namespace quickstep { +namespace serialization { class AggregationOperationState; } + class AggregateFunction; class CatalogDatabaseLite; class CatalogRelationSchema; class InsertDestination; class LIPFilterAdaptiveProber; class StorageManager; - -DECLARE_int32(num_aggregation_partitions); -DECLARE_int32(partition_aggregation_num_groups_threshold); +class Type; /** \addtogroup Storage * @{ @@ -156,6 +152,29 @@ class AggregationOperationState { const CatalogDatabaseLite &database); /** + * @brief Get the number of partitions to be used for initializing the + * aggregation. + * + * @return The number of partitions to be used for initializing the aggregation. + **/ + std::size_t getNumInitializationPartitions() const; + + /** + * @brief Get the number of partitions to be used for finalizing the + * aggregation. + * + * @return The number of partitions to be used for finalizing the aggregation. + **/ + std::size_t getNumFinalizationPartitions() const; + + /** + * @brief Initialize the specified partition of this aggregation. + * + * @param partition_id ID of the partition to be initialized. + */ + void initialize(const std::size_t partition_id); + + /** * @brief Compute aggregates on the tuples of the given storage block, * updating the running state maintained by this * AggregationOperationState. @@ -166,127 +185,95 @@ class AggregationOperationState { * the block. **/ void aggregateBlock(const block_id input_block, - LIPFilterAdaptiveProber *lip_filter_adaptive_prober); + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr); /** * @brief Generate the final results for the aggregates managed by this * AggregationOperationState and write them out to StorageBlock(s). * + * @param partition_id The partition id of this finalize operation. * @param output_destination An InsertDestination where the finalized output * tuple(s) from this aggregate are to be written. **/ - void finalizeAggregate(InsertDestination *output_destination); - - /** - * @brief Destroy the payloads in the aggregation hash tables. - **/ - void destroyAggregationHashTablePayload(); - - /** - * @brief Generate the final results for the aggregates managed by this - * AggregationOperationState and write them out to StorageBlock(s). - * In this implementation, each thread picks a hash table belonging to - * a partition and writes its values to StorageBlock(s). There is no - * need to merge multiple hash tables in one, because there is no - * overlap in the keys across two hash tables. - * - * @param partition_id The ID of the partition for which finalize is being - * performed. - * @param output_destination An InsertDestination where the finalized output - * tuple(s) from this aggregate are to be written. - **/ - void finalizeAggregatePartitioned( - const std::size_t partition_id, InsertDestination *output_destination); - - static void mergeGroupByHashTables(AggregationStateHashTableBase *src, - AggregationStateHashTableBase *dst); + void finalizeAggregate(const std::size_t partition_id, + InsertDestination *output_destination); - bool isAggregatePartitioned() const { - return is_aggregate_partitioned_; - } + private: + // Check whether partitioned aggregation can be applied. + bool checkAggregatePartitioned( + const std::size_t estimated_num_groups, + const std::vector &is_distinct, + const std::vector> &group_by, + const std::vector &aggregate_functions) const; - /** - * @brief Get the number of partitions to be used for the aggregation. - * For non-partitioned aggregations, we return 1. - **/ - std::size_t getNumPartitions() const { - return is_aggregate_partitioned_ - ? partitioned_group_by_hashtable_pool_->getNumPartitions() - : 1; - } + // Aggregate on input block. + void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux); - int dflag; + void aggregateBlockHashTable(const ValueAccessorMultiplexer &accessor_mux); - private: // Merge locally (per storage block) aggregated states with global aggregation // states. void mergeSingleState( const std::vector> &local_state); - // Aggregate on input block. - void aggregateBlockSingleState(const block_id input_block); - void aggregateBlockHashTable(const block_id input_block, - LIPFilterAdaptiveProber *lip_filter_adaptive_prober); + void mergeGroupByHashTables(AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst) const; + // Finalize the aggregation results into output_destination. void finalizeSingleState(InsertDestination *output_destination); - void finalizeHashTable(InsertDestination *output_destination); - bool checkAggregatePartitioned( - const std::size_t estimated_num_groups, - const std::vector &is_distinct, - const std::vector> &group_by, - const std::vector &aggregate_functions) const { - // If there's no aggregation, return false. - if (aggregate_functions.empty()) { - return false; - } - // Check if there's a distinct operation involved in any aggregate, if so - // the aggregate can't be partitioned. - for (auto distinct : is_distinct) { - if (distinct) { - return false; - } - } - // There's no distinct aggregation involved, Check if there's at least one - // GROUP BY operation. - if (group_by.empty()) { - return false; - } - // There are GROUP BYs without DISTINCT. Check if the estimated number of - // groups is large enough to warrant a partitioned aggregation. - return estimated_num_groups > - static_cast( - FLAGS_partition_aggregation_num_groups_threshold); - } + void finalizeHashTable(const std::size_t partition_id, + InsertDestination *output_destination); + + // Specialized implementations for aggregateBlockHashTable. + void aggregateBlockHashTableImplCollisionFree( + const ValueAccessorMultiplexer &accessor_mux); + + void aggregateBlockHashTableImplPartitioned( + const ValueAccessorMultiplexer &accessor_mux); + + void aggregateBlockHashTableImplThreadPrivate( + const ValueAccessorMultiplexer &accessor_mux); + + // Specialized implementations for finalizeHashTable. + void finalizeHashTableImplCollisionFree(const std::size_t partition_id, + InsertDestination *output_destination); + + void finalizeHashTableImplPartitioned(const std::size_t partition_id, + InsertDestination *output_destination); + + void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination); // Common state for all aggregates in this operation: the input relation, the // filter predicate (if any), and the list of GROUP BY expressions (if any). const CatalogRelationSchema &input_relation_; + // Whether the aggregation is collision free or not. + bool is_aggregate_collision_free_; + // Whether the aggregation is partitioned or not. - const bool is_aggregate_partitioned_; + bool is_aggregate_partitioned_; std::unique_ptr predicate_; - std::vector> group_by_list_; // Each individual aggregate in this operation has an AggregationHandle and - // some number of Scalar arguments. - std::vector handles_; - std::vector>> arguments_; + // zero (indicated by -1) or one argument. + std::vector> handles_; // For each aggregate, whether DISTINCT should be applied to the aggregate's // arguments. std::vector is_distinct_; - // Hash table for obtaining distinct (i.e. unique) arguments. - std::vector> - distinctify_hashtables_; + // Non-trivial group-by/argument expressions that need to be evaluated. + std::vector> non_trivial_expressions_; + + std::vector group_by_key_ids_; + std::vector> argument_ids_; -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // If all an aggregate's argument expressions are simply attributes in - // 'input_relation_', then this caches the attribute IDs of those arguments. - std::vector> arguments_as_attributes_; -#endif + std::vector group_by_types_; + + // Hash table for obtaining distinct (i.e. unique) arguments. + std::vector> distinctify_hashtables_; // Per-aggregate global states for aggregation without GROUP BY. std::vector> single_states_; @@ -295,14 +282,15 @@ class AggregationOperationState { // // TODO(shoban): We should ideally store the aggregation state together in one // hash table to prevent multiple lookups. - std::vector> - group_by_hashtables_; + std::vector> group_by_hashtables_; // A vector of group by hash table pools. std::unique_ptr group_by_hashtable_pool_; std::unique_ptr partitioned_group_by_hashtable_pool_; + std::unique_ptr collision_free_hashtable_; + StorageManager *storage_manager_; DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index a44c3a7..293be17 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING) bitweaving/BitWeavingVIndexSubBlock.hpp) endif() # CMAKE_VALIDATE_IGNORE_END +add_library(quickstep_storage_CollisionFreeVectorTable + CollisionFreeVectorTable.cpp + CollisionFreeVectorTable.hpp) add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp) add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp) add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock @@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED) endif() add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp) -add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp) -add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp) -add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp) add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp) if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS) add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp) @@ -226,6 +226,7 @@ add_library(quickstep_storage_InsertDestination_proto add_library(quickstep_storage_LinearOpenAddressingHashTable ../empty_src.cpp LinearOpenAddressingHashTable.hpp) +add_library(quickstep_storage_PackedPayloadHashTable PackedPayloadHashTable.cpp PackedPayloadHashTable.hpp) add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp) add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp) add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp) @@ -253,6 +254,7 @@ add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.h add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp) add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp) add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp) +add_library(quickstep_storage_ValueAccessorMultiplexer ../empty_src.cpp ValueAccessorMultiplexer.hpp) add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp) add_library(quickstep_storage_WindowAggregationOperationState WindowAggregationOperationState.hpp @@ -272,22 +274,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_expressions_aggregation_AggregateFunction quickstep_expressions_aggregation_AggregateFunctionFactory quickstep_expressions_aggregation_AggregationHandle - quickstep_expressions_aggregation_AggregationHandleDistinct - quickstep_expressions_aggregation_AggregationID quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_storage_AggregationOperationState_proto - quickstep_storage_HashTable + quickstep_storage_CollisionFreeVectorTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory quickstep_storage_HashTablePool quickstep_storage_InsertDestination quickstep_storage_PartitionedHashTablePool + quickstep_storage_PackedPayloadHashTable quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager + quickstep_storage_SubBlocksReference quickstep_storage_TupleIdSequence + quickstep_storage_TupleStorageSubBlock quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorMultiplexer + quickstep_storage_ValueAccessorUtil quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor @@ -430,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING) quickstep_utility_Macros) endif() # CMAKE_VALIDATE_IGNORE_END +target_link_libraries(quickstep_storage_CollisionFreeVectorTable + quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregationHandle + quickstep_expressions_aggregation_AggregationID + quickstep_storage_HashTableBase + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorMultiplexer + quickstep_storage_ValueAccessorUtil + quickstep_types_Type + quickstep_types_TypeID + quickstep_types_containers_ColumnVector + quickstep_types_containers_ColumnVectorsValueAccessor + quickstep_utility_BarrieredReadWriteConcurrentBitVector + quickstep_utility_Macros) target_link_libraries(quickstep_storage_ColumnStoreUtil quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogRelationSchema @@ -627,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy quickstep_threading_SpinMutex quickstep_threading_SpinSharedMutex quickstep_utility_Macros) -target_link_libraries(quickstep_storage_FastHashTable - quickstep_catalog_CatalogTypedefs - quickstep_storage_HashTableBase - quickstep_storage_StorageBlob - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants - quickstep_storage_StorageManager - quickstep_storage_TupleReference - quickstep_storage_ValueAccessor - quickstep_storage_ValueAccessorUtil - quickstep_threading_SpinMutex - quickstep_threading_SpinSharedMutex - quickstep_types_Type - quickstep_types_TypedValue - quickstep_utility_HashPair - quickstep_utility_Macros) -target_link_libraries(quickstep_storage_FastHashTableFactory - glog - quickstep_storage_FastHashTable - quickstep_storage_FastSeparateChainingHashTable - quickstep_storage_HashTable - quickstep_storage_HashTable_proto - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory - quickstep_storage_LinearOpenAddressingHashTable - quickstep_storage_SeparateChainingHashTable - quickstep_storage_SimpleScalarSeparateChainingHashTable - quickstep_storage_TupleReference - quickstep_types_TypeFactory - quickstep_utility_Macros) -target_link_libraries(quickstep_storage_FastSeparateChainingHashTable - quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableKeyManager - quickstep_storage_StorageBlob - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants - quickstep_storage_StorageManager - quickstep_threading_SpinSharedMutex - quickstep_types_Type - quickstep_types_TypedValue - quickstep_utility_Alignment - quickstep_utility_Macros - quickstep_utility_PrimeNumber) target_link_libraries(quickstep_storage_FileManager quickstep_storage_StorageBlockInfo quickstep_utility_Macros @@ -731,16 +708,19 @@ target_link_libraries(quickstep_storage_HashTable quickstep_utility_HashPair quickstep_utility_Macros) target_link_libraries(quickstep_storage_HashTableBase + quickstep_storage_ValueAccessorMultiplexer quickstep_utility_Macros) target_link_libraries(quickstep_storage_HashTable_proto quickstep_types_Type_proto ${PROTOBUF_LIBRARY}) target_link_libraries(quickstep_storage_HashTableFactory glog + quickstep_storage_CollisionFreeVectorTable quickstep_storage_HashTable quickstep_storage_HashTable_proto quickstep_storage_HashTableBase quickstep_storage_LinearOpenAddressingHashTable + quickstep_storage_PackedPayloadHashTable quickstep_storage_SeparateChainingHashTable quickstep_storage_SimpleScalarSeparateChainingHashTable quickstep_storage_TupleReference @@ -759,13 +739,10 @@ target_link_libraries(quickstep_storage_HashTableKeyManager quickstep_utility_Macros) target_link_libraries(quickstep_storage_HashTablePool glog - quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase + quickstep_storage_HashTableFactory quickstep_threading_SpinMutex - quickstep_utility_Macros - quickstep_utility_StringUtil) + quickstep_utility_Macros) target_link_libraries(quickstep_storage_IndexSubBlock quickstep_catalog_CatalogTypedefs quickstep_expressions_predicate_PredicateCost @@ -820,14 +797,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable quickstep_utility_Alignment quickstep_utility_Macros quickstep_utility_PrimeNumber) -target_link_libraries(quickstep_storage_PartitionedHashTablePool - glog +target_link_libraries(quickstep_storage_PackedPayloadHashTable + quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase + quickstep_storage_HashTableKeyManager + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorMultiplexer + quickstep_storage_ValueAccessorUtil + quickstep_threading_SpinMutex + quickstep_threading_SpinSharedMutex + quickstep_types_Type + quickstep_types_TypedValue + quickstep_types_containers_ColumnVectorsValueAccessor + quickstep_utility_Alignment + quickstep_utility_HashPair quickstep_utility_Macros - quickstep_utility_StringUtil) + quickstep_utility_PrimeNumber) +target_link_libraries(quickstep_storage_PartitionedHashTablePool + glog + quickstep_storage_HashTableBase + quickstep_storage_HashTableFactory + quickstep_utility_Macros) target_link_libraries(quickstep_storage_PreloaderThread glog quickstep_catalog_CatalogDatabase @@ -936,7 +931,6 @@ target_link_libraries(quickstep_storage_StorageBlock glog quickstep_catalog_CatalogRelationSchema quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_storage_BasicColumnStoreTupleStorageSubBlock @@ -945,7 +939,6 @@ target_link_libraries(quickstep_storage_StorageBlock quickstep_storage_CompressedColumnStoreTupleStorageSubBlock quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock quickstep_storage_CountedReference - quickstep_storage_HashTableBase quickstep_storage_IndexSubBlock quickstep_storage_InsertDestinationInterface quickstep_storage_SMAIndexSubBlock @@ -1068,6 +1061,10 @@ target_link_libraries(quickstep_storage_ValueAccessor quickstep_types_TypedValue quickstep_types_containers_Tuple quickstep_utility_Macros) +target_link_libraries(quickstep_storage_ValueAccessorMultiplexer + glog + quickstep_catalog_CatalogTypedefs + quickstep_utility_Macros) target_link_libraries(quickstep_storage_ValueAccessorUtil glog quickstep_storage_BasicColumnStoreValueAccessor @@ -1115,6 +1112,7 @@ target_link_libraries(quickstep_storage quickstep_storage_BasicColumnStoreValueAccessor quickstep_storage_BloomFilterIndexSubBlock quickstep_storage_CSBTreeIndexSubBlock + quickstep_storage_CollisionFreeVectorTable quickstep_storage_ColumnStoreUtil quickstep_storage_CompressedBlockBuilder quickstep_storage_CompressedColumnStoreTupleStorageSubBlock @@ -1125,9 +1123,6 @@ target_link_libraries(quickstep_storage quickstep_storage_CompressedTupleStorageSubBlock quickstep_storage_CountedReference quickstep_storage_EvictionPolicy - quickstep_storage_FastHashTable - quickstep_storage_FastHashTableFactory - quickstep_storage_FastSeparateChainingHashTable quickstep_storage_FileManager quickstep_storage_FileManagerLocal quickstep_storage_Flags @@ -1144,6 +1139,7 @@ target_link_libraries(quickstep_storage quickstep_storage_InsertDestination_proto quickstep_storage_LinearOpenAddressingHashTable quickstep_storage_PartitionedHashTablePool + quickstep_storage_PackedPayloadHashTable quickstep_storage_PreloaderThread quickstep_storage_SMAIndexSubBlock quickstep_storage_SeparateChainingHashTable @@ -1166,6 +1162,7 @@ target_link_libraries(quickstep_storage quickstep_storage_TupleReference quickstep_storage_TupleStorageSubBlock quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorMultiplexer quickstep_storage_ValueAccessorUtil quickstep_storage_WindowAggregationOperationState quickstep_storage_WindowAggregationOperationState_proto) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp new file mode 100644 index 0000000..d836014 --- /dev/null +++ b/storage/CollisionFreeVectorTable.cpp @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "storage/CollisionFreeVectorTable.hpp" + +#include +#include +#include +#include +#include +#include + +#include "expressions/aggregation/AggregationHandle.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +CollisionFreeVectorTable::CollisionFreeVectorTable( + const Type *key_type, + const std::size_t num_entries, + const std::vector &handles, + StorageManager *storage_manager) + : key_type_(key_type), + num_entries_(num_entries), + num_handles_(handles.size()), + handles_(handles), + num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)), + storage_manager_(storage_manager) { + DCHECK_GT(num_entries, 0u); + + std::size_t required_memory = 0; + const std::size_t existence_map_offset = 0; + std::vector state_offsets; + + required_memory += CacheLineAlignedBytes( + BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries)); + + for (std::size_t i = 0; i < num_handles_; ++i) { + const AggregationHandle *handle = handles_[i]; + const std::vector argument_types = handle->getArgumentTypes(); + + std::size_t state_size = 0; + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + state_size = sizeof(std::atomic); + break; + } + case AggregationID::kSum: { + DCHECK_EQ(1u, argument_types.size()); + switch (argument_types.front()->getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + state_size = sizeof(std::atomic); + break; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + state_size = sizeof(std::atomic); + break; + default: + LOG(FATAL) << "Not implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + + state_offsets.emplace_back(required_memory); + required_memory += CacheLineAlignedBytes(state_size * num_entries); + } + + const std::size_t num_storage_slots = + storage_manager_->SlotsNeededForBytes(required_memory); + + const block_id blob_id = storage_manager_->createBlob(num_storage_slots); + blob_ = storage_manager_->getBlobMutable(blob_id); + + void *memory_start = blob_->getMemoryMutable(); + existence_map_.reset(new BarrieredReadWriteConcurrentBitVector( + reinterpret_cast(memory_start) + existence_map_offset, + num_entries, + false /* initialize */)); + + for (std::size_t i = 0; i < num_handles_; ++i) { + // Columnwise layout. + vec_tables_.emplace_back( + reinterpret_cast(memory_start) + state_offsets.at(i)); + } + + memory_size_ = required_memory; + num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_); +} + +CollisionFreeVectorTable::~CollisionFreeVectorTable() { + const block_id blob_id = blob_->getID(); + blob_.release(); + storage_manager_->deleteBlockOrBlobFile(blob_id); +} + +void CollisionFreeVectorTable::destroyPayload() { +} + +bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey( + const std::vector> &argument_ids, + const std::vector &key_ids, + const ValueAccessorMultiplexer &accessor_mux) { + DCHECK_EQ(1u, key_ids.size()); + + if (handles_.empty()) { + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + accessor_mux.getValueAccessorBySource(key_ids.front().source), + [&key_ids, this](auto *accessor) -> void { // NOLINT(build/c++11) + this->upsertValueAccessorKeyOnlyHelper(key_type_->isNullable(), + key_type_, + key_ids.front().attr_id, + accessor); + }); + return true; + } + + DCHECK(accessor_mux.getDerivedAccessor() == nullptr || + accessor_mux.getDerivedAccessor()->getImplementationType() + == ValueAccessor::Implementation::kColumnVectors); + + ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); + ColumnVectorsValueAccessor *derived_accesor = + static_cast(accessor_mux.getDerivedAccessor()); + + // Dispatch to specialized implementations to achieve maximum performance. + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + base_accessor, + [&argument_ids, &key_ids, &derived_accesor, this](auto *accessor) -> void { // NOLINT(build/c++11) + const ValueAccessorSource key_source = key_ids.front().source; + const attribute_id key_id = key_ids.front().attr_id; + const bool is_key_nullable = key_type_->isNullable(); + + for (std::size_t i = 0; i < num_handles_; ++i) { + DCHECK_LE(argument_ids[i].size(), 1u); + + const AggregationHandle *handle = handles_[i]; + const auto &argument_types = handle->getArgumentTypes(); + const auto &argument_ids_i = argument_ids[i]; + + ValueAccessorSource argument_source; + attribute_id argument_id; + const Type *argument_type; + bool is_argument_nullable; + + if (argument_ids_i.empty()) { + argument_source = ValueAccessorSource::kInvalid; + argument_id = kInvalidAttributeID; + + DCHECK(argument_types.empty()); + argument_type = nullptr; + is_argument_nullable = false; + } else { + DCHECK_EQ(1u, argument_ids_i.size()); + argument_source = argument_ids_i.front().source; + argument_id = argument_ids_i.front().attr_id; + + DCHECK_EQ(1u, argument_types.size()); + argument_type = argument_types.front(); + is_argument_nullable = argument_type->isNullable(); + } + + if (key_source == ValueAccessorSource::kBase) { + if (argument_source == ValueAccessorSource::kBase) { + this->upsertValueAccessorDispatchHelper(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + accessor, + accessor); + } else { + this->upsertValueAccessorDispatchHelper(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + accessor, + derived_accesor); + } + } else { + if (argument_source == ValueAccessorSource::kBase) { + this->upsertValueAccessorDispatchHelper(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + derived_accesor, + accessor); + } else { + this->upsertValueAccessorDispatchHelper(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_id, + argument_id, + vec_tables_[i], + derived_accesor, + derived_accesor); + } + } + } + }); + return true; +} + +void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id, + NativeColumnVector *output_cv) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + switch (key_type_->getTypeID()) { + case TypeID::kInt: + finalizeKeyInternal(start_position, end_position, output_cv); + return; + case TypeID::kLong: + finalizeKeyInternal(start_position, end_position, output_cv); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id, + const std::size_t handle_id, + NativeColumnVector *output_cv) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + const AggregationHandle *handle = handles_[handle_id]; + const auto &argument_types = handle->getArgumentTypes(); + const Type *argument_type = + argument_types.empty() ? nullptr : argument_types.front(); + + finalizeStateDispatchHelper(handle->getAggregationID(), + argument_type, + vec_tables_[handle_id], + start_position, + end_position, + output_cv); +} + +} // namespace quickstep