From: hbdeshmukh@apache.org
To: commits@quickstep.incubator.apache.org
Subject: incubator-quickstep git commit: Single aggregationGroupBy method in StorageBlock. [Forced Update!]
Date: Mon, 19 Sep 2016 15:59:44 +0000 (UTC)

Repository: incubator-quickstep
Updated Branches:
  refs/heads/quickstep-28-29 06f399057 -> 7924d35c6 (forced update)


Single aggregationGroupBy method in StorageBlock.

- New methods for separating unary and nullary updating of states.
- Added TODO to move method from HashTableBase class.
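The commit replaces the per-aggregate nested vector of argument ids with a flat `argument_ids` vector that holds one entry per aggregate. A unary aggregate such as `SUM(a)` gets an attribute id. A nullary one such as `COUNT(*)` gets the new sentinel `kInvalidAttributeID` (-1). The per-row loop in `FastHashTable` then calls `updateStateUnary()` or `updateStateNullary()` directly, instead of filling a temporary vector for the old `updateState()`.

The self-contained C++ sketch below models that flow under simplifying assumptions:
- `HandleSketch`, `CountSketch`, `SumSketch`, `buildArgumentIds()` and `updateRow()` are hypothetical names.
- `std::int64_t` stands in for `TypedValue`.
- NULL handling and `block_update_` are left out.

Like `StorageBlock::aggregateGroupBy()` in the diff, `buildArgumentIds()` pushes one id per argument column. The one-entry-per-aggregate indexing therefore only holds while every aggregate is nullary or unary.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Stand-ins (assumptions) for quickstep's attribute_id typedef and the
// kInvalidAttributeID sentinel added to catalog/CatalogTypedefs.hpp.
using attribute_id = int;
constexpr attribute_id kInvalidAttributeID = -1;

// Read/write a 64-bit aggregation state stored at an arbitrary byte offset.
inline std::int64_t readState(const std::uint8_t *byte_ptr) {
  std::int64_t v;
  std::memcpy(&v, byte_ptr, sizeof(v));
  return v;
}
inline void writeState(std::uint8_t *byte_ptr, std::int64_t v) {
  std::memcpy(byte_ptr, &v, sizeof(v));
}

// Hypothetical, stripped-down handle interface with one hook per arity,
// mirroring updateStateNullary()/updateStateUnary() in the diff.
class HandleSketch {
 public:
  virtual ~HandleSketch() = default;
  virtual void updateStateNullary(std::uint8_t * /*byte_ptr*/) const {}
  virtual void updateStateUnary(std::int64_t /*argument*/,
                                std::uint8_t * /*byte_ptr*/) const {}
};

// COUNT: both COUNT(*) and COUNT(a) increment the counter.
// This sketch has no NULLs, so COUNT(a) counts every row.
class CountSketch : public HandleSketch {
 public:
  void updateStateNullary(std::uint8_t *byte_ptr) const override {
    writeState(byte_ptr, readState(byte_ptr) + 1);
  }
  void updateStateUnary(std::int64_t /*argument*/,
                        std::uint8_t *byte_ptr) const override {
    writeState(byte_ptr, readState(byte_ptr) + 1);
  }
};

// SUM is unary only and keeps the inherited no-op nullary hook.
class SumSketch : public HandleSketch {
 public:
  void updateStateUnary(std::int64_t argument,
                        std::uint8_t *byte_ptr) const override {
    writeState(byte_ptr, readState(byte_ptr) + argument);
  }
};

// Models how StorageBlock::aggregateGroupBy() fills argument_ids. Every
// materialized argument column gets the next attribute id. An aggregate
// with no arguments contributes kInvalidAttributeID.
std::vector<attribute_id> buildArgumentIds(
    const std::vector<std::size_t> &num_args_per_aggregate,
    attribute_id first_arg_id) {
  std::vector<attribute_id> argument_ids;
  attribute_id attr_id = first_arg_id;
  for (std::size_t num_args : num_args_per_aggregate) {
    for (std::size_t i = 0; i < num_args; ++i) {
      argument_ids.push_back(attr_id++);
    }
    if (num_args == 0) {
      argument_ids.push_back(kInvalidAttributeID);
    }
  }
  return argument_ids;
}

// Models the per-row loop in FastHashTable::upsertValueAccessorFast().
// It dispatches on the sentinel instead of building a temporary
// argument vector.
void updateRow(const std::vector<const HandleSketch *> &handles,
               const std::vector<attribute_id> &argument_ids,
               const std::vector<std::size_t> &payload_offsets,
               const std::vector<std::int64_t> &row,
               std::uint8_t *payload) {
  for (std::size_t k = 0; k < handles.size(); ++k) {
    if (argument_ids[k] != kInvalidAttributeID) {
      handles[k]->updateStateUnary(
          row[static_cast<std::size_t>(argument_ids[k])],
          payload + payload_offsets[k]);
    } else {
      handles[k]->updateStateNullary(payload + payload_offsets[k]);
    }
  }
}
```

Consider one GROUP BY key materialized at attribute 0. For `COUNT(*), SUM(a)`, `buildArgumentIds({0, 1}, 1)` returns `{kInvalidAttributeID, 1}`. After `updateRow()` sees the rows `{10, 5}` and `{10, 7}`, the payload holds a count of 2 at offset 0 and a sum of 12 at offset 8.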
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7924d35c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7924d35c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7924d35c

Branch: refs/heads/quickstep-28-29
Commit: 7924d35c67114ace982951d3311e6c0b3dbe4ec0
Parents: 2a9efc4
Author: Harshad Deshmukh
Authored: Mon Sep 12 16:03:01 2016 -0500
Committer: Harshad Deshmukh
Committed: Mon Sep 19 10:59:16 2016 -0500

----------------------------------------------------------------------
 catalog/CatalogTypedefs.hpp                   |  2 +
 .../aggregation/AggregationConcreteHandle.cpp |  7 +-
 expressions/aggregation/AggregationHandle.hpp | 32 ++++++-
 .../aggregation/AggregationHandleAvg.hpp      |  6 +-
 .../aggregation/AggregationHandleCount.hpp    | 15 ++--
 .../aggregation/AggregationHandleMax.hpp      |  6 +-
 .../aggregation/AggregationHandleMin.hpp      |  6 +-
 .../aggregation/AggregationHandleSum.hpp      |  6 +-
 query_optimizer/ExecutionGenerator.cpp        | 20 ++---
 .../tests/AggregationOperator_unittest.cpp    |  3 +-
 storage/AggregationOperationState.cpp         | 64 +++++++-------
 storage/AggregationOperationState.hpp         |  4 +-
 storage/FastHashTable.hpp                     | 60 +++++++------
 storage/FastHashTableFactory.hpp              | 46 ----------
 storage/HashTableBase.hpp                     | 19 +++-
 storage/StorageBlock.cpp                      | 91 ++------------------
 storage/StorageBlock.hpp                      | 25 ++----
 17 files changed, 156 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/catalog/CatalogTypedefs.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogTypedefs.hpp b/catalog/CatalogTypedefs.hpp
index f7a2d53..70bac84 100644
--- a/catalog/CatalogTypedefs.hpp
+++ b/catalog/CatalogTypedefs.hpp
@@ -49,6 +49,8 @@ constexpr int kInvalidCatalogId = -1;
 // Used to indicate no preference for a NUMA Node ID.
 constexpr numa_node_id kAnyNUMANodeID = -1;

+constexpr attribute_id kInvalidAttributeID = -1;
+
 /** @} */

 }  // namespace quickstep


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index ae677d9..e3fb520 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -56,13 +56,10 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
   AggregationStateFastHashTable *hash_table =
       static_cast(distinctify_hash_table);
   if (key_ids.size() == 1) {
-    std::vector> args;
-    args.emplace_back(key_ids);
     hash_table->upsertValueAccessorFast(
-        args, accessor, key_ids[0], true /* check_for_null_keys */);
+        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
   } else {
-    std::vector> empty_args;
-    empty_args.resize(1);
+    std::vector empty_args {kInvalidAttributeID};
     hash_table->upsertValueAccessorCompositeKeyFast(
         empty_args, accessor, key_ids, true /* check_for_null_keys */);
   }


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index d2cee6d..4435760 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -130,7 +130,7 @@ class AggregationHandle {
    *        A StorageBlob will be allocated to serve as the HashTable's
    *        in-memory storage.
    * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate as the ValueT.
+   *         aggregate.
    **/
   virtual AggregationStateHashTableBase* createGroupByHashTable(
       const HashTableImplType hash_table_impl,
@@ -297,7 +297,7 @@ class AggregationHandle {
    *        in-memory
    *        storage.
    * @return A new HashTable instance with the appropriate state type for this
-   *         aggregate as the ValueT.
+   *         aggregate.
    */
   virtual AggregationStateHashTableBase* createDistinctifyHashTable(
       const HashTableImplType hash_table_impl,
@@ -357,12 +357,36 @@ class AggregationHandle {
       std::size_t index) const = 0;

   virtual std::size_t getPayloadSize() const { return 1; }
-  virtual void updateState(const std::vector &arguments,
-                           std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Update the aggregation state for nullary aggregation function e.g.
+   *        COUNT(*).
+   *
+   * @note This function should be overloaded by those aggregation function
+   *       which can perform nullary operations, e.g. COUNT.
+   *
+   * @param byte_ptr The pointer where the aggregation state is stored.
+   **/
+  virtual void updateStateNullary(std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Update the aggregation state for unary aggregation function e.g.
+   *        SUM(a).
+   *
+   * @param argument The argument which will be used to update the state of the
+   *        aggregation function.
+   * @param byte_ptr The pointer where the aggregation state is stored.
+   **/
+  virtual void updateStateUnary(const TypedValue &argument,
+                                std::uint8_t *byte_ptr) const {}
+
   virtual void mergeStatesFast(const std::uint8_t *src,
                                std::uint8_t *dst) const {}
+
   virtual void initPayload(std::uint8_t *byte_ptr) const {}
+
   virtual void blockUpdate() {}
+
   virtual void allowUpdate() {}

  protected:


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index 3e49213..366ba8e 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -141,10 +141,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
     ++(*count_ptr);
   }

-  inline void updateState(const std::vector &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 2c6d717..9b97590 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -135,13 +135,16 @@ class AggregationHandleCount : public AggregationConcreteHandle {
     }
   }

-  inline void updateState(const std::vector &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      if (arguments.size())
-        iterateUnaryInlFast(arguments.front(), byte_ptr);
-      else
-        iterateNullaryInlFast(byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
+    }
+  }
+
+  inline void updateStateNullary(std::uint8_t *byte_ptr) const override {
+    if (!block_update_) {
+      iterateNullaryInlFast(byte_ptr);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index de173c9..6c54b9d 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -112,10 +112,10 @@ class AggregationHandleMax : public AggregationConcreteHandle {
     compareAndUpdateFast(max_ptr, value);
   }

-  inline void updateState(const std::vector &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 4a0eca4..9baf736 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -114,10 +114,10 @@ class AggregationHandleMin : public AggregationConcreteHandle {
     compareAndUpdateFast(min_ptr, value);
   }

-  inline void updateState(const std::vector &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 8d719ab..18d45d9 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -133,10 +133,10 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     *null_ptr = false;
   }

-  inline void updateState(const std::vector &arguments,
-                          std::uint8_t *byte_ptr) const override {
+  inline void updateStateUnary(const TypedValue &argument,
+                               std::uint8_t *byte_ptr) const override {
     if (!block_update_) {
-      iterateUnaryInlFast(arguments.front(), byte_ptr);
+      iterateUnaryInlFast(argument, byte_ptr);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 130134c..968314e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1371,13 +1371,9 @@ void ExecutionGenerator::convertAggregate(
   }

   if (!group_by_types.empty()) {
-    // SimplifyHashTableImplTypeProto() switches the hash table implementation
-    // from SeparateChaining to SimpleScalarSeparateChaining when there is a
-    // single scalar key type with a reversible hash function.
+    // Right now, only SeparateChaining is supported.
     aggr_state_proto->set_hash_table_impl_type(
-        SimplifyHashTableImplTypeProto(
-            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-            group_by_types));
+        serialization::HashTableImplType::SEPARATE_CHAINING);
   }

   for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1404,15 +1400,9 @@ void ExecutionGenerator::convertAggregate(
     if (unnamed_aggregate_expression->is_distinct()) {
       const std::vector &arguments = unnamed_aggregate_expression->getArguments();
       DCHECK_GE(arguments.size(), 1u);
-      if (group_by_types.empty() && arguments.size() == 1) {
-        aggr_state_proto->add_distinctify_hash_table_impl_types(
-            SimplifyHashTableImplTypeProto(
-                HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
-                {&arguments[0]->getValueType()}));
-      } else {
-        aggr_state_proto->add_distinctify_hash_table_impl_types(
-            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type));
-      }
+      // Right now only SeparateChaining implementation is supported.
+      aggr_state_proto->add_distinctify_hash_table_impl_types(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 0138362..6881dea 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -363,8 +363,9 @@ class AggregationOperatorTest : public ::testing::Test {
     aggr_state_proto->set_estimated_num_entries(estimated_entries);

     // Also need to set the HashTable implementation for GROUP BY.
+    // Right now, only SeparateChaining is supported.
     aggr_state_proto->set_hash_table_impl_type(
-        serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
+        serialization::HashTableImplType::SEPARATE_CHAINING);

     // Create Operators.
     op_.reset(new AggregationOperator(0, *table_, true, aggr_state_index));


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c5f59f9..073b813 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -94,13 +94,12 @@ AggregationOperationState::AggregationOperationState(
     handles_.emplace_back(new AggregationHandleDistinct());
     arguments_.push_back({});
     is_distinct_.emplace_back(false);
-    group_by_hashtable_pools_.emplace_back(
-        std::unique_ptr(new HashTablePool(estimated_num_entries,
-                                          hash_table_impl_type,
-                                          group_by_types,
-                                          {1},
-                                          handles_,
-                                          storage_manager)));
+    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                     hash_table_impl_type,
+                                                     group_by_types,
+                                                     {1},
+                                                     handles_,
+                                                     storage_manager));
   } else {
     // Set up each individual aggregate in this operation.
     std::vector::const_iterator agg_func_it =
@@ -196,13 +195,12 @@ AggregationOperationState::AggregationOperationState(
     if (!group_by_handles.empty()) {
       // Aggregation with GROUP BY: create a HashTable pool for per-group
       // states.
-      group_by_hashtable_pools_.emplace_back(std::unique_ptr(
-          new HashTablePool(estimated_num_entries,
-                            hash_table_impl_type,
-                            group_by_types,
-                            payload_sizes,
-                            group_by_handles,
-                            storage_manager)));
+      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                       hash_table_impl_type,
+                                                       group_by_types,
+                                                       payload_sizes,
+                                                       group_by_handles,
+                                                       storage_manager));
     }
   }
 }
@@ -444,17 +442,17 @@ void AggregationOperationState::aggregateBlockHashTable(
   // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
   // directly into the (threadsafe) shared global HashTable for this
   // aggregate.
-  DCHECK(group_by_hashtable_pools_[0] != nullptr);
+  DCHECK(group_by_hashtable_pool_ != nullptr);
   AggregationStateHashTableBase *agg_hash_table =
-      group_by_hashtable_pools_[0]->getHashTableFast();
+      group_by_hashtable_pool_->getHashTableFast();
   DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupByFast(arguments_,
-                              group_by_list_,
-                              predicate_.get(),
-                              agg_hash_table,
-                              &reuse_matches,
-                              &reuse_group_by_vectors);
-  group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
+  block->aggregateGroupBy(arguments_,
+                          group_by_list_,
+                          predicate_.get(),
+                          agg_hash_table,
+                          &reuse_matches,
+                          &reuse_group_by_vectors);
+  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }

 void AggregationOperationState::finalizeSingleState(
@@ -497,7 +495,7 @@ void AggregationOperationState::finalizeHashTable(

   // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
   // e.g. Keep merging entries from smaller hash tables to larger.
-  auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
   if (hash_tables->size() > 1) {
     for (int hash_table_index = 0;
          hash_table_index < static_cast(hash_tables->size() - 1);
@@ -512,17 +510,17 @@ void AggregationOperationState::finalizeHashTable(
   std::vector> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     if (is_distinct_[agg_idx]) {
-      DCHECK(group_by_hashtable_pools_[0] != nullptr);
-      auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+      DCHECK(group_by_hashtable_pool_ != nullptr);
+      auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
       DCHECK(hash_tables != nullptr);
       if (hash_tables->empty()) {
         // We may have a case where hash_tables is empty, e.g. no input blocks.
         // However for aggregateOnDistinctifyHashTableForGroupBy to work
         // correctly, we should create an empty group by hash table.
         AggregationStateHashTableBase *new_hash_table =
-            group_by_hashtable_pools_[0]->getHashTableFast();
-        group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
-        hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+            group_by_hashtable_pool_->getHashTableFast();
+        group_by_hashtable_pool_->returnHashTable(new_hash_table);
+        hash_tables = group_by_hashtable_pool_->getAllHashTables();
       }
       DCHECK(hash_tables->back() != nullptr);
       AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
@@ -532,16 +530,16 @@ void AggregationOperationState::finalizeHashTable(
           *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
     }

-    auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+    auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
     DCHECK(hash_tables != nullptr);
     if (hash_tables->empty()) {
       // We may have a case where hash_tables is empty, e.g. no input blocks.
       // However for aggregateOnDistinctifyHashTableForGroupBy to work
       // correctly, we should create an empty group by hash table.
       AggregationStateHashTableBase *new_hash_table =
-          group_by_hashtable_pools_[0]->getHashTable();
-      group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
-      hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
+          group_by_hashtable_pool_->getHashTable();
+      group_by_hashtable_pool_->returnHashTable(new_hash_table);
+      hash_tables = group_by_hashtable_pool_->getAllHashTables();
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 7956bc6..cbbfc22 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -221,8 +221,8 @@ class AggregationOperationState {
   std::vector> group_by_hashtables_;

-  // A vector of group by hash table pools, one for each group by clause.
-  std::vector> group_by_hashtable_pools_;
+  // A vector of group by hash table pools.
+  std::unique_ptr group_by_hashtable_pool_;

   StorageManager *storage_manager_;



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/FastHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp
index f1e8d1a..4a95cd9 100644
--- a/storage/FastHashTable.hpp
+++ b/storage/FastHashTable.hpp
@@ -456,7 +456,7 @@ class FastHashTable : public HashTableBase> &argument_ids,
+      const std::vector &argument_ids,
       ValueAccessor *accessor,
       const attribute_id key_attr_id,
       const bool check_for_null_keys);
@@ -509,7 +509,7 @@ class FastHashTable : public HashTableBase> &argument,
+      const std::vector &argument,
       ValueAccessor *accessor,
       const std::vector &key_attr_ids,
       const bool check_for_null_keys) override;
@@ -1866,13 +1866,12 @@ bool FastHashTable::
     upsertValueAccessorFast(
-        const std::vector> &argument_ids,
+        const std::vector &argument_ids,
         ValueAccessor *accessor,
         const attribute_id key_attr_id,
         const bool check_for_null_keys) {
   DEBUG_ASSERT(!allow_duplicate_keys);
   std::size_t variable_size;
-  std::vector local;
   return InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
@@ -1898,13 +1897,14 @@ bool FastHashTable(value)));
               for (unsigned int k = 0; k < num_handles_; ++k) {
-                local.clear();
-                if (argument_ids[k].size()) {
-                  local.emplace_back(
-                      accessor->getTypedValue(argument_ids[k].front()));
+                if (argument_ids[k] != kInvalidAttributeID) {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(argument_ids[k]),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateNullary(value +
+                                                  payload_offsets_[k]);
                 }
-                handles_[k]->updateState(local,
-                                         value + payload_offsets_[k]);
               }
             }
           }
@@ -1929,12 +1929,14 @@ bool FastHashTable(value)));
               for (unsigned int k = 0; k < num_handles_; ++k) {
-                local.clear();
-                if (argument_ids[k].size()) {
-                  local.emplace_back(
-                      accessor->getTypedValue(argument_ids[k].front()));
+                if (argument_ids[k] != kInvalidAttributeID) {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(argument_ids[k]),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateNullary(value +
+                                                  payload_offsets_[k]);
                 }
-                handles_[k]->updateState(local, value + payload_offsets_[k]);
               }
             }
           }
@@ -1953,7 +1955,7 @@ bool FastHashTable::
     upsertValueAccessorCompositeKeyFast(
-        const std::vector> &argument_ids,
+        const std::vector &argument_ids,
         ValueAccessor *accessor,
         const std::vector &key_attr_ids,
         const bool check_for_null_keys) {
@@ -1961,7 +1963,6 @@ bool FastHashTable key_vector;
   key_vector.resize(key_attr_ids.size());
-  std::vector local;
   return InvokeOnAnyValueAccessor(
       accessor,
       [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
@@ -1989,13 +1990,14 @@ bool FastHashTable(value)));
               for (unsigned int k = 0; k < num_handles_; ++k) {
-                local.clear();
-                if (argument_ids[k].size()) {
-                  local.emplace_back(
-                      accessor->getTypedValue(argument_ids[k].front()));
+                if (argument_ids[k] != kInvalidAttributeID) {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(argument_ids[k]),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateNullary(value +
+                                                  payload_offsets_[k]);
                 }
-                handles_[k]->updateState(local,
-                                         value + payload_offsets_[k]);
               }
             }
           }
@@ -2022,12 +2024,14 @@ bool FastHashTable(value)));
               for (unsigned int k = 0; k < num_handles_; ++k) {
-                local.clear();
-                if (argument_ids[k].size()) {
-                  local.emplace_back(
-                      accessor->getTypedValue(argument_ids[k].front()));
+                if (argument_ids[k] != kInvalidAttributeID) {
+                  handles_[k]->updateStateUnary(
+                      accessor->getTypedValue(argument_ids[k]),
+                      value + payload_offsets_[k]);
+                } else {
+                  handles_[k]->updateStateNullary(value +
+                                                  payload_offsets_[k]);
                 }
-                handles_[k]->updateState(local, value + payload_offsets_[k]);
               }
             }
           }


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/FastHashTableFactory.hpp
----------------------------------------------------------------------
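The factory hunks below remove the `kLinearOpenAddressing` and `kSimpleScalarSeparateChaining` cases. Before the change, the two `switch` statements handled them differently:
- In the first `switch`, those cases silently built a `FastSeparateChainingHashTable`.
- In the second, they held only commented-out bodies and already fell through to `default`.

After the change, requesting either removed type reaches the `LOG(FATAL)` default in both places. This is consistent with `ExecutionGenerator` and `AggregationOperator_unittest` now requesting `SEPARATE_CHAINING` explicitly.

The C++ sketch below models the resulting dispatch, with these assumptions:
- The enum is simplified.
- The `kSeparateChaining` enumerator name is assumed, because that case is not visible in the diff.
- `createResizableSketch()` and `SeparateChainingTableSketch` are hypothetical.
- An exception stands in for `LOG(FATAL)` so the behavior can be checked without aborting.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>

// Simplified mirror of the HashTableImplType values named in the diff
// (kSeparateChaining is an assumed name).
enum class HashTableImplType {
  kLinearOpenAddressing,
  kSeparateChaining,
  kSimpleScalarSeparateChaining
};

// Placeholder for FastSeparateChainingHashTable.
struct SeparateChainingTableSketch {};

// Sketch of the factory dispatch after this commit. Only separate chaining
// has a case, so the removed types fall through to the default branch.
// The real factory calls LOG(FATAL) there; this sketch throws instead.
std::unique_ptr<SeparateChainingTableSketch> createResizableSketch(
    HashTableImplType type) {
  switch (type) {
    case HashTableImplType::kSeparateChaining:
      return std::make_unique<SeparateChainingTableSketch>();
    default:
      throw std::invalid_argument("Unrecognized HashTableImplType");
  }
}
```

Requesting `kLinearOpenAddressing` from the sketch throws. This is presumably why the unit test switched away from `LINEAR_OPEN_ADDRESSING` in the same commit.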
diff --git a/storage/FastHashTableFactory.hpp b/storage/FastHashTableFactory.hpp
index 6ad3212..dc4f893 100644
--- a/storage/FastHashTableFactory.hpp
+++ b/storage/FastHashTableFactory.hpp
@@ -90,30 +90,6 @@ class FastHashTableFactory {
             serializable,
             force_key_copy,
             allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
-      case HashTableImplType::kLinearOpenAddressing:
-/*        return new LinearOpenAddressingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, storage_manager);*/
-        return new FastSeparateChainingHashTable<
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
-      case HashTableImplType::kSimpleScalarSeparateChaining:
-        return new FastSeparateChainingHashTable<
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, payload_sizes, handles, storage_manager);
-/*        return new SimpleScalarSeparateChainingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types, num_entries, storage_manager);*/
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in HashTableFactory::createResizable()\n";
       }
@@ -167,28 +143,6 @@ class FastHashTableFactory {
             hash_table_memory_size,
             new_hash_table,
             hash_table_memory_zeroed);
-      case HashTableImplType::kLinearOpenAddressing:
-/*        return new LinearOpenAddressingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types,
-                                  hash_table_memory,
-                                  hash_table_memory_size,
-                                  new_hash_table,
-                                  hash_table_memory_zeroed);*/
-      case HashTableImplType::kSimpleScalarSeparateChaining:
-/*        return new SimpleScalarSeparateChainingHashTable<
-            ValueT,
-            resizable,
-            serializable,
-            force_key_copy,
-            allow_duplicate_keys>(key_types,
-                                  hash_table_memory,
-                                  hash_table_memory_size,
-                                  new_hash_table,
-                                  hash_table_memory_zeroed);*/
       default: {
         LOG(FATAL) <<
             "Unrecognized HashTableImplType\n";
       }


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index b908d6f..cd0a141 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -74,8 +74,25 @@ class HashTableBase {
  public:
   virtual ~HashTableBase() {}

+  /**
+   * TODO(harshad) We should get rid of this function from here. We are
+   * postponing it because of the amount of work to be done is significant.
+   * The steps are as follows:
+   * 1. Replace AggregationStateHashTableBase occurence in HashTablePool to
+   * the FastHashTable implementation (i.e. an implementation specialized for
+   * aggregation).
+   * 2. Remove createGroupByHashTable from the AggregationHandle* classes.
+   * 3. Replace AggregationStateHashTableBase occurences in AggregationHandle*
+   * clases to the FastHashTable implementation (i.e. an implementation
+   * specialized for aggregation).
+   * 4. Move this method to the FastHashTable class from here, so that it can
+   * be called from the AggregationHandle* classes.
+   *
+   * Optionally, we can also remove the AggregationStateHashTableBase
+   * specialization from this file.
+   **/
   virtual bool upsertValueAccessorCompositeKeyFast(
-      const std::vector> &argument,
+      const std::vector &argument,
       ValueAccessor *accessor,
       const std::vector &key_attr_ids,
       const bool check_for_null_keys) {


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index 8ff18b5..ec5990f 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -415,87 +415,6 @@ AggregationState* StorageBlock::aggregate(
 }

 void StorageBlock::aggregateGroupBy(
-    const AggregationHandle &handle,
-    const std::vector> &arguments,
-    const std::vector> &group_by,
-    const Predicate *predicate,
-    AggregationStateHashTableBase *hash_table,
-    std::unique_ptr *reuse_matches,
-    std::vector> *reuse_group_by_vectors) const {
-  DCHECK_GT(group_by.size(), 0u)
-      << "Called aggregateGroupBy() with zero GROUP BY expressions";
-
-  SubBlocksReference sub_blocks_ref(*tuple_store_,
-                                    indices_,
-                                    indices_consistent_);
-
-  // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-  std::vector argument_ids;
-
-  // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
-  std::vector key_ids;
-
-  // An intermediate ValueAccessor that stores the materialized 'arguments' for
-  // this aggregate, as well as the GROUP BY expression values.
-  ColumnVectorsValueAccessor temp_result;
-  {
-    std::unique_ptr accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
-      // Create a filtered ValueAccessor that only iterates over predicate
-      // matches.
-      accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
-    } else {
-      // Create a ValueAccessor that iterates over all tuples in this block
-      accessor.reset(tuple_store_->createValueAccessor());
-    }
-
-    attribute_id attr_id = 0;
-
-    // First, put GROUP BY keys into 'temp_result'.
-    if (reuse_group_by_vectors->empty()) {
-      // Compute GROUP BY values from group_by Scalars, and store them in
-      // reuse_group_by_vectors for reuse by other aggregates on this same
-      // block.
-      reuse_group_by_vectors->reserve(group_by.size());
-      for (const std::unique_ptr &group_by_element : group_by) {
-        reuse_group_by_vectors->emplace_back(
-            group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
-        temp_result.addColumn(reuse_group_by_vectors->back().get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    } else {
-      // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
-      DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
-          << "Wrong number of reuse_group_by_vectors";
-      for (const std::unique_ptr &reuse_cv : *reuse_group_by_vectors) {
-        temp_result.addColumn(reuse_cv.get(), false);
-        key_ids.push_back(attr_id++);
-      }
-    }
-
-    // Compute argument vectors and add them to 'temp_result'.
-    for (const std::unique_ptr &argument : arguments) {
-      temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref));
-      argument_ids.push_back(attr_id++);
-    }
-  }
-
-  // Actually do aggregation into '*hash_table'.
-  handle.aggregateValueAccessorIntoHashTable(&temp_result,
-                                             argument_ids,
-                                             key_ids,
-                                             hash_table);
-}
-
-
-void StorageBlock::aggregateGroupByFast(
     const std::vector>> &arguments,
     const std::vector> &group_by,
     const Predicate *predicate,
@@ -510,8 +429,7 @@ void StorageBlock::aggregateGroupByFast(
                                     indices_consistent_);

   // IDs of 'arguments' as attributes in the ValueAccessor we create below.
-  std::vector arg_ids;
-  std::vector> argument_ids;
+  std::vector argument_ids;

   // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
   std::vector key_ids;
@@ -563,12 +481,13 @@ void StorageBlock::aggregateGroupByFast(

     // Compute argument vectors and add them to 'temp_result'.
     for (const std::vector> &argument : arguments) {
-      arg_ids.clear();
       for (const std::unique_ptr &args : argument) {
         temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
-        arg_ids.push_back(attr_id++);
+        argument_ids.push_back(attr_id++);
+      }
+      if (argument.empty()) {
+        argument_ids.push_back(kInvalidAttributeID);
       }
-      argument_ids.push_back(arg_ids);
     }
   }



http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7924d35c/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 8b59a3c..398008e 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -459,23 +459,14 @@ class StorageBlock : public StorageBlockBase {
    *        attributes as std::vector (like in selectSimple()) for fast
    *        path when there are no expressions specified in the query.
    */
-  void aggregateGroupBy(const AggregationHandle &handle,
-                        const std::vector> &arguments,
-                        const std::vector> &group_by,
-                        const Predicate *predicate,
-                        AggregationStateHashTableBase *hash_table,
-                        std::unique_ptr *reuse_matches,
-                        std::vector>
-                            *reuse_group_by_vectors) const;
-
-
-  void aggregateGroupByFast(const std::vector>> &arguments,
-                            const std::vector> &group_by,
-                            const Predicate *predicate,
-                            AggregationStateHashTableBase *hash_table,
-                            std::unique_ptr *reuse_matches,
-                            std::vector>
-                                *reuse_group_by_vectors) const;
+  void aggregateGroupBy(
+      const std::vector>> &arguments,
+      const std::vector> &group_by,
+      const Predicate *predicate,
+      AggregationStateHashTableBase *hash_table,
+      std::unique_ptr *reuse_matches,
+      std::vector> *reuse_group_by_vectors) const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.