Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm
Reply-To: dev@quickstep.incubator.apache.org
From: jianqiao@apache.org
To: commits@quickstep.incubator.apache.org
Date: Fri, 04 Nov 2016 17:48:40 -0000
In-Reply-To: <2dd384ffe548458980616674338f1a52@git.apache.org>
References: <2dd384ffe548458980616674338f1a52@git.apache.org>
Subject: [12/13] incubator-quickstep git commit: Cleanup FastHashTable + Avoid unnecessary copying of data before aggregation.

Cleanup FastHashTable + Avoid unnecessary copying of data before aggregation.

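Note on the "avoid unnecessary copying" part: in the diff that follows, AggregationHandleAvg::accumulate() receives both the original ValueAccessor and an auxiliary ColumnVectorsValueAccessor, and chooses between them by the sign of the argument id. Non-negative ids are read from the original accessor; negative ids (other than kInvalidAttributeID) are read from the auxiliary accessor at index -(id + 2), presumably so that plain attribute arguments need not be copied first. The sketch below only illustrates that id mapping and is not Quickstep code: Source, DecodeArgumentId and EncodeAuxIndex are hypothetical names, and kInvalidAttributeID == -1 is an assumption about the catalog typedefs.

```cpp
#include <cassert>
#include <utility>

// Hypothetical stand-ins for illustration only; not Quickstep's real types.
using attribute_id = int;
constexpr attribute_id kInvalidAttributeID = -1;  // assumed sentinel value

enum class Source { kBase, kAux };

// Mirrors the two ternaries in AggregationHandleAvg::accumulate():
// ids >= 0 index the base accessor directly, ids <= -2 index the
// auxiliary accessor at -(id + 2).
inline std::pair<Source, attribute_id> DecodeArgumentId(attribute_id id) {
  assert(id != kInvalidAttributeID);
  if (id >= 0) {
    return {Source::kBase, id};
  }
  return {Source::kAux, -(id + 2)};
}

// Inverse mapping for auxiliary columns (hypothetical helper; the commit
// excerpt does not show where the negative ids are produced).
inline attribute_id EncodeAuxIndex(attribute_id aux_index) {
  return -(aux_index + 2);
}
```

Under this scheme the id -1 stays reserved for "invalid", so the first auxiliary column is encoded as -2, the second as -3, and so on.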
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/915295de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/915295de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/915295de

Branch: refs/heads/collision-free-agg
Commit: 915295dec26cf8bfe38e43895961ba803821d88a
Parents: 797b710
Author: Jianqiao Zhu
Authored: Thu Oct 27 12:23:03 2016 -0500
Committer: Jianqiao Zhu
Committed: Fri Nov 4 12:48:00 2016 -0500

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp  |   44 -
 .../aggregation/AggregationConcreteHandle.hpp  |  127 +-
 expressions/aggregation/AggregationHandle.hpp  |  188 +-
 .../aggregation/AggregationHandleAvg.cpp       |   82 +-
 .../aggregation/AggregationHandleAvg.hpp       |  108 +-
 .../aggregation/AggregationHandleCount.cpp     |  127 +-
 .../aggregation/AggregationHandleCount.hpp     |  120 +-
 .../aggregation/AggregationHandleDistinct.cpp  |   34 +-
 .../aggregation/AggregationHandleDistinct.hpp  |   43 +-
 .../aggregation/AggregationHandleMax.cpp       |   82 +-
 .../aggregation/AggregationHandleMax.hpp       |   93 +-
 .../aggregation/AggregationHandleMin.cpp       |   82 +-
 .../aggregation/AggregationHandleMin.hpp       |  103 +-
 .../aggregation/AggregationHandleSum.cpp       |   79 +-
 .../aggregation/AggregationHandleSum.hpp       |  110 +-
 expressions/aggregation/CMakeLists.txt         |   12 -
 query_execution/QueryContext.hpp               |   14 -
 .../DestroyAggregationStateOperator.cpp        |    7 -
 storage/AggregationOperationState.cpp          |  436 ++--
 storage/AggregationOperationState.hpp          |   72 +-
 storage/CMakeLists.txt                         |   88 +-
 storage/FastHashTable.hpp                      | 2403 ------------------
 storage/FastHashTableFactory.hpp               |  224 --
 storage/FastSeparateChainingHashTable.hpp      | 1551 -----------
 storage/HashTableBase.hpp                      |   43 +-
 storage/HashTableFactory.hpp                   |   37 +-
 storage/HashTablePool.hpp                      |   74 +-
 .../PackedPayloadAggregationStateHashTable.cpp |  434 ++++
 .../PackedPayloadAggregationStateHashTable.hpp |  721 ++++++
 storage/PartitionedHashTablePool.hpp           |   50 +-
 storage/StorageBlock.cpp                       |  270 --
 storage/StorageBlock.hpp                       |  150 --
 32 files changed, 1707 insertions(+), 6301 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index e3fb520..3151a91 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -19,50 +19,6 @@

 #include "expressions/aggregation/AggregationConcreteHandle.hpp"

-#include
-#include
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/FastHashTable.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
-
 namespace quickstep {

-class StorageManager;
-class Type;
-class ValueAccessor;
-
-AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector &key_types,
-    const std::size_t estimated_num_distinct_keys,
-    StorageManager *storage_manager) const {
-  // Create a hash table with key types as key_types and value type as bool.
-  return AggregationStateHashTableFactory::CreateResizable(
-      hash_table_impl,
-      key_types,
-      estimated_num_distinct_keys,
-      storage_manager);
-}
-
-void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
-    ValueAccessor *accessor,
-    const std::vector &key_ids,
-    AggregationStateHashTableBase *distinctify_hash_table) const {
-  // If the key-value pair is already there, we don't need to update the value,
-  // which should always be "true". I.e. the value is just a placeholder.
- - AggregationStateFastHashTable *hash_table = - static_cast(distinctify_hash_table); - if (key_ids.size() == 1) { - hash_table->upsertValueAccessorFast( - key_ids, accessor, key_ids[0], true /* check_for_null_keys */); - } else { - std::vector empty_args {kInvalidAttributeID}; - hash_table->upsertValueAccessorCompositeKeyFast( - empty_args, accessor, key_ids, true /* check_for_null_keys */); - } -} - } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index 398a032..f1259c0 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -26,8 +26,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/TypedValue.hpp" @@ -51,7 +49,7 @@ class ValueAccessor; * merging two group by hash tables. **/ template -class HashTableStateUpserterFast { +class HashTableStateUpserter { public: /** * @brief Constructor. @@ -61,7 +59,7 @@ class HashTableStateUpserterFast { * table. The corresponding state (for the same key) in the destination * hash table will be upserted. **/ - HashTableStateUpserterFast(const HandleT &handle, + HashTableStateUpserter(const HandleT &handle, const std::uint8_t *source_state) : handle_(handle), source_state_(source_state) {} @@ -72,14 +70,14 @@ class HashTableStateUpserterFast { * table that is being upserted. 
**/ void operator()(std::uint8_t *destination_state) { - handle_.mergeStatesFast(source_state_, destination_state); + handle_.mergeStates(source_state_, destination_state); } private: const HandleT &handle_; const std::uint8_t *source_state_; - DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast); + DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter); }; /** @@ -102,50 +100,18 @@ class AggregationConcreteHandle : public AggregationHandle { << "takes at least one argument."; } - /** - * @brief Implementaion for AggregationHandle::createDistinctifyHashTable() - * that creates a new HashTable for the distinctify step for - * DISTINCT aggregation. - */ - AggregationStateHashTableBase* createDistinctifyHashTable( - const HashTableImplType hash_table_impl, - const std::vector &key_types, - const std::size_t estimated_num_distinct_keys, - StorageManager *storage_manager) const override; - - /** - * @brief Implementaion for - * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable() - * that inserts the GROUP BY expressions and aggregation arguments together - * as keys into the distinctify hash table. 
- */ - void insertValueAccessorIntoDistinctifyHashTable( - ValueAccessor *accessor, - const std::vector &key_ids, - AggregationStateHashTableBase *distinctify_hash_table) const override; - protected: AggregationConcreteHandle() {} - template - StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( - const AggregationStateHashTableBase &distinctify_hash_table) const; - template - void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *hash_table, - std::size_t index) const; - - template - ColumnVector* finalizeHashTableHelperFast( + ColumnVector* finalizeHashTableHelper( const Type &result_type, const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys, int index) const; template - inline TypedValue finalizeGroupInHashTableFast( + inline TypedValue finalizeGroupInHashTable( const AggregationStateHashTableBase &hash_table, const std::vector &group_key, int index) const { @@ -153,15 +119,10 @@ class AggregationConcreteHandle : public AggregationHandle { static_cast(hash_table).getSingleCompositeKey(group_key, index); DCHECK(group_state != nullptr) << "Could not find entry for specified group_key in HashTable"; - return static_cast(this)->finalizeHashTableEntryFast( + return static_cast(this)->finalizeHashTableEntry( group_state); } - template - void mergeGroupByHashTablesHelperFast( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const; - private: DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle); }; @@ -195,7 +156,7 @@ class HashTableAggregateFinalizer { const unsigned char *byte_ptr) { group_by_keys_->emplace_back(group_by_key); output_column_vector_->appendTypedValue( - handle_.finalizeHashTableEntryFast(byte_ptr)); + handle_.finalizeHashTableEntry(byte_ptr)); } private: @@ -209,70 +170,8 @@ class HashTableAggregateFinalizer { // 
---------------------------------------------------------------------------- // Implementations of templated methods follow: -template -StateT* AggregationConcreteHandle:: - aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( - const AggregationStateHashTableBase &distinctify_hash_table) const { - const HandleT &handle = static_cast(*this); - StateT *state = static_cast(createInitialState()); - - // A lambda function which will be called on each key from the distinctify - // hash table. - const auto aggregate_functor = [&handle, &state]( - const TypedValue &key, const std::uint8_t &dumb_placeholder) { - // For each (unary) key in the distinctify hash table, aggregate the key - // into "state". - handle.iterateUnaryInl(state, key); - }; - - const AggregationStateFastHashTable &hash_table = - static_cast( - distinctify_hash_table); - // Invoke the lambda function "aggregate_functor" on each key from the - // distinctify hash table. - hash_table.forEach(&aggregate_functor); - - return state; -} - -template -void AggregationConcreteHandle:: - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - const HandleT &handle = static_cast(*this); - HashTableT *target_hash_table = - static_cast(aggregation_hash_table); - - // A lambda function which will be called on each key-value pair from the - // distinctify hash table. - const auto aggregate_functor = [&handle, &target_hash_table, &index]( - std::vector &key, const bool &dumb_placeholder) { - // For each (composite) key vector in the distinctify hash table with size N. - // The first N-1 entries are GROUP BY columns and the last entry is the - // argument to be aggregated on. 
- const TypedValue argument(std::move(key.back())); - key.pop_back(); - - // An upserter as lambda function for aggregating the argument into its - // GROUP BY group's entry inside aggregation_hash_table. - const auto upserter = [&handle, &argument](std::uint8_t *state) { - handle.iterateUnaryInlFast(argument, state); - }; - - target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index); - }; - - const HashTableT &source_hash_table = - static_cast(distinctify_hash_table); - // Invoke the lambda function "aggregate_functor" on each composite key vector - // from the distinctify hash table. - source_hash_table.forEachCompositeKeyFast(&aggregate_functor); -} - template -ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( +ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper( const Type &result_type, const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys, @@ -287,14 +186,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( new NativeColumnVector(result_type, hash_table_concrete.numEntries()); HashTableAggregateFinalizer finalizer( handle, group_by_keys, result); - hash_table_concrete.forEachCompositeKeyFast(&finalizer, index); + hash_table_concrete.forEach(&finalizer, index); return result; } else { IndirectColumnVector *result = new IndirectColumnVector( result_type, hash_table_concrete.numEntries()); HashTableAggregateFinalizer finalizer( handle, group_by_keys, result); - hash_table_concrete.forEachCompositeKeyFast(&finalizer, index); + hash_table_concrete.forEach(&finalizer, index); return result; } } else { @@ -303,7 +202,7 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( new NativeColumnVector(result_type, group_by_keys->size()); for (const std::vector &group_by_key : *group_by_keys) { result->appendTypedValue( - finalizeGroupInHashTableFast( + finalizeGroupInHashTable( hash_table, group_by_key, index)); } return result; @@ -312,7 +211,7 @@ ColumnVector* 
AggregationConcreteHandle::finalizeHashTableHelperFast( result_type, hash_table_concrete.numEntries()); for (const std::vector &group_by_key : *group_by_keys) { result->appendTypedValue( - finalizeGroupInHashTableFast( + finalizeGroupInHashTable( hash_table, group_by_key, index)); } return result; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp index 4b51179..e004511 100644 --- a/expressions/aggregation/AggregationHandle.hpp +++ b/expressions/aggregation/AggregationHandle.hpp @@ -32,6 +32,7 @@ namespace quickstep { class ColumnVector; +class ColumnVectorsValueAccessor; class StorageManager; class Type; class ValueAccessor; @@ -116,29 +117,6 @@ class AggregationHandle { virtual AggregationState* createInitialState() const = 0; /** - * @brief Create a new HashTable for aggregation with GROUP BY. - * - * @param hash_table_impl The choice of which concrete HashTable - * implementation to use. - * @param group_by_types The types of the GROUP BY columns/expressions. These - * correspond to the (composite) key type for the HashTable. - * @param estimated_num_groups The estimated number of distinct groups for - * the GROUP BY aggregation. This is used to size the initial - * HashTable. This is an estimate only, and the HashTable will be - * resized if it becomes over-full. - * @param storage_manager The StorageManager to use to create the HashTable. - * A StorageBlob will be allocated to serve as the HashTable's - * in-memory storage. - * @return A new HashTable instance with the appropriate state type for this - * aggregate. 
- **/ - virtual AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const = 0; - - /** * @brief Accumulate over tuples for a nullary aggregate function (one that * has zero arguments, i.e. COUNT(*)). * @@ -153,63 +131,16 @@ class AggregationHandle { const std::size_t num_tuples) const = 0; /** - * @brief Accumulate (iterate over) all values in one or more ColumnVectors - * and return a new AggregationState which can be merged with other - * states or finalized. + * @brief TODO * - * @param column_vectors One or more ColumnVectors that the aggregate will be - * applied to. These correspond to the aggregate function's arguments, - * in order. * @return A new AggregationState which contains the accumulated results from * applying the aggregate to column_vectors. Caller is responsible * for deleting the returned AggregationState. **/ - virtual AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) const = 0; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - /** - * @brief Accumulate (iterate over) all values in columns accessible through - * a ValueAccessor and return a new AggregationState which can be - * merged with other states or finalized. - * - * @param accessor A ValueAccessor that the columns to be aggregated can be - * accessed through. - * @param accessor_ids The attribute_ids that correspond to the columns in - * accessor to aggeregate. These correspond to the aggregate - * function's arguments, in order. - * @return A new AggregationState which contains the accumulated results from - * applying the aggregate to the specified columns in accessor. - * Caller is responsible for deleting the returned AggregationState. 
- **/ - virtual AggregationState* accumulateValueAccessor( + virtual AggregationState* accumulate( ValueAccessor *accessor, - const std::vector &accessor_ids) const = 0; -#endif - - /** - * @brief Perform an aggregation with GROUP BY over all the tuples accessible - * through a ValueAccessor, upserting states in a HashTable. - * - * @note Implementations of this method are threadsafe with respect to - * hash_table, and can be called concurrently from multiple threads - * with the same HashTable object. - * - * @param accessor The ValueAccessor that will be iterated over to read - * tuples. - * @param argument_ids The attribute_ids of the arguments to this aggregate - * in accessor, in order. - * @param group_by_key_ids The attribute_ids of the group-by - * columns/expressions in accessor. - * @param hash_table The HashTable to upsert AggregationStates in. This - * should have been created by calling createGroupByHashTable() on - * this same AggregationHandle. - **/ - virtual void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const = 0; + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const = 0; /** * @brief Merge two AggregationStates, updating one in-place. This computes a @@ -269,99 +200,12 @@ class AggregationHandle { int index) const = 0; /** - * @brief Create a new HashTable for the distinctify step for DISTINCT - * aggregation. - * - * Distinctify is the first step for DISTINCT aggregation. This step inserts - * the GROUP BY expression values and aggregation arguments together as keys - * into the distinctify hash table, so that arguments are distinctified within - * each GROUP BY group. Later, a second-round aggregation on the distinctify - * hash table will be performed to actually compute the aggregated result for - * each GROUP BY group. 
- * - * In the case of single aggregation where there is no GROUP BY expressions, - * we simply treat it as a special GROUP BY case that the GROUP BY expression - * vector is empty. - * - * @param hash_table_impl The choice of which concrete HashTable - * implementation to use. - * @param key_types The types of the GROUP BY expressions together with the - * types of the aggregation arguments. - * @param estimated_num_distinct_keys The estimated number of distinct keys - * (i.e. GROUP BY expressions together with aggregation arguments) for - * the distinctify step. This is used to size the initial HashTable. - * This is an estimate only, and the HashTable will be resized if it - * becomes over-full. - * @param storage_manager The StorageManager to use to create the HashTable. - * A StorageBlob will be allocated to serve as the HashTable's - * in-memory storage. - * - * @return A new HashTable instance with the appropriate state type for this - * aggregate. - */ - virtual AggregationStateHashTableBase* createDistinctifyHashTable( - const HashTableImplType hash_table_impl, - const std::vector &key_types, - const std::size_t estimated_num_distinct_keys, - StorageManager *storage_manager) const = 0; - - /** - * @brief Inserts the GROUP BY expressions and aggregation arguments together - * as keys into the distinctify hash table. - * - * @param accessor The ValueAccessor that will be iterated over to read - * tuples. - * @param key_ids The attribute_ids of the GROUP BY expressions in accessor - * together with the attribute_ids of the arguments to this aggregate - * in accessor, in order. - * @param distinctify_hash_table The HashTable to store the GROUP BY - * expressions and the aggregation arguments together as hash table - * keys and a bool constant \c true as hash table value (So the hash - * table actually serves as a hash set). 
This should have been created - * by calling createDistinctifyHashTable(); - */ - virtual void insertValueAccessorIntoDistinctifyHashTable( - ValueAccessor *accessor, - const std::vector &key_ids, - AggregationStateHashTableBase *distinctify_hash_table) const = 0; - - /** - * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from - * the distinctify hash table to actually compute the aggregated results. - * - * @param distinctify_hash_table Hash table which stores the distinctified - * aggregation arguments as hash table keys. This should have been - * created by calling createDistinctifyHashTable(); - * @return A new AggregationState which contains the aggregated results from - * applying the aggregate to the distinctify hash table. - * Caller is responsible for deleting the returned AggregationState. - */ - virtual AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const = 0; - - /** - * @brief Perform GROUP BY aggregation on the keys from the distinctify hash - * table and upserts states into the aggregation hash table. - * - * @param distinctify_hash_table Hash table which stores the GROUP BY - * expression values and aggregation arguments together as hash table - * keys. - * @param aggregation_hash_table The HashTable to upsert AggregationStates in. - * This should have been created by calling createGroupByHashTable() on - * this same AggregationHandle. - * @param index The index of the distinctify hash table for which we perform - * the DISTINCT aggregation. - */ - virtual void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const = 0; - - /** * @brief Get the number of bytes needed to store the aggregation handle's * state. 
**/ - virtual std::size_t getPayloadSize() const { return 1; } + virtual std::size_t getPayloadSize() const { + return 1u; + } /** * @brief Update the aggregation state for nullary aggregation function e.g. @@ -394,8 +238,8 @@ class AggregationHandle { * @param src A pointer to the source aggregation state. * @param dst A pointer to the destination aggregation state. **/ - virtual void mergeStatesFast(const std::uint8_t *src, - std::uint8_t *dst) const {} + virtual void mergeStates(const std::uint8_t *src, + std::uint8_t *dst) const {} /** * @brief Initialize the payload (in the aggregation hash table) for the given @@ -413,18 +257,6 @@ class AggregationHandle { **/ virtual void destroyPayload(std::uint8_t *byte_ptr) const {} - /** - * @brief Inform the aggregation handle to block (prohibit) updates on the - * aggregation state. - **/ - virtual void blockUpdate() {} - - /** - * @brief Inform the aggregation handle to allow updates on the - * aggregation state. - **/ - virtual void allowUpdate() {} - protected: AggregationHandle() {} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp index 2481092..3d2db8c 100644 --- a/expressions/aggregation/AggregationHandleAvg.cpp +++ b/expressions/aggregation/AggregationHandleAvg.cpp @@ -24,8 +24,7 @@ #include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypeFactory.hpp" @@ -42,7 +41,7 @@ namespace quickstep { class StorageManager; AggregationHandleAvg::AggregationHandleAvg(const Type &type) - : argument_type_(type), block_update_(false) { + : argument_type_(type) { // We 
sum Int as Long and Float as Double so that we have more headroom when // adding many values. TypeID type_precision_id; @@ -87,52 +86,28 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type) ->getNullableVersion()); } -AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} - -AggregationState* AggregationHandleAvg::accumulateColumnVectors( - const std::vector> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size(); +AggregationState* AggregationHandleAvg::accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for AVG: " << argument_ids.size(); - AggregationStateAvg *state = new AggregationStateAvg(blank_state_); - std::size_t count = 0; - state->sum_ = fast_add_operator_->accumulateColumnVector( - state->sum_, *column_vectors.front(), &count); - state->count_ = count; - return state; -} + const attribute_id argument_id = argument_ids.front(); + DCHECK_NE(argument_id, kInvalidAttributeID); -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleAvg::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for AVG: " << accessor_ids.size(); + ValueAccessor *target_accessor = + argument_id >= 0 ? accessor : aux_accessor; + const attribute_id target_argument_id = + argument_id >= 0 ? 
argument_id : -(argument_id+2); AggregationStateAvg *state = new AggregationStateAvg(blank_state_); std::size_t count = 0; state->sum_ = fast_add_operator_->accumulateValueAccessor( - state->sum_, accessor, accessor_ids.front(), &count); + state->sum_, target_accessor, target_argument_id, &count); state->count_ = count; return state; } -#endif - -void AggregationHandleAvg::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for AVG: " << argument_ids.size(); -} void AggregationHandleAvg::mergeStates(const AggregationState &source, AggregationState *destination) const { @@ -147,8 +122,8 @@ void AggregationHandleAvg::mergeStates(const AggregationState &source, avg_destination->sum_, avg_source.sum_); } -void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleAvg::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_sum_ptr = reinterpret_cast(source + blank_state_.sum_offset_); const std::int64_t *src_count_ptr = reinterpret_cast( @@ -179,27 +154,10 @@ ColumnVector* AggregationHandleAvg::finalizeHashTable( const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys, int index) const { - return finalizeHashTableHelperFast( - *result_type_, hash_table, group_by_keys, index); -} - -AggregationState* -AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleAvg, - AggregationStateAvg>(distinctify_hash_table); -} - -void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - 
AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< + return finalizeHashTableHelper< AggregationHandleAvg, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + PackedPayloadSeparateChainingAggregationStateHashTable>( + *result_type_, hash_table, group_by_keys, index); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp index 47132c6..b40305c 100644 --- a/expressions/aggregation/AggregationHandleAvg.hpp +++ b/expressions/aggregation/AggregationHandleAvg.hpp @@ -28,7 +28,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -110,12 +109,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle { return new AggregationStateAvg(blank_state_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - /** * @brief Iterate method with average aggregation state. 
**/ @@ -129,28 +122,19 @@ class AggregationHandleAvg : public AggregationConcreteHandle { ++state->count_; } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); - if (value.isNull()) return; - TypedValue *sum_ptr = - reinterpret_cast(byte_ptr + blank_state_.sum_offset_); - std::int64_t *count_ptr = - reinterpret_cast(byte_ptr + blank_state_.count_offset_); - *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value); - ++(*count_ptr); - } + AggregationState* accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const override; - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + void mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void blockUpdate() override { block_update_ = true; } + TypedValue finalize(const AggregationState &state) const override; - void allowUpdate() override { block_update_ = false; } + std::size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *sum_ptr = @@ -169,43 +153,22 @@ class AggregationHandleAvg : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_id) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - 
AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + DCHECK(argument.isPlausibleInstanceOf(argument_type_.getSignature())); + if (argument.isNull()) return; + TypedValue *sum_ptr = + reinterpret_cast(byte_ptr + blank_state_.sum_offset_); + std::int64_t *count_ptr = + reinterpret_cast(byte_ptr + blank_state_.count_offset_); + *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, argument); + ++(*count_ptr); + } - TypedValue finalize(const AggregationState &state) const override; + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - const AggregationStateAvg &agg_state = - static_cast(state); - // TODO(chasseur): Could improve performance further if we made a special - // version of finalizeHashTable() that collects all the sums into one - // ColumnVector and all the counts into another and then applies - // '*divide_operator_' to them in bulk. - return divide_operator_->applyToTypedValues( - agg_state.sum_, TypedValue(static_cast(agg_state.count_))); - } - - inline TypedValue finalizeHashTableEntryFast( const std::uint8_t *byte_ptr) const { std::uint8_t *value_ptr = const_cast(byte_ptr); TypedValue *sum_ptr = @@ -221,29 +184,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle { std::vector> *group_by_keys, int index) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for AVG aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for AVG aggregation. 
- */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { - return blank_state_.getPayloadSize(); - } - private: friend class AggregateFunctionAvg; @@ -261,8 +201,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle { std::unique_ptr merge_add_operator_; std::unique_ptr divide_operator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp index 034c942..a5c9fd8 100644 --- a/expressions/aggregation/AggregationHandleCount.cpp +++ b/expressions/aggregation/AggregationHandleCount.cpp @@ -25,14 +25,9 @@ #include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorUtil.hpp" -#endif - #include "types/TypeFactory.hpp" #include "types/TypeID.hpp" #include "types/TypedValue.hpp" @@ -48,73 +43,32 @@ class Type; class ValueAccessor; template -AggregationStateHashTableBase* -AggregationHandleCount::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory< - AggregationStateCount>::CreateResizable(hash_table_impl, - group_by_types, - estimated_num_groups, - storage_manager); -} - -template -AggregationState* 
-AggregationHandleCount::accumulateColumnVectors( - const std::vector> &column_vectors) const { +AggregationState* AggregationHandleCount::accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const { DCHECK(!count_star) << "Called non-nullary accumulation method on an AggregationHandleCount " << "set up for nullary COUNT(*)"; - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for COUNT: " - << column_vectors.size(); + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for COUNT: " << argument_ids.size(); - std::size_t count = 0; - InvokeOnColumnVector( - *column_vectors.front(), - [&](const auto &column_vector) -> void { // NOLINT(build/c++11) - if (nullable_type) { - // TODO(shoban): Iterating over the ColumnVector is a rather slow way - // to do this. We should look at extending the ColumnVector interface - // to do a quick count of the non-null values (i.e. the length minus - // the population count of the null bitmap). We should do something - // similar for ValueAccessor too. - for (std::size_t pos = 0; pos < column_vector.size(); ++pos) { - count += !column_vector.getTypedValue(pos).isNull(); - } - } else { - count = column_vector.size(); - } - }); + const attribute_id argument_id = argument_ids.front(); + DCHECK_NE(argument_id, kInvalidAttributeID); - return new AggregationStateCount(count); -} + ValueAccessor *target_accessor = + argument_id >= 0 ? accessor : aux_accessor; + const attribute_id target_argument_id = + argument_id >= 0 ? 
argument_id : -(argument_id+2); -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -template -AggregationState* -AggregationHandleCount::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const { - DCHECK(!count_star) - << "Called non-nullary accumulation method on an AggregationHandleCount " - << "set up for nullary COUNT(*)"; - - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for COUNT: " << accessor_ids.size(); - - const attribute_id accessor_id = accessor_ids.front(); std::size_t count = 0; InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - accessor, - [&accessor_id, &count](auto *accessor) -> void { // NOLINT(build/c++11) + target_accessor, + [&target_argument_id, &count](auto *accessor) -> void { // NOLINT(build/c++11) if (nullable_type) { while (accessor->next()) { - count += !accessor->getTypedValue(accessor_id).isNull(); + count += !accessor->getTypedValue(target_argument_id).isNull(); } } else { count = accessor->getNumTuples(); @@ -123,24 +77,6 @@ AggregationHandleCount::accumulateValueAccessor( return new AggregationStateCount(count); } -#endif - -template -void AggregationHandleCount:: - aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - if (count_star) { - DCHECK_EQ(0u, argument_ids.size()) - << "Got wrong number of arguments for COUNT(*): " - << argument_ids.size(); - } else { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for COUNT: " << argument_ids.size(); - } -} template void AggregationHandleCount::mergeStates( @@ -156,7 +92,7 @@ void AggregationHandleCount::mergeStates( } template -void AggregationHandleCount::mergeStatesFast( +void AggregationHandleCount::mergeStates( const std::uint8_t *source, std::uint8_t *destination) const { const std::int64_t *src_count_ptr = reinterpret_cast(source); @@ -170,33 +106,10 @@ 
AggregationHandleCount::finalizeHashTable( const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys, int index) const { - return finalizeHashTableHelperFast< - AggregationHandleCount, - AggregationStateFastHashTable>( - TypeFactory::GetType(kLong), hash_table, group_by_keys, index); -} - -template -AggregationState* AggregationHandleCount:: - aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const { - DCHECK_EQ(count_star, false); - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleCount, - AggregationStateCount>(distinctify_hash_table); -} - -template -void AggregationHandleCount:: - aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - DCHECK_EQ(count_star, false); - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< + return finalizeHashTableHelper< AggregationHandleCount, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + PackedPayloadSeparateChainingAggregationStateHashTable>( + TypeFactory::GetType(kLong), hash_table, group_by_keys, index); } // Explicitly instantiate and compile in the different versions of http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleCount.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp index 6aab0cd..86da2a8 100644 --- a/expressions/aggregation/AggregationHandleCount.hpp +++ b/expressions/aggregation/AggregationHandleCount.hpp @@ -29,7 +29,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include 
"storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "types/TypedValue.hpp" #include "utility/Macros.hpp" @@ -102,21 +101,10 @@ class AggregationHandleCount : public AggregationConcreteHandle { return new AggregationStateCount(); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - inline void iterateNullaryInl(AggregationStateCount *state) const { state->count_.fetch_add(1, std::memory_order_relaxed); } - inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const { - std::int64_t *count_ptr = reinterpret_cast(byte_ptr); - (*count_ptr)++; - } - /** * @brief Iterate with count aggregation state. */ @@ -127,81 +115,50 @@ class AggregationHandleCount : public AggregationConcreteHandle { } } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - if ((!nullable_type) || (!value.isNull())) { - std::int64_t *count_ptr = reinterpret_cast(byte_ptr); - (*count_ptr)++; - } - } - - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } - - inline void updateStateNullary(std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateNullaryInlFast(byte_ptr); - } - } - - void blockUpdate() override { block_update_ = true; } - - void allowUpdate() override { block_update_ = false; } - - void initPayload(std::uint8_t *byte_ptr) const override { - std::int64_t *count_ptr = reinterpret_cast(byte_ptr); - *count_ptr = 0; - } - AggregationState* accumulateNullary( const std::size_t num_tuples) const override { return new AggregationStateCount(num_tuples); } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - 
AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_id) const override; -#endif - - void aggregateValueAccessorIntoHashTable( + AggregationState* accumulate( ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const override; void mergeStates(const AggregationState &source, AggregationState *destination) const override; - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - TypedValue finalize(const AggregationState &state) const override { return TypedValue( static_cast(state).count_.load( std::memory_order_relaxed)); } - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return TypedValue( - static_cast(state).count_.load( - std::memory_order_relaxed)); + std::size_t getPayloadSize() const override { + return sizeof(std::int64_t); + } + + void initPayload(std::uint8_t *byte_ptr) const override { + std::int64_t *count_ptr = reinterpret_cast(byte_ptr); + *count_ptr = 0; + } + + inline void updateStateNullary(std::uint8_t *byte_ptr) const override { + ++(*reinterpret_cast(byte_ptr)); + } + + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if ((!nullable_type) || (!argument.isNull())) { + ++(*reinterpret_cast(byte_ptr)); + } } - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - const std::int64_t *count_ptr = - reinterpret_cast(byte_ptr); - return TypedValue(*count_ptr); + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; + + inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const { + return TypedValue(*reinterpret_cast(byte_ptr)); } ColumnVector* finalizeHashTable( @@ -209,36 +166,13 @@ class 
AggregationHandleCount : public AggregationConcreteHandle { std::vector> *group_by_keys, int index) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for SUM aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for SUM aggregation. - */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(std::int64_t); } - private: friend class AggregateFunctionCount; /** * @brief Constructor. **/ - AggregationHandleCount() : block_update_(false) {} - - bool block_update_; + AggregationHandleCount() {} DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleDistinct.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp index 0dc8b56..c6c47c7 100644 --- a/expressions/aggregation/AggregationHandleDistinct.cpp +++ b/expressions/aggregation/AggregationHandleDistinct.cpp @@ -22,10 +22,9 @@ #include #include #include -#include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "types/TypedValue.hpp" @@ -34,34 +33,6 @@ namespace quickstep { class ColumnVector; -class StorageManager; -class Type; -class ValueAccessor; - -AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const 
std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return createDistinctifyHashTable( - hash_table_impl, - group_by_types, - estimated_num_groups, - storage_manager); -} - -void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(argument_ids.size(), 0u); - - insertValueAccessorIntoDistinctifyHashTable( - accessor, - group_by_key_ids, - hash_table); -} ColumnVector* AggregationHandleDistinct::finalizeHashTable( const AggregationStateHashTableBase &hash_table, @@ -73,7 +44,8 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable( const bool &dumb_placeholder) -> void { group_by_keys->emplace_back(std::move(group_by_key)); }; - static_cast(hash_table).forEachCompositeKeyFast(&keys_retriever); + static_cast( + hash_table).forEach(&keys_retriever); return nullptr; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleDistinct.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp index 838bfdd..deb928a 100644 --- a/expressions/aggregation/AggregationHandleDistinct.hpp +++ b/expressions/aggregation/AggregationHandleDistinct.hpp @@ -62,21 +62,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle { << "AggregationHandleDistinct does not support accumulateNullary()."; } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - "accumulateColumnVectors()."; - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( + AggregationState* accumulate( 
ValueAccessor *accessor, - const std::vector &accessor_ids) const override { + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const override { LOG(FATAL) << "AggregationHandleDistinct does not support " - "accumulateValueAccessor()."; + "accumulate()."; } -#endif void mergeStates(const AggregationState &source, AggregationState *destination) const override { @@ -87,33 +79,6 @@ class AggregationHandleDistinct : public AggregationConcreteHandle { LOG(FATAL) << "AggregationHandleDistinct does not support finalize()."; } - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - << "aggregateOnDistinctifyHashTableForSingle()."; - } - - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *groupby_hash_table, - std::size_t index) const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - << "aggregateOnDistinctifyHashTableForGroupBy()."; - } - - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp 
index c2d571b..6ffca0a 100644 --- a/expressions/aggregation/AggregationHandleMax.cpp +++ b/expressions/aggregation/AggregationHandleMax.cpp @@ -23,8 +23,7 @@ #include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" @@ -39,51 +38,31 @@ namespace quickstep { class StorageManager; AggregationHandleMax::AggregationHandleMax(const Type &type) - : type_(type), block_update_(false) { + : type_(type) { fast_comparator_.reset( ComparisonFactory::GetComparison(ComparisonID::kGreater) .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); } -AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} - -AggregationState* AggregationHandleMax::accumulateColumnVectors( - const std::vector> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size(); +AggregationState* AggregationHandleMax::accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for MAX: " << argument_ids.size(); - return new AggregationStateMax(fast_comparator_->accumulateColumnVector( - type_.getNullableVersion().makeNullValue(), *column_vectors.front())); -} + const attribute_id argument_id = argument_ids.front(); + DCHECK_NE(argument_id, kInvalidAttributeID); -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* 
AggregationHandleMax::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for MAX: " << accessor_ids.size(); + ValueAccessor *target_accessor = + argument_id >= 0 ? accessor : aux_accessor; + const attribute_id target_argument_id = + argument_id >= 0 ? argument_id : -(argument_id+2); return new AggregationStateMax(fast_comparator_->accumulateValueAccessor( type_.getNullableVersion().makeNullValue(), - accessor, - accessor_ids.front())); -} -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - -void AggregationHandleMax::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for MAX: " << argument_ids.size(); + target_accessor, + target_argument_id)); } void AggregationHandleMax::mergeStates(const AggregationState &source, @@ -98,12 +77,12 @@ void AggregationHandleMax::mergeStates(const AggregationState &source, } } -void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleMax::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_max_ptr = reinterpret_cast(source); TypedValue *dst_max_ptr = reinterpret_cast(destination); if (!(src_max_ptr->isNull())) { - compareAndUpdateFast(dst_max_ptr, *src_max_ptr); + compareAndUpdate(dst_max_ptr, *src_max_ptr); } } @@ -111,27 +90,10 @@ ColumnVector* AggregationHandleMax::finalizeHashTable( const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys, int index) const { - return finalizeHashTableHelperFast( - type_.getNullableVersion(), hash_table, group_by_keys, index); -} - -AggregationState* -AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( - const 
AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleMax, - AggregationStateMax>(distinctify_hash_table); -} - -void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< + return finalizeHashTableHelper< AggregationHandleMax, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + PackedPayloadSeparateChainingAggregationStateHashTable>( + type_.getNullableVersion(), hash_table, group_by_keys, index); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index d851a0c..8eddd7c 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -28,7 +28,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -90,12 +89,6 @@ class AggregationHandleMax : public AggregationConcreteHandle { return new AggregationStateMax(type_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - /** * @brief Iterate with max aggregation state. 
*/ @@ -105,23 +98,17 @@ class AggregationHandleMax : public AggregationConcreteHandle { compareAndUpdate(static_cast(state), value); } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); - TypedValue *max_ptr = reinterpret_cast(byte_ptr); - compareAndUpdateFast(max_ptr, value); - } - - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + AggregationState* accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector &argument_ids) const override; - void blockUpdate() override { block_update_ = true; } + void mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void allowUpdate() override { block_update_ = false; } + std::size_t getPayloadSize() const override { + return sizeof(TypedValue); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *max_ptr = reinterpret_cast(byte_ptr); @@ -136,38 +123,21 @@ class AggregationHandleMax : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector &accessor_ids) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector &argument_ids, - const std::vector &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const 
override { + DCHECK(argument.isPlausibleInstanceOf(type_.getSignature())); + TypedValue *max_ptr = reinterpret_cast(byte_ptr); + compareAndUpdate(max_ptr, argument); + } TypedValue finalize(const AggregationState &state) const override { return TypedValue(static_cast(state).max_); } - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return TypedValue(static_cast(state).max_); - } + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; - inline TypedValue finalizeHashTableEntryFast( + inline TypedValue finalizeHashTableEntry( const std::uint8_t *byte_ptr) const { const TypedValue *max_ptr = reinterpret_cast(byte_ptr); return TypedValue(*max_ptr); @@ -178,27 +148,6 @@ class AggregationHandleMax : public AggregationConcreteHandle { std::vector> *group_by_keys, int index) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for MAX aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for MAX aggregation. 
- */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(TypedValue); } - private: friend class AggregateFunctionMax; @@ -227,8 +176,8 @@ class AggregationHandleMax : public AggregationConcreteHandle { } } - inline void compareAndUpdateFast(TypedValue *max_ptr, - const TypedValue &value) const { + inline void compareAndUpdate(TypedValue *max_ptr, + const TypedValue &value) const { if (value.isNull()) return; if (max_ptr->isNull() || fast_comparator_->compareTypedValues(value, *max_ptr)) { @@ -239,8 +188,6 @@ class AggregationHandleMax : public AggregationConcreteHandle { const Type &type_; std::unique_ptr fast_comparator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index a07f299..f73fedb 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -23,8 +23,7 @@ #include #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" @@ -39,51 +38,31 @@ namespace quickstep { class StorageManager; AggregationHandleMin::AggregationHandleMin(const Type &type) - : type_(type), block_update_(false) { + : type_(type) { fast_comparator_.reset( ComparisonFactory::GetComparison(ComparisonID::kLess) .makeUncheckedComparatorForTypes(type, 
type.getNonNullableVersion()));
 }

-AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
-
-AggregationState* AggregationHandleMin::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size();
+AggregationState* AggregationHandleMin::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for MIN: " << argument_ids.size();

-  return new AggregationStateMin(fast_comparator_->accumulateColumnVector(
-      type_.getNullableVersion().makeNullValue(), *column_vectors.front()));
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);

-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleMin::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for MIN: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ?
argument_id : -(argument_id+2);

   return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      accessor,
-      accessor_ids.front()));
-}
-#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-void AggregationHandleMin::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for MIN: " << argument_ids.size();
+      target_accessor,
+      target_argument_id));
 }

 void AggregationHandleMin::mergeStates(const AggregationState &source,
@@ -98,13 +77,13 @@ void AggregationHandleMin::mergeStates(const AggregationState &source,
   }
 }

-void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleMin::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source);
   TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination);

   if (!(src_min_ptr->isNull())) {
-    compareAndUpdateFast(dst_min_ptr, *src_min_ptr);
+    compareAndUpdate(dst_min_ptr, *src_min_ptr);
   }
 }

@@ -112,27 +91,10 @@ ColumnVector* AggregationHandleMin::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
     int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleMin,
-                                     AggregationStateFastHashTable>(
-      type_.getNonNullableVersion(), hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleMin,
-      AggregationStateMin>(distinctify_hash_table);
-}
-
-void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase
*aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  return finalizeHashTableHelper<
       AggregationHandleMin,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          type_.getNonNullableVersion(), hash_table, group_by_keys, index);
 }

 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index e3472ec..7e129e6 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -28,7 +28,6 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -92,38 +91,27 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     return new AggregationStateMin(type_);
   }

-  AggregationStateHashTableBase* createGroupByHashTable(
-      const HashTableImplType hash_table_impl,
-      const std::vector<const Type *> &group_by_types,
-      const std::size_t estimated_num_groups,
-      StorageManager *storage_manager) const override;
-
-  /**
-   * @brief Iterate with min aggregation state.
-   */
   inline void iterateUnaryInl(AggregationStateMin *state,
                               const TypedValue &value) const {
     DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
     compareAndUpdate(state, value);
   }

-  inline void iterateUnaryInlFast(const TypedValue &value,
-                                  std::uint8_t *byte_ptr) const {
-    DCHECK(value.isPlausibleInstanceOf(type_.getSignature()));
-    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
-    compareAndUpdateFast(min_ptr, value);
-  }
+  AggregationState* accumulate(
+      ValueAccessor *accessor,
+      ColumnVectorsValueAccessor *aux_accessor,
+      const std::vector<attribute_id> &argument_ids) const override;

-  inline void updateStateUnary(const TypedValue &argument,
-                               std::uint8_t *byte_ptr) const override {
-    if (!block_update_) {
-      iterateUnaryInlFast(argument, byte_ptr);
-    }
-  }
+  void mergeStates(const AggregationState &source,
+                   AggregationState *destination) const override;

-  void blockUpdate() override { block_update_ = true; }
+  TypedValue finalize(const AggregationState &state) const override {
+    return static_cast<const AggregationStateMin &>(state).min_;
+  }

-  void allowUpdate() override { block_update_ = false; }
+  std::size_t getPayloadSize() const override {
+    return sizeof(TypedValue);
+  }

   void initPayload(std::uint8_t *byte_ptr) const override {
     TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
@@ -138,41 +126,19 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }

-  AggregationState* accumulateColumnVectors(
-      const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
-      const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-  AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &accessor_ids) const override;
-#endif
-
-  void aggregateValueAccessorIntoHashTable(
-      ValueAccessor *accessor,
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &group_by_key_ids,
-      AggregationStateHashTableBase *hash_table) const override;
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override;
-
-  void mergeStatesFast(const
std::uint8_t *source,
-                       std::uint8_t *destination) const override;
-
-  TypedValue finalize(const AggregationState &state) const override {
-    return static_cast<const AggregationStateMin &>(state).min_;
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
+    DCHECK(argument.isPlausibleInstanceOf(type_.getSignature()));
+    TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr);
+    compareAndUpdate(min_ptr, argument);
   }

-  inline TypedValue finalizeHashTableEntry(
-      const AggregationState &state) const {
-    return static_cast<const AggregationStateMin &>(state).min_;
-  }
+  void mergeStates(const std::uint8_t *source,
+                   std::uint8_t *destination) const override;

-  inline TypedValue finalizeHashTableEntryFast(
+  inline TypedValue finalizeHashTableEntry(
       const std::uint8_t *byte_ptr) const {
-    const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr);
-    return TypedValue(*min_ptr);
+    return *reinterpret_cast<const TypedValue *>(byte_ptr);
   }

   ColumnVector* finalizeHashTable(
@@ -180,27 +146,6 @@ class AggregationHandleMin : public AggregationConcreteHandle {
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const override;

-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
-   *        for MIN aggregation.
-   */
-  AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
-
-  /**
-   * @brief Implementation of
-   *        AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
-   *        for MIN aggregation.
-   */
-  void aggregateOnDistinctifyHashTableForGroupBy(
-      const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
-
-  std::size_t getPayloadSize() const override { return sizeof(TypedValue); }
-
  private:
   friend class AggregateFunctionMin;

@@ -228,8 +173,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     }
   }

-  inline void compareAndUpdateFast(TypedValue *min_ptr,
-                                   const TypedValue &value) const {
+  inline void compareAndUpdate(TypedValue *min_ptr,
+                               const TypedValue &value) const {
     if (value.isNull()) return;
     if (min_ptr->isNull() ||
         fast_comparator_->compareTypedValues(value, *min_ptr)) {
@@ -240,8 +185,6 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   const Type &type_;
   std::unique_ptr<UncheckedComparator> fast_comparator_;

-  bool block_update_;
-
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin);
 };


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/915295de/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..9ddeb99 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -25,8 +25,7 @@
 #include

 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -43,7 +42,7 @@ namespace quickstep {
 class StorageManager;

 AggregationHandleSum::AggregationHandleSum(const Type &type)
-    : argument_type_(type), block_update_(false) {
+    : argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;
@@ -79,47 +78,26 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
   result_type_ = &sum_type.getNullableVersion();
 }

-AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
-    const HashTableImplType hash_table_impl,
-    const std::vector<const Type *> &group_by_types,
-    const std::size_t estimated_num_groups,
-    StorageManager *storage_manager) const {
-  return AggregationStateHashTableFactory::CreateResizable(
-      hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleSum::accumulate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor,
+    const std::vector<attribute_id> &argument_ids) const {
+  DCHECK_EQ(1u, argument_ids.size())
+      << "Got wrong number of attributes for SUM: " << argument_ids.size();

-AggregationState* AggregationHandleSum::accumulateColumnVectors(
-    const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
-  DCHECK_EQ(1u, column_vectors.size())
-      << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
-  std::size_t num_tuples = 0;
-  TypedValue cv_sum = fast_operator_->accumulateColumnVector(
-      blank_state_.sum_, *column_vectors.front(), &num_tuples);
-  return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
-}
+  const attribute_id argument_id = argument_ids.front();
+  DCHECK_NE(argument_id, kInvalidAttributeID);

-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &accessor_ids) const {
-  DCHECK_EQ(1u, accessor_ids.size())
-      << "Got wrong number of attributes for SUM: " << accessor_ids.size();
+  ValueAccessor *target_accessor =
+      argument_id >= 0 ? accessor : aux_accessor;
+  const attribute_id target_argument_id =
+      argument_id >= 0 ?
argument_id : -(argument_id+2);

   std::size_t num_tuples = 0;
   TypedValue va_sum = fast_operator_->accumulateValueAccessor(
-      blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
+      blank_state_.sum_, target_accessor, target_argument_id, &num_tuples);
   return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
 }
-#endif
-
-void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
-    ValueAccessor *accessor,
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &group_by_key_ids,
-    AggregationStateHashTableBase *hash_table) const {
-  DCHECK_EQ(1u, argument_ids.size())
-      << "Got wrong number of arguments for SUM: " << argument_ids.size();
-}

 void AggregationHandleSum::mergeStates(const AggregationState &source,
                                        AggregationState *destination) const {
@@ -134,8 +112,8 @@ void AggregationHandleSum::mergeStates(const AggregationState &source,
   sum_destination->null_ = sum_destination->null_ && sum_source.null_;
 }

-void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
-                                           std::uint8_t *destination) const {
+void AggregationHandleSum::mergeStates(const std::uint8_t *source,
+                                       std::uint8_t *destination) const {
   const TypedValue *src_sum_ptr =
       reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
   const bool *src_null_ptr =
@@ -164,27 +142,10 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
     std::vector<std::vector<TypedValue>> *group_by_keys,
     int index) const {
-  return finalizeHashTableHelperFast<AggregationHandleSum,
-                                     AggregationStateFastHashTable>(
-      *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
-    const AggregationStateHashTableBase &distinctify_hash_table) const {
-  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
-      AggregationHandleSum,
-      AggregationStateSum>(distinctify_hash_table);
-}
-
-void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
-    const AggregationStateHashTableBase &distinctify_hash_table,
-
AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+  return finalizeHashTableHelper<
       AggregationHandleSum,
-      AggregationStateFastHashTable>(
-      distinctify_hash_table, aggregation_hash_table, index);
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          *result_type_, hash_table, group_by_keys, index);
 }

 }  // namespace quickstep
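
Note on the argument_id convention in the new accumulate() methods above: a non-negative id selects an attribute of the base accessor, while a negative id selects column -(argument_id+2) of aux_accessor. The sketch below only illustrates that mapping and is not code from this commit. ResolveArgument, EncodeAuxIndex, Source and ResolvedArgument are hypothetical names, attribute_id is modelled as a plain int, and kInvalidAttributeID is assumed to be -1 (which would explain why auxiliary ids start at -2).

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration; not the actual Quickstep types.
using attribute_id = int;
constexpr attribute_id kInvalidAttributeID = -1;  // assumed sentinel value

enum class Source { kBase, kAux };

struct ResolvedArgument {
  Source source;       // which accessor the value is read from
  attribute_id index;  // attribute index within that accessor
};

// Mirrors the two ternaries in accumulate(): ids >= 0 address the base
// accessor directly; ids <= -2 address the auxiliary accessor at -(id + 2).
inline ResolvedArgument ResolveArgument(const attribute_id argument_id) {
  assert(argument_id != kInvalidAttributeID);
  if (argument_id >= 0) {
    return {Source::kBase, argument_id};
  }
  return {Source::kAux, -(argument_id + 2)};
}

// Inverse mapping for an auxiliary column index (illustrative helper only).
inline attribute_id EncodeAuxIndex(const attribute_id aux_index) {
  assert(aux_index >= 0);
  return -(aux_index + 2);
}
```

Under these assumptions, ResolveArgument(-2) refers to column 0 of the auxiliary accessor and ResolveArgument(3) to attribute 3 of the base accessor, so a single id list can mix both sources without copying base-table columns into temporary ColumnVectors first.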