Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 95D33200C15 for ; Wed, 8 Feb 2017 09:53:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9435B160B6A; Wed, 8 Feb 2017 08:53:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 50E52160B4E for ; Wed, 8 Feb 2017 09:53:54 +0100 (CET) Received: (qmail 21705 invoked by uid 500); 8 Feb 2017 08:53:53 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 21696 invoked by uid 99); 8 Feb 2017 08:53:53 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 08:53:53 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id ED7C7C241A for ; Wed, 8 Feb 2017 08:53:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.209 X-Spam-Level: X-Spam-Status: No, score=-6.209 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999, T_FILL_THIS_FORM_SHORT=0.01] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 3fL5b2zVgqHX for ; Wed, 8 Feb 2017 08:53:48 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id F3C2660CC6 for ; Wed, 8 Feb 2017 08:53:45 +0000 (UTC) Received: (qmail 20708 invoked by uid 99); 8 Feb 2017 08:53:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2017 08:53:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C07BDE04F2; Wed, 8 Feb 2017 08:53:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Date: Wed, 08 Feb 2017 08:53:54 -0000 Message-Id: <891bc8a7b1374307bb57914c5e48b761@git.apache.org> In-Reply-To: <638ec293d28b41a0ae0a9757b24c2dfc@git.apache.org> References: <638ec293d28b41a0ae0a9757b24c2dfc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/18] incubator-quickstep git commit: - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key. - Supports copy elision for aggregation. archived-at: Wed, 08 Feb 2017 08:53:56 -0000 http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index d851a0c..8f8c0d8 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -21,15 +21,14 @@ #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_MAX_HPP_ #include +#include #include #include #include -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTableBase.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -40,9 +39,8 @@ namespace quickstep { +class AggregationStateHashTableBase; class ColumnVector; -class StorageManager; -class ValueAccessor; /** \addtogroup Expressions * @{ @@ -86,42 +84,41 @@ class AggregationHandleMax : public AggregationConcreteHandle { public: ~AggregationHandleMax() override {} + std::vector getArgumentTypes() const override { + return {&type_}; + } + + const Type* getResultType() const override { + return &type_; + } + AggregationState* createInitialState() const override { return new AggregationStateMax(type_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - /** - * @brief Iterate with max aggregation state. - */ inline void iterateUnaryInl(AggregationStateMax *state, const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); compareAndUpdate(static_cast(state), value); } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { + inline void iterateUnaryInl(const TypedValue &value, + std::uint8_t *byte_ptr) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); TypedValue *max_ptr = reinterpret_cast(byte_ptr); - compareAndUpdateFast(max_ptr, value); + compareAndUpdate(max_ptr, value); } - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + AggregationState* accumulateValueAccessor( + const std::vector &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const override; - void blockUpdate() override { block_update_ = true; } + void mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void allowUpdate() override { block_update_ = false; } + std::size_t getPayloadSize() const override { + return sizeof(TypedValue); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *max_ptr = reinterpret_cast(byte_ptr); @@ -136,38 +133,21 @@ class AggregationHandleMax : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInl(argument, byte_ptr); + } + } TypedValue finalize(const AggregationState &state) const override { return TypedValue(static_cast(state).max_); } - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return TypedValue(static_cast(state).max_); - } + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; - inline TypedValue finalizeHashTableEntryFast( + inline TypedValue finalizeHashTableEntry( const std::uint8_t *byte_ptr) const { const TypedValue *max_ptr = reinterpret_cast(byte_ptr); return TypedValue(*max_ptr); @@ -175,29 +155,16 @@ class AggregationHandleMax : public AggregationConcreteHandle { ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector> *group_by_keys, - int index) const override; + const std::size_t index, + std::vector> *group_by_keys) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for MAX aggregation. - */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; + const AggregationStateHashTableBase &distinctify_hash_table) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for MAX aggregation. - */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(TypedValue); } + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const override; private: friend class AggregateFunctionMax; @@ -227,8 +194,8 @@ class AggregationHandleMax : public AggregationConcreteHandle { } } - inline void compareAndUpdateFast(TypedValue *max_ptr, - const TypedValue &value) const { + inline void compareAndUpdate(TypedValue *max_ptr, + const TypedValue &value) const { if (value.isNull()) return; if (max_ptr->isNull() || fast_comparator_->compareTypedValues(value, *max_ptr)) { @@ -239,8 +206,6 @@ class AggregationHandleMax : public AggregationConcreteHandle { const Type &type_; std::unique_ptr fast_comparator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index a07f299..08fb141 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -19,15 +19,16 @@ #include "expressions/aggregation/AggregationHandleMin.hpp" +#include +#include #include #include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" -#include "types/containers/ColumnVector.hpp" #include "types/operations/comparisons/Comparison.hpp" #include "types/operations/comparisons/ComparisonFactory.hpp" #include "types/operations/comparisons/ComparisonID.hpp" @@ -36,54 +37,32 @@ namespace quickstep { -class StorageManager; +class ColumnVector; AggregationHandleMin::AggregationHandleMin(const Type &type) - : type_(type), block_update_(false) { + : AggregationConcreteHandle(AggregationID::kMin), + type_(type) { fast_comparator_.reset( ComparisonFactory::GetComparison(ComparisonID::kLess) .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); } -AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} +AggregationState* AggregationHandleMin::accumulateValueAccessor( + const std::vector &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for MIN: " << argument_ids.size(); -AggregationState* AggregationHandleMin::accumulateColumnVectors( - const std::vector> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size(); + const ValueAccessorSource argument_source = argument_ids.front().source; + const attribute_id argument_id = argument_ids.front().attr_id; - return new AggregationStateMin(fast_comparator_->accumulateColumnVector( - type_.getNullableVersion().makeNullValue(), *column_vectors.front())); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleMin::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for MIN: " << accessor_ids.size(); + DCHECK(argument_source != ValueAccessorSource::kInvalid); + DCHECK_NE(argument_id, kInvalidAttributeID); return new AggregationStateMin(fast_comparator_->accumulateValueAccessor( type_.getNullableVersion().makeNullValue(), - accessor, - accessor_ids.front())); -} -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - -void AggregationHandleMin::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for MIN: " << argument_ids.size(); + accessor_mux.getValueAccessorBySource(argument_source), + argument_id)); } void AggregationHandleMin::mergeStates(const AggregationState &source, @@ -98,41 +77,37 @@ void AggregationHandleMin::mergeStates(const AggregationState &source, } } -void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleMin::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_min_ptr = reinterpret_cast(source); TypedValue *dst_min_ptr = reinterpret_cast(destination); if (!(src_min_ptr->isNull())) { - compareAndUpdateFast(dst_min_ptr, *src_min_ptr); + compareAndUpdate(dst_min_ptr, *src_min_ptr); } } ColumnVector* AggregationHandleMin::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast( - type_.getNonNullableVersion(), hash_table, group_by_keys, index); + const std::size_t index, + std::vector> *group_by_keys) const { + return finalizeHashTableHelper( + type_, hash_table, index, group_by_keys); } -AggregationState* -AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( +AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleMin, - AggregationStateMin>(distinctify_hash_table); + return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + AggregationHandleMin, AggregationStateMin>( + distinctify_hash_table); } void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleMin, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelper( + distinctify_hash_table, index, aggregation_hash_table); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp index e3472ec..0e62be5 100644 --- a/expressions/aggregation/AggregationHandleMin.hpp +++ b/expressions/aggregation/AggregationHandleMin.hpp @@ -21,15 +21,14 @@ #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_MIN_HPP_ #include +#include #include #include #include -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTableBase.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -40,9 +39,8 @@ namespace quickstep { +class AggregationStateHashTableBase; class ColumnVector; -class StorageManager; -class ValueAccessor; /** \addtogroup Expressions * @{ @@ -88,42 +86,45 @@ class AggregationHandleMin : public AggregationConcreteHandle { public: ~AggregationHandleMin() override {} + std::vector getArgumentTypes() const override { + return {&type_}; + } + + const Type* getResultType() const override { + return &type_; + } + AggregationState* createInitialState() const override { return new AggregationStateMin(type_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - /** - * @brief Iterate with min aggregation state. - */ inline void iterateUnaryInl(AggregationStateMin *state, const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); compareAndUpdate(state, value); } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { + inline void iterateUnaryInl(const TypedValue &value, + std::uint8_t *byte_ptr) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); TypedValue *min_ptr = reinterpret_cast(byte_ptr); - compareAndUpdateFast(min_ptr, value); + compareAndUpdate(min_ptr, value); } - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + AggregationState* accumulateValueAccessor( + const std::vector &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const override; - void blockUpdate() override { block_update_ = true; } + void mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void allowUpdate() override { block_update_ = false; } + TypedValue finalize(const AggregationState &state) const override { + return static_cast(state).min_; + } + + std::size_t getPayloadSize() const override { + return sizeof(TypedValue); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *min_ptr = reinterpret_cast(byte_ptr); @@ -138,68 +139,33 @@ class AggregationHandleMin : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - - TypedValue finalize(const AggregationState &state) const override { - return static_cast(state).min_; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInl(argument, byte_ptr); + } } - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return static_cast(state).min_; - } + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; - inline TypedValue finalizeHashTableEntryFast( + inline TypedValue finalizeHashTableEntry( const std::uint8_t *byte_ptr) const { - const TypedValue *min_ptr = reinterpret_cast(byte_ptr); - return TypedValue(*min_ptr); + return *reinterpret_cast(byte_ptr); } ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector> *group_by_keys, - int index) const override; + const std::size_t index, + std::vector> *group_by_keys) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for MIN aggregation. - */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; + const AggregationStateHashTableBase &distinctify_hash_table) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for MIN aggregation. - */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(TypedValue); } + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const override; private: friend class AggregateFunctionMin; @@ -228,8 +194,8 @@ class AggregationHandleMin : public AggregationConcreteHandle { } } - inline void compareAndUpdateFast(TypedValue *min_ptr, - const TypedValue &value) const { + inline void compareAndUpdate(TypedValue *min_ptr, + const TypedValue &value) const { if (value.isNull()) return; if (min_ptr->isNull() || fast_comparator_->compareTypedValues(value, *min_ptr)) { @@ -240,8 +206,6 @@ class AggregationHandleMin : public AggregationConcreteHandle { const Type &type_; std::unique_ptr fast_comparator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 642d88d..9f5f220 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -20,13 +20,13 @@ #include "expressions/aggregation/AggregationHandleSum.hpp" #include +#include #include -#include #include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypeFactory.hpp" @@ -40,10 +40,11 @@ namespace quickstep { -class StorageManager; +class ColumnVector; AggregationHandleSum::AggregationHandleSum(const Type &type) - : argument_type_(type), block_update_(false) { + : AggregationConcreteHandle(AggregationID::kSum), + argument_type_(type) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. TypeID type_precision_id; @@ -79,47 +80,27 @@ AggregationHandleSum::AggregationHandleSum(const Type &type) result_type_ = &sum_type.getNullableVersion(); } -AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} +AggregationState* AggregationHandleSum::accumulateValueAccessor( + const std::vector &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for SUM: " << argument_ids.size(); -AggregationState* AggregationHandleSum::accumulateColumnVectors( - const std::vector> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size(); - std::size_t num_tuples = 0; - TypedValue cv_sum = fast_operator_->accumulateColumnVector( - blank_state_.sum_, *column_vectors.front(), &num_tuples); - return new AggregationStateSum(std::move(cv_sum), num_tuples == 0); -} + const ValueAccessorSource argument_source = argument_ids.front().source; + const attribute_id argument_id = argument_ids.front().attr_id; -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleSum::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for SUM: " << accessor_ids.size(); + DCHECK(argument_source != ValueAccessorSource::kInvalid); + DCHECK_NE(argument_id, kInvalidAttributeID); std::size_t num_tuples = 0; - TypedValue va_sum = fast_operator_->accumulateValueAccessor( - blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples); + TypedValue va_sum = + fast_operator_->accumulateValueAccessor( + blank_state_.sum_, + accessor_mux.getValueAccessorBySource(argument_source), + argument_id, + &num_tuples); return new AggregationStateSum(std::move(va_sum), num_tuples == 0); } -#endif - -void AggregationHandleSum::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for SUM: " << argument_ids.size(); -} void AggregationHandleSum::mergeStates(const AggregationState &source, AggregationState *destination) const { @@ -134,8 +115,8 @@ void AggregationHandleSum::mergeStates(const AggregationState &source, sum_destination->null_ = sum_destination->null_ && sum_source.null_; } -void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleSum::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_sum_ptr = reinterpret_cast(source + blank_state_.sum_offset_); const bool *src_null_ptr = @@ -162,29 +143,25 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const { ColumnVector* AggregationHandleSum::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast( - *result_type_, hash_table, group_by_keys, index); + const std::size_t index, + std::vector> *group_by_keys) const { + return finalizeHashTableHelper( + *result_type_, hash_table, index, group_by_keys); } -AggregationState* -AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( +AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleSum, - AggregationStateSum>(distinctify_hash_table); + return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + AggregationHandleSum, AggregationStateSum>( + distinctify_hash_table); } void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleSum, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelper( + distinctify_hash_table, index, aggregation_hash_table); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index f0d23e1..ba4fa9b 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -21,15 +21,14 @@ #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_SUM_HPP_ #include +#include #include #include #include -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTableBase.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -40,9 +39,8 @@ namespace quickstep { +class AggregationStateHashTableBase; class ColumnVector; -class StorageManager; -class ValueAccessor; /** \addtogroup Expressions * @{ @@ -101,16 +99,18 @@ class AggregationHandleSum : public AggregationConcreteHandle { public: ~AggregationHandleSum() override {} + std::vector getArgumentTypes() const override { + return {&argument_type_}; + } + + const Type* getResultType() const override { + return result_type_; + } + AggregationState* createInitialState() const override { return new AggregationStateSum(blank_state_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - inline void iterateUnaryInl(AggregationStateSum *state, const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); @@ -121,8 +121,8 @@ class AggregationHandleSum : public AggregationConcreteHandle { state->null_ = false; } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { + inline void iterateUnaryInl(const TypedValue &value, + std::uint8_t *byte_ptr) const { DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); if (value.isNull()) return; TypedValue *sum_ptr = @@ -133,16 +133,18 @@ class AggregationHandleSum : public AggregationConcreteHandle { *null_ptr = false; } - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + AggregationState* accumulateValueAccessor( + const std::vector &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const override; - void blockUpdate() override { block_update_ = true; } + void mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void allowUpdate() override { block_update_ = false; } + TypedValue finalize(const AggregationState &state) const override; + + std::size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *sum_ptr = @@ -161,70 +163,32 @@ class AggregationHandleSum : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_id) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - - TypedValue finalize(const AggregationState &state) const override; - - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return static_cast(state).sum_; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInl(argument, byte_ptr); + } } - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - std::uint8_t *value_ptr = const_cast(byte_ptr); - TypedValue *sum_ptr = - reinterpret_cast(value_ptr + blank_state_.sum_offset_); - return *sum_ptr; + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; + + inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const { + return *reinterpret_cast(byte_ptr + blank_state_.sum_offset_); } ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector> *group_by_keys, - int index) const override; + const std::size_t index, + std::vector> *group_by_keys) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for SUM aggregation. - */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; + const AggregationStateHashTableBase &distinctify_hash_table) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for SUM aggregation. - */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { - return blank_state_.getPayloadSize(); - } + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const override; private: friend class AggregateFunctionSum; @@ -242,8 +206,6 @@ class AggregationHandleSum : public AggregationConcreteHandle { std::unique_ptr fast_operator_; std::unique_ptr merge_operator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index e9503f7..4220a8d 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -55,9 +55,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg add_library(quickstep_expressions_aggregation_AggregationHandleCount AggregationHandleCount.cpp AggregationHandleCount.hpp) -add_library(quickstep_expressions_aggregation_AggregationHandleDistinct - AggregationHandleDistinct.cpp - AggregationHandleDistinct.hpp) add_library(quickstep_expressions_aggregation_AggregationHandleMax AggregationHandleMax.cpp AggregationHandleMax.hpp) @@ -144,20 +141,21 @@ target_link_libraries(quickstep_expressions_aggregation_AggregateFunctionSum quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandle glog - quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase quickstep_storage_HashTableFactory + quickstep_storage_PackedPayloadHashTable + quickstep_storage_ValueAccessorMultiplexer quickstep_threading_SpinMutex quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationHandle glog - quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase + quickstep_storage_ValueAccessorMultiplexer quickstep_types_TypedValue quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg @@ -165,10 +163,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_expressions_aggregation_AggregationID + quickstep_storage_ValueAccessorMultiplexer quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeFactory @@ -183,39 +179,25 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_expressions_aggregation_AggregationID quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorMultiplexer quickstep_storage_ValueAccessorUtil + quickstep_types_LongType quickstep_types_TypeFactory quickstep_types_TypeID quickstep_types_TypedValue - quickstep_types_containers_ColumnVector - quickstep_types_containers_ColumnVectorUtil - quickstep_utility_Macros) -target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct - glog - quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_types_TypedValue quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax glog quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_expressions_aggregation_AggregationID + quickstep_storage_ValueAccessorMultiplexer quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypedValue - quickstep_types_containers_ColumnVector quickstep_types_operations_comparisons_Comparison quickstep_types_operations_comparisons_ComparisonFactory quickstep_types_operations_comparisons_ComparisonID @@ -225,14 +207,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_expressions_aggregation_AggregationID + quickstep_storage_ValueAccessorMultiplexer quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypedValue - quickstep_types_containers_ColumnVector quickstep_types_operations_comparisons_Comparison quickstep_types_operations_comparisons_ComparisonFactory quickstep_types_operations_comparisons_ComparisonID @@ -242,10 +221,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_expressions_aggregation_AggregationID + quickstep_storage_ValueAccessorMultiplexer quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeFactory @@ -271,7 +248,6 @@ target_link_libraries(quickstep_expressions_aggregation quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_aggregation_AggregationHandleAvg quickstep_expressions_aggregation_AggregationHandleCount - quickstep_expressions_aggregation_AggregationHandleDistinct quickstep_expressions_aggregation_AggregationHandleMax quickstep_expressions_aggregation_AggregationHandleMin quickstep_expressions_aggregation_AggregationHandleSum @@ -301,7 +277,9 @@ target_link_libraries(AggregationHandle_tests quickstep_expressions_aggregation_AggregationID quickstep_storage_AggregationOperationState quickstep_storage_HashTableBase + quickstep_storage_PackedPayloadHashTable quickstep_storage_StorageManager + quickstep_storage_ValueAccessorMultiplexer quickstep_types_CharType quickstep_types_DateOperatorOverloads quickstep_types_DatetimeIntervalType http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp index 79d4448..0ad50d5 100644 --- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp @@ -29,8 +29,9 @@ #include "expressions/aggregation/AggregationHandleAvg.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/AggregationOperationState.hpp" -#include "storage/FastHashTableFactory.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/CharType.hpp" #include "types/DateOperatorOverloads.hpp" #include "types/DatetimeIntervalType.hpp" @@ -46,10 +47,7 @@ #include "types/VarCharType.hpp" #include "types/YearMonthIntervalType.hpp" #include "types/containers/ColumnVector.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #include "types/containers/ColumnVectorsValueAccessor.hpp" -#endif #include "gtest/gtest.h" @@ -192,39 +190,6 @@ class AggregationHandleAvgTest : public ::testing::Test { } template - void checkAggregationAvgGenericColumnVector() { - const GenericType &type = GenericType::Instance(true); - initializeHandle(type); - EXPECT_TRUE( - aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_) - .isNull()); - - typename GenericType::cpptype sum; - SetDataType(0, &sum); - std::vector> column_vectors; - column_vectors.emplace_back( - createColumnVectorGeneric(type, &sum)); - - std::unique_ptr cv_state( - aggregation_handle_avg_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckAvgValue( - static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *cv_state); - - aggregation_handle_avg_->mergeStates(*cv_state, - aggregation_handle_avg_state_.get()); - CheckAvgValue( - static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *aggregation_handle_avg_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template void checkAggregationAvgGenericValueAccessor() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); @@ -240,7 +205,8 @@ class AggregationHandleAvgTest : public ::testing::Test { std::unique_ptr va_state( aggregation_handle_avg_->accumulateValueAccessor( - accessor.get(), std::vector(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -256,7 +222,6 @@ class AggregationHandleAvgTest : public ::testing::Test { *aggregation_handle_avg_, *aggregation_handle_avg_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION std::unique_ptr aggregation_handle_avg_; std::unique_ptr aggregation_handle_avg_state_; @@ -311,33 +276,6 @@ TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeTest) { checkAggregationAvgGeneric(); } -TEST_F(AggregationHandleAvgTest, IntTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, LongTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, FloatTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION TEST_F(AggregationHandleAvgTest, IntTypeValueAccessorTest) { checkAggregationAvgGenericValueAccessor(); } @@ -363,7 +301,6 @@ TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) { checkAggregationAvgGenericValueAccessor(); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #ifdef QUICKSTEP_DEBUG TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) { @@ -468,28 +405,25 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { initializeHandle(long_non_null_type); storage_manager_.reset(new StorageManager("./test_avg_data")); std::unique_ptr source_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector(1, &long_non_null_type), 10, - {aggregation_handle_avg_.get()->getPayloadSize()}, {aggregation_handle_avg_.get()}, storage_manager_.get())); std::unique_ptr destination_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector(1, &long_non_null_type), 10, - {aggregation_handle_avg_.get()->getPayloadSize()}, {aggregation_handle_avg_.get()}, storage_manager_.get())); - AggregationStateFastHashTable *destination_hash_table_derived = - static_cast( - destination_hash_table.get()); + PackedPayloadHashTable *destination_hash_table_derived = + static_cast(destination_hash_table.get()); - AggregationStateFastHashTable *source_hash_table_derived = - static_cast(source_hash_table.get()); + PackedPayloadHashTable *source_hash_table_derived = + static_cast(source_hash_table.get()); AggregationHandleAvg *aggregation_handle_avg_derived = static_cast(aggregation_handle_avg_.get()); @@ -546,29 +480,29 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { memcpy(buffer + 1, common_key_source_state.get()->getPayloadAddress(), aggregation_handle_avg_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(common_key, buffer); + source_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, common_key_destination_state.get()->getPayloadAddress(), aggregation_handle_avg_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(common_key, buffer); + destination_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, exclusive_key_source_state.get()->getPayloadAddress(), aggregation_handle_avg_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer); memcpy(buffer + 1, exclusive_key_destination_state.get()->getPayloadAddress(), aggregation_handle_avg_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(exclusive_destination_key, + destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key, buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - AggregationOperationState::mergeGroupByHashTables( - source_hash_table.get(), destination_hash_table.get()); + HashTableMerger merger(destination_hash_table_derived); + source_hash_table_derived->forEachCompositeKey(&merger); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); @@ -576,21 +510,19 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { (common_key_destination_avg_val.getLiteral() + common_key_source_avg_val.getLiteral()) / static_cast(2), - aggregation_handle_avg_derived->finalizeHashTableEntryFast( + aggregation_handle_avg_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey(common_key) + 1)); CheckAvgValue( exclusive_key_destination_avg_val.getLiteral(), - aggregation_handle_avg_derived->finalizeHashTableEntryFast( + aggregation_handle_avg_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key) + - 1)); + exclusive_destination_key) + 1)); CheckAvgValue( exclusive_key_source_avg_val.getLiteral(), - aggregation_handle_avg_derived->finalizeHashTableEntryFast( + aggregation_handle_avg_derived->finalizeHashTableEntry( source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key) + - 1)); + exclusive_source_key) + 1)); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp index 78bd249..86a014b 100644 --- a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp @@ -30,8 +30,9 @@ #include "expressions/aggregation/AggregationHandleCount.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/AggregationOperationState.hpp" -#include "storage/FastHashTableFactory.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/CharType.hpp" #include "types/DoubleType.hpp" #include "types/FloatType.hpp" @@ -43,10 +44,7 @@ #include "types/TypedValue.hpp" #include "types/VarCharType.hpp" #include "types/containers/ColumnVector.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #include "types/containers/ColumnVectorsValueAccessor.hpp" -#endif #include "gtest/gtest.h" @@ -216,32 +214,6 @@ class AggregationHandleCountTest : public ::testing::Test { } template - void checkAggregationCountNumericColumnVector(int test_count) { - const NumericType &type = NumericType::Instance(true); - initializeHandle(&type); - CheckCountValue( - 0, *aggregation_handle_count_, *aggregation_handle_count_state_); - - std::vector> column_vectors; - column_vectors.emplace_back( - createColumnVectorNumeric(type, test_count)); - - std::unique_ptr cv_state( - aggregation_handle_count_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckCountValue(test_count, *aggregation_handle_count_, *cv_state); - - aggregation_handle_count_->mergeStates( - *cv_state, aggregation_handle_count_state_.get()); - CheckCountValue(test_count, - *aggregation_handle_count_, - *aggregation_handle_count_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template void checkAggregationCountNumericValueAccessor(int test_count) { const NumericType &type = NumericType::Instance(true); initializeHandle(&type); @@ -255,7 +227,8 @@ class AggregationHandleCountTest : public ::testing::Test { std::unique_ptr va_state( aggregation_handle_count_->accumulateValueAccessor( - accessor.get(), std::vector(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -267,7 +240,6 @@ class AggregationHandleCountTest : public ::testing::Test { *aggregation_handle_count_, *aggregation_handle_count_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION template void checkAggregationCountString(int test_count) { @@ -326,32 +298,6 @@ class AggregationHandleCountTest : public ::testing::Test { } template - void checkAggregationCountStringColumnVector(int test_count) { - const StringType &type = StringType::Instance(10, true); - initializeHandle(&type); - CheckCountValue( - 0, *aggregation_handle_count_, *aggregation_handle_count_state_); - - std::vector> column_vectors; - column_vectors.emplace_back( - createColumnVectorString(type, test_count)); - - std::unique_ptr cv_state( - aggregation_handle_count_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckCountValue(test_count, *aggregation_handle_count_, *cv_state); - - aggregation_handle_count_->mergeStates( - *cv_state, aggregation_handle_count_state_.get()); - CheckCountValue(test_count, - *aggregation_handle_count_, - *aggregation_handle_count_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template void checkAggregationCountStringValueAccessor(int test_count) { const StringType &type = StringType::Instance(10, true); initializeHandle(&type); @@ -365,7 +311,8 @@ class AggregationHandleCountTest : public ::testing::Test { std::unique_ptr va_state( aggregation_handle_count_->accumulateValueAccessor( - accessor.get(), std::vector(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -377,7 +324,6 @@ class AggregationHandleCountTest : public ::testing::Test { *aggregation_handle_count_, *aggregation_handle_count_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION std::unique_ptr aggregation_handle_count_; std::unique_ptr aggregation_handle_count_state_; @@ -425,38 +371,6 @@ TEST_F(AggregationHandleCountTest, VarCharTypeTest) { checkAggregationCountString(10000); } -TEST_F(AggregationHandleCountTest, IntTypeColumnVectorTest) { - checkAggregationCountNumericColumnVector(0); - checkAggregationCountNumericColumnVector(10000); -} - -TEST_F(AggregationHandleCountTest, LongTypeColumnVectorTest) { - checkAggregationCountNumericColumnVector(0); - checkAggregationCountNumericColumnVector(10000); -} - -TEST_F(AggregationHandleCountTest, FloatTypeColumnVectorTest) { - checkAggregationCountNumericColumnVector(0); - checkAggregationCountNumericColumnVector(10000); -} - -TEST_F(AggregationHandleCountTest, DoubleTypeColumnVectorTest) { - checkAggregationCountNumericColumnVector(0); - checkAggregationCountNumericColumnVector(10000); -} - -TEST_F(AggregationHandleCountTest, CharTypeColumnVectorTest) { - checkAggregationCountStringColumnVector(0); - checkAggregationCountStringColumnVector(10000); -} - -TEST_F(AggregationHandleCountTest, VarCharTypeColumnVectorTest) { - checkAggregationCountStringColumnVector(0); - checkAggregationCountStringColumnVector( - 10000); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION TEST_F(AggregationHandleCountTest, IntTypeValueAccessorTest) { checkAggregationCountNumericValueAccessor(0); checkAggregationCountNumericValueAccessor(10000); @@ -488,7 +402,6 @@ TEST_F(AggregationHandleCountTest, VarCharTypeValueAccessorTest) { checkAggregationCountStringValueAccessor( 10000); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION TEST_F(AggregationHandleCountTest, canApplyToTypeTest) { EXPECT_TRUE(ApplyToTypesTest(kInt)); @@ -511,28 +424,25 @@ TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) { initializeHandle(&long_non_null_type); storage_manager_.reset(new StorageManager("./test_count_data")); std::unique_ptr source_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector(1, &long_non_null_type), 10, - {aggregation_handle_count_.get()->getPayloadSize()}, {aggregation_handle_count_.get()}, storage_manager_.get())); std::unique_ptr destination_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector(1, &long_non_null_type), 10, - {aggregation_handle_count_.get()->getPayloadSize()}, {aggregation_handle_count_.get()}, storage_manager_.get())); - AggregationStateFastHashTable *destination_hash_table_derived = - static_cast( - destination_hash_table.get()); + PackedPayloadHashTable *destination_hash_table_derived = + static_cast(destination_hash_table.get()); - AggregationStateFastHashTable *source_hash_table_derived = - static_cast(source_hash_table.get()); + PackedPayloadHashTable *source_hash_table_derived = + static_cast(source_hash_table.get()); // TODO(harshad) - Use TemplateUtil::CreateBoolInstantiatedInstance to // generate all the combinations of the bool template arguments and test them. @@ -612,49 +522,48 @@ TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) { memcpy(buffer + 1, common_key_source_state.get()->getPayloadAddress(), aggregation_handle_count_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(common_key, buffer); + source_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, common_key_destination_state.get()->getPayloadAddress(), aggregation_handle_count_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(common_key, buffer); + destination_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, exclusive_key_source_state.get()->getPayloadAddress(), aggregation_handle_count_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer); memcpy(buffer + 1, exclusive_key_destination_state.get()->getPayloadAddress(), aggregation_handle_count_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(exclusive_destination_key, - buffer); + destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key, + buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - AggregationOperationState::mergeGroupByHashTables( - source_hash_table.get(), destination_hash_table.get()); + HashTableMerger merger(destination_hash_table_derived); + source_hash_table_derived->forEachCompositeKey(&merger); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); CheckCountValue( common_key_destination_count_val.getLiteral() + common_key_source_count_val.getLiteral(), - aggregation_handle_count_derived->finalizeHashTableEntryFast( + aggregation_handle_count_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey(common_key) + 1)); CheckCountValue( exclusive_key_destination_count_val.getLiteral(), - aggregation_handle_count_derived->finalizeHashTableEntryFast( + aggregation_handle_count_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key) + - 1)); - CheckCountValue(exclusive_key_source_count_val.getLiteral(), - aggregation_handle_count_derived->finalizeHashTableEntryFast( - source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key) + - 1)); + exclusive_destination_key) + 1)); + CheckCountValue( + exclusive_key_source_count_val.getLiteral(), + aggregation_handle_count_derived->finalizeHashTableEntry( + source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key) + 1)); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp index 026bd1d..d5e8d18 100644 --- a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp @@ -32,9 +32,10 @@ #include "expressions/aggregation/AggregationHandleMax.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/AggregationOperationState.hpp" -#include "storage/FastHashTableFactory.hpp" #include "storage/HashTableBase.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" #include "types/DatetimeLit.hpp" @@ -51,10 +52,7 @@ #include "types/VarCharType.hpp" #include "types/YearMonthIntervalType.hpp" #include "types/containers/ColumnVector.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #include "types/containers/ColumnVectorsValueAccessor.hpp" -#endif #include "types/operations/comparisons/Comparison.hpp" #include "types/operations/comparisons/ComparisonFactory.hpp" @@ -223,34 +221,6 @@ class AggregationHandleMaxTest : public ::testing::Test { } template - void checkAggregationMaxGenericColumnVector() { - const GenericType &type = GenericType::Instance(true); - initializeHandle(type); - EXPECT_TRUE( - aggregation_handle_max_->finalize(*aggregation_handle_max_state_) - .isNull()); - - typename GenericType::cpptype max; - std::vector> column_vectors; - column_vectors.emplace_back( - createColumnVectorGeneric(type, &max)); - - std::unique_ptr cv_state( - aggregation_handle_max_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckMaxValue( - max, *aggregation_handle_max_, *cv_state); - - aggregation_handle_max_->mergeStates(*cv_state, - aggregation_handle_max_state_.get()); - CheckMaxValue( - max, *aggregation_handle_max_, *aggregation_handle_max_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template void checkAggregationMaxGenericValueAccessor() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); @@ -266,7 +236,8 @@ class AggregationHandleMaxTest : public ::testing::Test { std::unique_ptr va_state( aggregation_handle_max_->accumulateValueAccessor( - accessor.get(), std::vector(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -278,7 +249,6 @@ class AggregationHandleMaxTest : public ::testing::Test { CheckMaxValue( max, *aggregation_handle_max_, *aggregation_handle_max_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION template void checkAggregationMaxString() { @@ -385,33 +355,6 @@ class AggregationHandleMaxTest : public ::testing::Test { } template - void checkAggregationMaxStringColumnVector() { - const StringType &type = StringType::Instance(10, true); - initializeHandle(type); - EXPECT_TRUE( - aggregation_handle_max_->finalize(*aggregation_handle_max_state_) - .isNull()); - - std::string max; - std::vector> column_vectors; - column_vectors.emplace_back( - createColumnVectorString(type, &max)); - - std::unique_ptr cv_state( - aggregation_handle_max_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckMaxString(max, *aggregation_handle_max_, *cv_state); - - aggregation_handle_max_->mergeStates(*cv_state, - aggregation_handle_max_state_.get()); - CheckMaxString( - max, *aggregation_handle_max_, *aggregation_handle_max_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template void checkAggregationMaxStringValueAccessor() { const StringType &type = StringType::Instance(10, true); initializeHandle(type); @@ -426,7 +369,8 @@ class AggregationHandleMaxTest : public ::testing::Test { std::unique_ptr va_state( aggregation_handle_max_->accumulateValueAccessor( - accessor.get(), std::vector(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -437,7 +381,6 @@ class AggregationHandleMaxTest : public ::testing::Test { CheckMaxString( max, *aggregation_handle_max_, *aggregation_handle_max_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION std::unique_ptr aggregation_handle_max_; std::unique_ptr aggregation_handle_max_state_; @@ -514,43 +457,6 @@ TEST_F(AggregationHandleMaxTest, VarCharTypeTest) { checkAggregationMaxString(); } -TEST_F(AggregationHandleMaxTest, IntTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, LongTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, FloatTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, DoubleTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, DatetimeTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationMaxGenericColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, CharTypeColumnVectorTest) { - checkAggregationMaxStringColumnVector(); -} - -TEST_F(AggregationHandleMaxTest, VarCharColumnVectorTypeTest) { - checkAggregationMaxStringColumnVector(); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION TEST_F(AggregationHandleMaxTest, IntTypeValueAccessorTest) { checkAggregationMaxGenericValueAccessor(); } @@ -586,7 +492,6 @@ TEST_F(AggregationHandleMaxTest, CharTypeValueAccessorTest) { TEST_F(AggregationHandleMaxTest, VarCharValueAccessorTypeTest) { checkAggregationMaxStringValueAccessor(); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #ifdef QUICKSTEP_DEBUG TEST_F(AggregationHandleMaxDeathTest, WrongTypeTest) { @@ -689,28 +594,25 @@ TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) { initializeHandle(int_non_null_type); storage_manager_.reset(new StorageManager("./test_max_data")); std::unique_ptr source_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector(1, &int_non_null_type), 10, - {aggregation_handle_max_.get()->getPayloadSize()}, {aggregation_handle_max_.get()}, storage_manager_.get())); std::unique_ptr destination_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector(1, &int_non_null_type), 10, - {aggregation_handle_max_.get()->getPayloadSize()}, {aggregation_handle_max_.get()}, storage_manager_.get())); - AggregationStateFastHashTable *destination_hash_table_derived = - static_cast( - destination_hash_table.get()); + PackedPayloadHashTable *destination_hash_table_derived = + static_cast(destination_hash_table.get()); - AggregationStateFastHashTable *source_hash_table_derived = - static_cast(source_hash_table.get()); + PackedPayloadHashTable *source_hash_table_derived = + static_cast(source_hash_table.get()); AggregationHandleMax *aggregation_handle_max_derived = static_cast(aggregation_handle_max_.get()); @@ -780,47 +682,47 @@ TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) { memcpy(buffer + 1, common_key_source_state.get()->getPayloadAddress(), aggregation_handle_max_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(common_key, buffer); + source_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, common_key_destination_state.get()->getPayloadAddress(), aggregation_handle_max_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(common_key, buffer); + destination_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, exclusive_key_source_state.get()->getPayloadAddress(), aggregation_handle_max_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer); memcpy(buffer + 1, exclusive_key_destination_state.get()->getPayloadAddress(), aggregation_handle_max_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(exclusive_destination_key, - buffer); + destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key, + buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - AggregationOperationState::mergeGroupByHashTables( - source_hash_table.get(), destination_hash_table.get()); + HashTableMerger merger(destination_hash_table_derived); + source_hash_table_derived->forEachCompositeKey(&merger); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); CheckMaxValue( common_key_destination_max_val.getLiteral(), - aggregation_handle_max_derived->finalizeHashTableEntryFast( + aggregation_handle_max_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey(common_key) + 1)); - CheckMaxValue(exclusive_key_destination_max_val.getLiteral(), - aggregation_handle_max_derived->finalizeHashTableEntryFast( - destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key) + - 1)); - CheckMaxValue(exclusive_key_source_max_val.getLiteral(), - aggregation_handle_max_derived->finalizeHashTableEntryFast( - source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key) + - 1)); + CheckMaxValue( + exclusive_key_destination_max_val.getLiteral(), + aggregation_handle_max_derived->finalizeHashTableEntry( + destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key) + 1)); + CheckMaxValue( + exclusive_key_source_max_val.getLiteral(), + aggregation_handle_max_derived->finalizeHashTableEntry( + source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key) + 1)); } } // namespace quickstep