From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Mon, 30 May 2016 22:47:37 -0000
In-Reply-To: <5666ec376cd64f1b8c203563a569a75a@git.apache.org>
References: <5666ec376cd64f1b8c203563a569a75a@git.apache.org>
Subject: [29/32] incubator-quickstep git commit: Groupby hashtable pool (#236)
archived-at: Mon, 30 May 2016 22:47:27 -0000

Groupby hashtable pool (#236)

- Created a HashTablePool class for the group by clause.
- Each thread can check out its own hash table while doing group by aggregation.
- AggregationOperationState uses one hash table pool per group by clause.
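The checkout idea in the commit message can be illustrated with a minimal sketch. This is not the HashTablePool class added in storage/HashTablePool.hpp; the names SimpleTablePool, GroupTable, checkOut, giveBack, idleCount and aggregateOnce are hypothetical, and a plain std::unordered_map stands in for Quickstep's aggregation hash table. The only point shown is that a worker takes exclusive ownership of a table while it aggregates, so the table itself needs no locking, and the pool's mutex is held only briefly at checkout and return.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for one GROUP BY hash table: key -> row count.
using GroupTable = std::unordered_map<int, long>;

// Minimal checkout/return pool (illustrative only, not Quickstep's API).
class SimpleTablePool {
 public:
  // Hand out an idle table if one exists, otherwise create a new one.
  std::unique_ptr<GroupTable> checkOut() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (idle_.empty()) {
      return std::unique_ptr<GroupTable>(new GroupTable());
    }
    std::unique_ptr<GroupTable> table = std::move(idle_.back());
    idle_.pop_back();
    return table;
  }

  // Return a table, with its contents intact, so a later caller can reuse it.
  void giveBack(std::unique_ptr<GroupTable> table) {
    std::lock_guard<std::mutex> lock(mutex_);
    idle_.push_back(std::move(table));
  }

  // Number of tables currently sitting idle in the pool.
  std::size_t idleCount() {
    std::lock_guard<std::mutex> lock(mutex_);
    return idle_.size();
  }

 private:
  std::mutex mutex_;
  std::vector<std::unique_ptr<GroupTable>> idle_;
};

// A worker counts rows per key in a privately owned table, then returns it.
// The return value is the number of distinct groups in that table.
inline std::size_t aggregateOnce(SimpleTablePool *pool,
                                 const std::vector<int> &keys) {
  std::unique_ptr<GroupTable> table = pool->checkOut();
  for (const int key : keys) {
    ++(*table)[key];
  }
  const std::size_t groups = table->size();
  pool->giveBack(std::move(table));
  return groups;
}
```

Because each table holds only partial results for the rows its worker saw, the tables have to be merged before the final answer is produced, which is what the mergeGroupByHashTables method introduced in this commit is for.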
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2ddb67bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2ddb67bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2ddb67bf

Branch: refs/heads/master
Commit: 2ddb67bf438878b572e997caec2397e4c7ac9b8f
Parents: 5bda90e
Author: Harshad Deshmukh
Authored: Tue May 24 19:04:39 2016 -0500
Committer: Zuyu Zhang
Committed: Mon May 30 15:47:53 2016 -0700

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.hpp  | 105 ++++++++++++
 expressions/aggregation/AggregationHandle.hpp  |  15 +-
 .../aggregation/AggregationHandleAvg.cpp       |   9 +
 .../aggregation/AggregationHandleAvg.hpp       |   4 +
 .../aggregation/AggregationHandleCount.cpp     |  11 ++
 .../aggregation/AggregationHandleCount.hpp     |   4 +
 .../aggregation/AggregationHandleDistinct.hpp  |   7 +
 .../aggregation/AggregationHandleMax.cpp       |   9 +
 .../aggregation/AggregationHandleMax.hpp       |   4 +
 .../aggregation/AggregationHandleMin.cpp       |   9 +
 .../aggregation/AggregationHandleMin.hpp       |   4 +
 .../aggregation/AggregationHandleSum.cpp       |   9 +
 .../aggregation/AggregationHandleSum.hpp       |   4 +
 expressions/aggregation/CMakeLists.txt         |   2 +
 .../tests/AggregationHandleAvg_unittest.cpp    | 109 ++++++++++++
 .../tests/AggregationHandleCount_unittest.cpp  | 126 +++++++++++++-
 .../tests/AggregationHandleMax_unittest.cpp    | 122 ++++++++++++++
 .../tests/AggregationHandleMin_unittest.cpp    | 121 ++++++++++++++
 .../tests/AggregationHandleSum_unittest.cpp    | 124 ++++++++++++++
 query_execution/QueryContext.hpp               |   2 +-
 storage/AggregationOperationState.cpp          |  84 ++++++++--
 storage/AggregationOperationState.hpp          |   4 +
 storage/CMakeLists.txt                         |  10 ++
 storage/HashTablePool.hpp                      | 166 +++++++++++++++++++
 storage/StorageManager.cpp                     |   3 +-
 25 files changed, 1045 insertions(+), 22 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index 52249f7..0267e17 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -44,6 +44,90 @@ class ValueAccessor; * @{ */ +/** + * @brief An upserter class for modifying the destination hash table while + * merging two group by hash tables. + **/ +template +class HashTableStateUpserter { + public: + /** + * @brief Constructor. + * + * @param handle The aggregation handle being used. + * @param source_state The aggregation state in the source aggregation hash + * table. The corresponding state (for the same key) in the destination + * hash table will be upserted. + **/ + HashTableStateUpserter(const HandleT &handle, const StateT &source_state) + : handle_(handle), source_state_(source_state) {} + + /** + * @brief The operator for the functor required for the upsert. + * + * @param destination_state The aggregation state in the aggregation hash + * table that is being upserted. + **/ + void operator()(StateT *destination_state) { + handle_.mergeStates(source_state_, destination_state); + } + + private: + const HandleT &handle_; + const StateT &source_state_; + + DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter); +}; + +/** + * @brief A class to support the functor for merging group by hash tables. + **/ +template +class HashTableMerger { + public: + /** + * @brief Constructor + * + * @param handle The Aggregation handle being used. + * @param destination_hash_table The destination hash table to which other + * hash tables will be merged. 
+ **/ + HashTableMerger(const HandleT &handle, + AggregationStateHashTableBase *destination_hash_table) + : handle_(handle), + destination_hash_table_( + static_cast(destination_hash_table)) {} + + /** + * @brief The operator for the functor. + * + * @param group_by_key The group by key being merged. + * @param source_state The aggregation state for the given key in the source + * aggregation hash table. + **/ + inline void operator()(const std::vector &group_by_key, + const StateT &source_state) { + const StateT *original_state = + destination_hash_table_->getSingleCompositeKey(group_by_key); + if (original_state != nullptr) { + HashTableStateUpserter upserter( + handle_, source_state); + // The CHECK is required as upsertCompositeKey can return false if the + // hash table runs out of space during the upsert process. The ideal + // solution will be to retry again if the upsert fails. + CHECK(destination_hash_table_->upsertCompositeKey( + group_by_key, *original_state, &upserter)); + } else { + destination_hash_table_->putCompositeKey(group_by_key, source_state); + } + } + + private: + const HandleT &handle_; + HashTableT *destination_hash_table_; + + DISALLOW_COPY_AND_ASSIGN(HashTableMerger); +}; /** * @brief The helper intermediate subclass of AggregationHandle that provides @@ -140,6 +224,11 @@ class AggregationConcreteHandle : public AggregationHandle { return static_cast(this)->finalizeHashTableEntry(*group_state); } + template + void mergeGroupByHashTablesHelper( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const; + private: DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle); }; @@ -373,6 +462,22 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper( } } +template +void AggregationConcreteHandle::mergeGroupByHashTablesHelper( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + const HandleT &handle = 
static_cast(*this); + const HashTableT &source_hash_table_concrete = + static_cast(source_hash_table); + + HashTableMerger merger(handle, + destination_hash_table); + + source_hash_table_concrete.forEachCompositeKey(&merger); +} + } // namespace quickstep #endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp index 625f334..cdebb03 100644 --- a/expressions/aggregation/AggregationHandle.hpp +++ b/expressions/aggregation/AggregationHandle.hpp @@ -276,7 +276,7 @@ class AggregationHandle { * each GROUP BY group. Later, a second-round aggregation on the distinctify * hash table will be performed to actually compute the aggregated result for * each GROUP BY group. - * + * * In the case of single aggregation where there is no GROUP BY expressions, * we simply treat it as a special GROUP BY case that the GROUP BY expression * vector is empty. @@ -349,6 +349,19 @@ class AggregationHandle { const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table) const = 0; + /** + * @brief Merge two GROUP BY hash tables in one. + * + * @note Both the hash tables should have the same structure. + * + * @param source_hash_table The hash table which will get merged. + * @param destination_hash_table The hash table to which we will merge the + * other hash table. 
+ **/ + virtual void mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const = 0; + protected: AggregationHandle() { } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp index cb0d63d..42a2fb9 100644 --- a/expressions/aggregation/AggregationHandleAvg.cpp +++ b/expressions/aggregation/AggregationHandleAvg.cpp @@ -203,4 +203,13 @@ void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy( aggregation_hash_table); } +void AggregationHandleAvg::mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + mergeGroupByHashTablesHelper>( + source_hash_table, destination_hash_table); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp index 6a94ee6..4ad4b21 100644 --- a/expressions/aggregation/AggregationHandleAvg.hpp +++ b/expressions/aggregation/AggregationHandleAvg.hpp @@ -158,6 +158,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle { const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table) const override; + void mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const override; + private: friend class AggregateFunctionAvg; 
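The merge rule that HashTableMerger applies (merge states when the key already exists in the destination, otherwise insert the source state) can be sketched for AVG as follows. This is an illustration under stated assumptions, not Quickstep code: AvgState, AvgTable, mergeAvgStates, mergeTables and finalizeAvg are hypothetical names, the state is assumed to be a running sum plus a row count, and std::unordered_map replaces the aggregation hash table.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical AVG state: a running sum plus the number of rows seen.
struct AvgState {
  std::int64_t sum;
  std::int64_t count;
};

// Fold one partial state into another (the role mergeStates plays in the
// upserter shown earlier in this diff).
inline void mergeAvgStates(const AvgState &source, AvgState *destination) {
  destination->sum += source.sum;
  destination->count += source.count;
}

// Group key -> partial AVG state.
using AvgTable = std::unordered_map<std::int64_t, AvgState>;

// For every group in the source table: merge into the matching destination
// group if there is one, otherwise copy the source state into the
// destination. The source table is left unchanged.
inline void mergeTables(const AvgTable &source, AvgTable *destination) {
  for (const auto &entry : source) {
    auto it = destination->find(entry.first);
    if (it != destination->end()) {
      mergeAvgStates(entry.second, &it->second);
    } else {
      destination->emplace(entry.first, entry.second);
    }
  }
}

// Turn a partial state into the final average.
inline double finalizeAvg(const AvgState &state) {
  return static_cast<double>(state.sum) / static_cast<double>(state.count);
}
```

With a source table holding key 0 with value 355 and key 1 with value 1, and a destination table holding key 0 with value 295 and key 2 with value 1, the destination ends up with three groups and an average of (355 + 295) / 2 = 325 for key 0. These are the same values the GroupByTableMergeTestAvg unit test in this commit uses.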
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp index 5ece8ba..964b7c2 100644 --- a/expressions/aggregation/AggregationHandleCount.cpp +++ b/expressions/aggregation/AggregationHandleCount.cpp @@ -206,6 +206,17 @@ void AggregationHandleCount aggregation_hash_table); } +template +void AggregationHandleCount::mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + mergeGroupByHashTablesHelper< + AggregationHandleCount, + AggregationStateCount, + AggregationStateHashTable>(source_hash_table, + destination_hash_table); +} + // Explicitly instantiate and compile in the different versions of // AggregationHandleCount we need. Note that we do not compile a version with // 'count_star == true' and 'nullable_type == true', as that combination is http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleCount.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp index 6bb4e65..50138b9 100644 --- a/expressions/aggregation/AggregationHandleCount.hpp +++ b/expressions/aggregation/AggregationHandleCount.hpp @@ -166,6 +166,10 @@ class AggregationHandleCount : public AggregationConcreteHandle { const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table) const override; + void mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const override; + private: friend class AggregateFunctionCount; 
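Each concrete handle's mergeGroupByHashTables override forwards to the templated mergeGroupByHashTablesHelper, naming its own handle, state and hash table types. The sketch below shows that dispatch pattern in isolation; it is a simplified assumption-laden illustration, and TableBase, Table, Handle, ConcreteHandle, SumHandle, merge and mergeHelper are hypothetical names rather than Quickstep's classes. A "table" here holds a single state so that the casts stay the focus.

```cpp
#include <cassert>

// Type-erased table interface, as seen by the virtual API.
struct TableBase {
  virtual ~TableBase() {}
};

// Concrete table holding a single aggregation state of type StateT.
template <typename StateT>
struct Table : TableBase {
  StateT total{};
};

// Abstract handle: callers only see base-class table references.
class Handle {
 public:
  virtual ~Handle() {}
  virtual void merge(const TableBase &source, TableBase *destination) const = 0;
};

// Intermediate class providing the shared, templated merge logic.
class ConcreteHandle : public Handle {
 protected:
  template <typename HandleT, typename TableT>
  void mergeHelper(const TableBase &source, TableBase *destination) const {
    // The caller guarantees the dynamic types, so static_cast is sufficient.
    const HandleT &handle = static_cast<const HandleT &>(*this);
    const TableT &concrete_source = static_cast<const TableT &>(source);
    TableT *concrete_destination = static_cast<TableT *>(destination);
    handle.mergeStates(concrete_source.total, &concrete_destination->total);
  }
};

// A concrete handle supplies only its types and its state-merging rule.
class SumHandle : public ConcreteHandle {
 public:
  void merge(const TableBase &source, TableBase *destination) const override {
    mergeHelper<SumHandle, Table<long>>(source, destination);
  }

  void mergeStates(const long &source, long *destination) const {
    *destination += source;
  }
};
```

The virtual call is paid once per table merge, while the per-state mergeStates call inside the helper is resolved at compile time. The casts are unchecked, so passing a table of the wrong concrete type would be undefined behavior in this sketch.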
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleDistinct.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp index 918fdf8..6342c2b 100644 --- a/expressions/aggregation/AggregationHandleDistinct.hpp +++ b/expressions/aggregation/AggregationHandleDistinct.hpp @@ -109,6 +109,13 @@ class AggregationHandleDistinct : public AggregationConcreteHandle { const AggregationStateHashTableBase &hash_table, std::vector> *group_by_keys) const override; + void mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const override { + LOG(FATAL) + << "AggregationHandleDistinct does not support mergeGroupByHashTables"; + } + private: DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp index 4703657..a7a4a52 100644 --- a/expressions/aggregation/AggregationHandleMax.cpp +++ b/expressions/aggregation/AggregationHandleMax.cpp @@ -139,4 +139,13 @@ void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( aggregation_hash_table); } +void AggregationHandleMax::mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + mergeGroupByHashTablesHelper>( + source_hash_table, destination_hash_table); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMax.hpp 
---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index 8932ef8..5af5a12 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -151,6 +151,10 @@ class AggregationHandleMax : public AggregationConcreteHandle { const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table) const override; + void mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const override; + private: friend class AggregateFunctionMax; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index de2709a..ca9b163 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -141,4 +141,13 @@ void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( aggregation_hash_table); } +void AggregationHandleMin::mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + mergeGroupByHashTablesHelper>( + source_hash_table, destination_hash_table); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp index 4e4c05d..f68bb9d 100644 --- a/expressions/aggregation/AggregationHandleMin.hpp +++ 
b/expressions/aggregation/AggregationHandleMin.hpp @@ -149,6 +149,10 @@ class AggregationHandleMin : public AggregationConcreteHandle { const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table) const override; + void mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const override; + private: friend class AggregateFunctionMin; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 14421d2..691ff39 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -190,4 +190,13 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( aggregation_hash_table); } +void AggregationHandleSum::mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + mergeGroupByHashTablesHelper>( + source_hash_table, destination_hash_table); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index b765243..fdc0884 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -148,6 +148,10 @@ class AggregationHandleSum : public AggregationConcreteHandle { const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table) const override; + void 
mergeGroupByHashTables( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const override; + private: friend class AggregateFunctionSum; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index 26cec7f..416c4c6 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -291,6 +291,8 @@ target_link_libraries(AggregationHandle_tests quickstep_expressions_aggregation_AggregationHandleMin quickstep_expressions_aggregation_AggregationHandleSum quickstep_expressions_aggregation_AggregationID + quickstep_storage_HashTableBase + quickstep_storage_StorageManager quickstep_types_CharType quickstep_types_DateOperatorOverloads quickstep_types_DatetimeIntervalType http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp index d27b54e..fd82cba 100644 --- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp @@ -26,6 +26,7 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleAvg.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include "storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DateOperatorOverloads.hpp" #include "types/DatetimeIntervalType.hpp" @@ -238,6 +239,7 @@ class AggregationHandleAvgTest : public::testing::Test { std::unique_ptr aggregation_handle_avg_; std::unique_ptr 
aggregation_handle_avg_state_; + std::unique_ptr storage_manager_; }; const int AggregationHandleAvgTest::kNumSamples; @@ -417,4 +419,111 @@ TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); } +TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { + const Type &long_non_null_type = LongType::Instance(false); + initializeHandle(long_non_null_type); + storage_manager_.reset(new StorageManager("./test_avg_data")); + std::unique_ptr source_hash_table( + aggregation_handle_avg_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &long_non_null_type), + 10, + storage_manager_.get())); + std::unique_ptr destination_hash_table( + aggregation_handle_avg_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &long_non_null_type), + 10, + storage_manager_.get())); + + AggregationStateHashTable *destination_hash_table_derived = + static_cast *>( + destination_hash_table.get()); + + AggregationStateHashTable *source_hash_table_derived = + static_cast *>( + source_hash_table.get()); + + AggregationHandleAvg *aggregation_handle_avg_derived = + static_cast(aggregation_handle_avg_.get()); + // We create three keys: first is present in both the hash tables, second key + // is present only in the source hash table while the third key is present + // the destination hash table only. 
+ std::vector common_key; + common_key.emplace_back(static_cast(0)); + std::vector exclusive_source_key, exclusive_destination_key; + exclusive_source_key.emplace_back(static_cast(1)); + exclusive_destination_key.emplace_back(static_cast(2)); + + const std::int64_t common_key_source_avg = 355; + TypedValue common_key_source_avg_val(common_key_source_avg); + + const std::int64_t common_key_destination_avg = 295; + TypedValue common_key_destination_avg_val(common_key_destination_avg); + + const std::int64_t exclusive_key_source_avg = 1; + TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg); + + const std::int64_t exclusive_key_destination_avg = 1; + TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg); + + std::unique_ptr common_key_source_state( + static_cast( + aggregation_handle_avg_->createInitialState())); + std::unique_ptr common_key_destination_state( + static_cast( + aggregation_handle_avg_->createInitialState())); + std::unique_ptr exclusive_key_source_state( + static_cast( + aggregation_handle_avg_->createInitialState())); + std::unique_ptr exclusive_key_destination_state( + static_cast( + aggregation_handle_avg_->createInitialState())); + + // Create avg value states for keys. + aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(), + common_key_source_avg_val); + + aggregation_handle_avg_derived->iterateUnaryInl( + common_key_destination_state.get(), common_key_destination_avg_val); + + aggregation_handle_avg_derived->iterateUnaryInl( + exclusive_key_destination_state.get(), exclusive_key_destination_avg_val); + + aggregation_handle_avg_derived->iterateUnaryInl( + exclusive_key_source_state.get(), exclusive_key_source_avg_val); + + // Add the key-state pairs to the hash tables. 
+ source_hash_table_derived->putCompositeKey(common_key, + *common_key_source_state); + destination_hash_table_derived->putCompositeKey( + common_key, *common_key_destination_state); + source_hash_table_derived->putCompositeKey(exclusive_source_key, + *exclusive_key_source_state); + destination_hash_table_derived->putCompositeKey( + exclusive_destination_key, *exclusive_key_destination_state); + + EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); + EXPECT_EQ(2u, source_hash_table_derived->numEntries()); + + aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table, + destination_hash_table.get()); + + EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); + + CheckAvgValue( + (common_key_destination_avg_val.getLiteral() + + common_key_source_avg_val.getLiteral()) / static_cast(2), + *aggregation_handle_avg_derived, + *(destination_hash_table_derived->getSingleCompositeKey(common_key))); + CheckAvgValue(exclusive_key_destination_avg_val.getLiteral(), + *aggregation_handle_avg_derived, + *(destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key))); + CheckAvgValue(exclusive_key_source_avg_val.getLiteral(), + *aggregation_handle_avg_derived, + *(source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key))); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp index 7bebf6a..bf02523 100644 --- a/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleCount_unittest.cpp @@ -27,6 +27,7 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleCount.hpp" #include 
"expressions/aggregation/AggregationID.hpp" +#include "storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DoubleType.hpp" #include "types/FloatType.hpp" @@ -355,6 +356,7 @@ class AggregationHandleCountTest : public::testing::Test { std::unique_ptr aggregation_handle_count_; std::unique_ptr aggregation_handle_count_state_; + std::unique_ptr storage_manager_; }; typedef AggregationHandleCountTest AggregationHandleCountDeathTest; @@ -477,5 +479,127 @@ TEST_F(AggregationHandleCountTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kLong)); } -} // namespace quickstep +TEST_F(AggregationHandleCountTest, GroupByTableMergeTestCount) { + const Type &long_non_null_type = LongType::Instance(false); + initializeHandle(&long_non_null_type); + storage_manager_.reset(new StorageManager("./test_count_data")); + std::unique_ptr source_hash_table( + aggregation_handle_count_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &long_non_null_type), + 10, + storage_manager_.get())); + std::unique_ptr destination_hash_table( + aggregation_handle_count_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &long_non_null_type), + 10, + storage_manager_.get())); + + AggregationStateHashTable *destination_hash_table_derived = + static_cast *>( + destination_hash_table.get()); + + AggregationStateHashTable *source_hash_table_derived = + static_cast *>( + source_hash_table.get()); + + // TODO(harshad) - Use TemplateUtil::CreateBoolInstantiatedInstance to + // generate all the combinations of the bool template arguments and test them. + AggregationHandleCount *aggregation_handle_count_derived = + static_cast *>( + aggregation_handle_count_.get()); + // We create three keys: first is present in both the hash tables, second key + // is present only in the source hash table while the third key is present + // the destination hash table only. 
+ std::vector common_key; + common_key.emplace_back(static_cast(0)); + std::vector exclusive_source_key, exclusive_destination_key; + exclusive_source_key.emplace_back(static_cast(1)); + exclusive_destination_key.emplace_back(static_cast(2)); + + const std::int64_t common_key_source_count = 1; + TypedValue common_key_source_count_val(common_key_source_count); + + const std::int64_t common_key_destination_count = 1; + TypedValue common_key_destination_count_val(common_key_destination_count); + + const std::int64_t exclusive_key_source_count = 1; + TypedValue exclusive_key_source_count_val(exclusive_key_source_count); + + const std::int64_t exclusive_key_destination_count = 1; + TypedValue exclusive_key_destination_count_val(exclusive_key_destination_count); + + std::unique_ptr common_key_source_state( + static_cast( + aggregation_handle_count_->createInitialState())); + std::unique_ptr common_key_destination_state( + static_cast( + aggregation_handle_count_->createInitialState())); + std::unique_ptr exclusive_key_source_state( + static_cast( + aggregation_handle_count_->createInitialState())); + std::unique_ptr exclusive_key_destination_state( + static_cast( + aggregation_handle_count_->createInitialState())); + + // Create count value states for keys. 
+ aggregation_handle_count_derived->iterateUnaryInl(common_key_source_state.get(), + common_key_source_count_val); + std::int64_t actual_val = aggregation_handle_count_->finalize(*common_key_source_state) + .getLiteral(); + EXPECT_EQ(common_key_source_count_val.getLiteral(), actual_val); + + aggregation_handle_count_derived->iterateUnaryInl( + common_key_destination_state.get(), common_key_destination_count_val); + actual_val = aggregation_handle_count_->finalize(*common_key_destination_state) + .getLiteral(); + EXPECT_EQ(common_key_destination_count_val.getLiteral(), actual_val); + + aggregation_handle_count_derived->iterateUnaryInl( + exclusive_key_destination_state.get(), exclusive_key_destination_count_val); + actual_val = + aggregation_handle_count_->finalize(*exclusive_key_destination_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_destination_count_val.getLiteral(), actual_val); + + aggregation_handle_count_derived->iterateUnaryInl( + exclusive_key_source_state.get(), exclusive_key_source_count_val); + actual_val = aggregation_handle_count_->finalize(*exclusive_key_source_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_source_count_val.getLiteral(), actual_val); + + // Add the key-state pairs to the hash tables. 
+ source_hash_table_derived->putCompositeKey(common_key, + *common_key_source_state); + destination_hash_table_derived->putCompositeKey( + common_key, *common_key_destination_state); + source_hash_table_derived->putCompositeKey(exclusive_source_key, + *exclusive_key_source_state); + destination_hash_table_derived->putCompositeKey( + exclusive_destination_key, *exclusive_key_destination_state); + + EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); + EXPECT_EQ(2u, source_hash_table_derived->numEntries()); + + aggregation_handle_count_->mergeGroupByHashTables(*source_hash_table, + destination_hash_table.get()); + + EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); + + CheckCountValue( + common_key_destination_count_val.getLiteral() + + common_key_source_count_val.getLiteral(), + *aggregation_handle_count_derived, + *(destination_hash_table_derived->getSingleCompositeKey(common_key))); + CheckCountValue(exclusive_key_destination_count_val.getLiteral(), + *aggregation_handle_count_derived, + *(destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key))); + CheckCountValue(exclusive_key_source_count_val.getLiteral(), + *aggregation_handle_count_derived, + *(source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key))); +} +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp index 027f24b..fc25e91 100644 --- a/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleMax_unittest.cpp @@ -29,6 +29,8 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleMax.hpp" #include 
"expressions/aggregation/AggregationID.hpp" +#include "storage/HashTableBase.hpp" +#include "storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" #include "types/DatetimeLit.hpp" @@ -413,6 +415,7 @@ class AggregationHandleMaxTest : public ::testing::Test { std::unique_ptr aggregation_handle_max_; std::unique_ptr aggregation_handle_max_state_; + std::unique_ptr storage_manager_; }; template <> @@ -637,4 +640,123 @@ TEST_F(AggregationHandleMaxTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble)); } +TEST_F(AggregationHandleMaxTest, GroupByTableMergeTest) { + const Type &int_non_null_type = IntType::Instance(false); + initializeHandle(int_non_null_type); + storage_manager_.reset(new StorageManager("./test_max_data")); + std::unique_ptr source_hash_table( + aggregation_handle_max_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &int_non_null_type), + 10, + storage_manager_.get())); + std::unique_ptr destination_hash_table( + aggregation_handle_max_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &int_non_null_type), + 10, + storage_manager_.get())); + + AggregationStateHashTable *destination_hash_table_derived = + static_cast *>( + destination_hash_table.get()); + + AggregationStateHashTable *source_hash_table_derived = + static_cast *>( + source_hash_table.get()); + + AggregationHandleMax *aggregation_handle_max_derived = + static_cast(aggregation_handle_max_.get()); + // We create three keys: first is present in both the hash tables, second key + // is present only in the source hash table while the third key is present + // the destination hash table only. 
+ std::vector common_key; + common_key.emplace_back(0); + std::vector exclusive_source_key, exclusive_destination_key; + exclusive_source_key.emplace_back(1); + exclusive_destination_key.emplace_back(2); + + const int common_key_source_max = 3000; + TypedValue common_key_source_max_val(common_key_source_max); + + const int common_key_destination_max = 4000; + TypedValue common_key_destination_max_val(common_key_destination_max); + + const int exclusive_key_source_max = 100; + TypedValue exclusive_key_source_max_val(exclusive_key_source_max); + + const int exclusive_key_destination_max = 200; + TypedValue exclusive_key_destination_max_val(exclusive_key_destination_max); + + std::unique_ptr common_key_source_state( + static_cast( + aggregation_handle_max_->createInitialState())); + std::unique_ptr common_key_destination_state( + static_cast( + aggregation_handle_max_->createInitialState())); + std::unique_ptr exclusive_key_source_state( + static_cast( + aggregation_handle_max_->createInitialState())); + std::unique_ptr exclusive_key_destination_state( + static_cast( + aggregation_handle_max_->createInitialState())); + + // Create max value states for keys. 
+ aggregation_handle_max_derived->iterateUnaryInl(common_key_source_state.get(), + common_key_source_max_val); + int actual_val = aggregation_handle_max_->finalize(*common_key_source_state) + .getLiteral(); + EXPECT_EQ(common_key_source_max_val.getLiteral(), actual_val); + + aggregation_handle_max_derived->iterateUnaryInl( + common_key_destination_state.get(), common_key_destination_max_val); + actual_val = aggregation_handle_max_->finalize(*common_key_destination_state) + .getLiteral(); + EXPECT_EQ(common_key_destination_max_val.getLiteral(), actual_val); + + aggregation_handle_max_derived->iterateUnaryInl( + exclusive_key_destination_state.get(), exclusive_key_destination_max_val); + actual_val = + aggregation_handle_max_->finalize(*exclusive_key_destination_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_destination_max_val.getLiteral(), actual_val); + + aggregation_handle_max_derived->iterateUnaryInl( + exclusive_key_source_state.get(), exclusive_key_source_max_val); + actual_val = aggregation_handle_max_->finalize(*exclusive_key_source_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_source_max_val.getLiteral(), actual_val); + + // Add the key-state pairs to the hash tables. 
+ source_hash_table_derived->putCompositeKey(common_key, + *common_key_source_state); + destination_hash_table_derived->putCompositeKey( + common_key, *common_key_destination_state); + source_hash_table_derived->putCompositeKey(exclusive_source_key, + *exclusive_key_source_state); + destination_hash_table_derived->putCompositeKey( + exclusive_destination_key, *exclusive_key_destination_state); + + EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); + EXPECT_EQ(2u, source_hash_table_derived->numEntries()); + + aggregation_handle_max_->mergeGroupByHashTables(*source_hash_table, + destination_hash_table.get()); + + EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); + + CheckMaxValue( + common_key_destination_max_val.getLiteral(), + *aggregation_handle_max_derived, + *(destination_hash_table_derived->getSingleCompositeKey(common_key))); + CheckMaxValue(exclusive_key_destination_max_val.getLiteral(), + *aggregation_handle_max_derived, + *(destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key))); + CheckMaxValue(exclusive_key_source_max_val.getLiteral(), + *aggregation_handle_max_derived, + *(source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key))); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp index eb64472..a87ace9 100644 --- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp @@ -29,6 +29,7 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleMin.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include 
"storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" #include "types/DatetimeLit.hpp" @@ -411,6 +412,7 @@ class AggregationHandleMinTest : public ::testing::Test { std::unique_ptr aggregation_handle_min_; std::unique_ptr aggregation_handle_min_state_; + std::unique_ptr storage_manager_; }; template <> @@ -634,4 +636,123 @@ TEST_F(AggregationHandleMinTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble)); } +TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) { + const Type &int_non_null_type = IntType::Instance(false); + initializeHandle(int_non_null_type); + storage_manager_.reset(new StorageManager("./test_min_data")); + std::unique_ptr source_hash_table( + aggregation_handle_min_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &int_non_null_type), + 10, + storage_manager_.get())); + std::unique_ptr destination_hash_table( + aggregation_handle_min_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &int_non_null_type), + 10, + storage_manager_.get())); + + AggregationStateHashTable *destination_hash_table_derived = + static_cast *>( + destination_hash_table.get()); + + AggregationStateHashTable *source_hash_table_derived = + static_cast *>( + source_hash_table.get()); + + AggregationHandleMin *aggregation_handle_min_derived = + static_cast(aggregation_handle_min_.get()); + // We create three keys: first is present in both the hash tables, second key + // is present only in the source hash table while the third key is present + // the destination hash table only. 
+ std::vector common_key; + common_key.emplace_back(0); + std::vector exclusive_source_key, exclusive_destination_key; + exclusive_source_key.emplace_back(1); + exclusive_destination_key.emplace_back(2); + + const int common_key_source_min = 3000; + TypedValue common_key_source_min_val(common_key_source_min); + + const int common_key_destination_min = 4000; + TypedValue common_key_destination_min_val(common_key_destination_min); + + const int exclusive_key_source_min = 100; + TypedValue exclusive_key_source_min_val(exclusive_key_source_min); + + const int exclusive_key_destination_min = 200; + TypedValue exclusive_key_destination_min_val(exclusive_key_destination_min); + + std::unique_ptr common_key_source_state( + static_cast( + aggregation_handle_min_->createInitialState())); + std::unique_ptr common_key_destination_state( + static_cast( + aggregation_handle_min_->createInitialState())); + std::unique_ptr exclusive_key_source_state( + static_cast( + aggregation_handle_min_->createInitialState())); + std::unique_ptr exclusive_key_destination_state( + static_cast( + aggregation_handle_min_->createInitialState())); + + // Create min value states for keys. 
+ aggregation_handle_min_derived->iterateUnaryInl(common_key_source_state.get(), + common_key_source_min_val); + int actual_val = aggregation_handle_min_->finalize(*common_key_source_state) + .getLiteral(); + EXPECT_EQ(common_key_source_min_val.getLiteral(), actual_val); + + aggregation_handle_min_derived->iterateUnaryInl( + common_key_destination_state.get(), common_key_destination_min_val); + actual_val = aggregation_handle_min_->finalize(*common_key_destination_state) + .getLiteral(); + EXPECT_EQ(common_key_destination_min_val.getLiteral(), actual_val); + + aggregation_handle_min_derived->iterateUnaryInl( + exclusive_key_destination_state.get(), exclusive_key_destination_min_val); + actual_val = + aggregation_handle_min_->finalize(*exclusive_key_destination_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_destination_min_val.getLiteral(), actual_val); + + aggregation_handle_min_derived->iterateUnaryInl( + exclusive_key_source_state.get(), exclusive_key_source_min_val); + actual_val = aggregation_handle_min_->finalize(*exclusive_key_source_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_source_min_val.getLiteral(), actual_val); + + // Add the key-state pairs to the hash tables. 
+ source_hash_table_derived->putCompositeKey(common_key, + *common_key_source_state); + destination_hash_table_derived->putCompositeKey( + common_key, *common_key_destination_state); + source_hash_table_derived->putCompositeKey(exclusive_source_key, + *exclusive_key_source_state); + destination_hash_table_derived->putCompositeKey( + exclusive_destination_key, *exclusive_key_destination_state); + + EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); + EXPECT_EQ(2u, source_hash_table_derived->numEntries()); + + aggregation_handle_min_->mergeGroupByHashTables(*source_hash_table, + destination_hash_table.get()); + + EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); + + CheckMinValue( + common_key_source_min_val.getLiteral(), + *aggregation_handle_min_derived, + *(destination_hash_table_derived->getSingleCompositeKey(common_key))); + CheckMinValue(exclusive_key_destination_min_val.getLiteral(), + *aggregation_handle_min_derived, + *(destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key))); + CheckMinValue(exclusive_key_source_min_val.getLiteral(), + *aggregation_handle_min_derived, + *(source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key))); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp index 7dbbeb3..abf8a89 100644 --- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp @@ -26,6 +26,7 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleSum.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include 
"storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" #include "types/DoubleType.hpp" @@ -237,6 +238,7 @@ class AggregationHandleSumTest : public::testing::Test { std::unique_ptr aggregation_handle_sum_; std::unique_ptr aggregation_handle_sum_state_; + std::unique_ptr storage_manager_; }; const int AggregationHandleSumTest::kNumSamples; @@ -425,4 +427,126 @@ TEST_F(AggregationHandleSumTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); } +TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { + const Type &long_non_null_type = LongType::Instance(false); + initializeHandle(long_non_null_type); + storage_manager_.reset(new StorageManager("./test_sum_data")); + std::unique_ptr source_hash_table( + aggregation_handle_sum_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &long_non_null_type), + 10, + storage_manager_.get())); + std::unique_ptr destination_hash_table( + aggregation_handle_sum_->createGroupByHashTable( + HashTableImplType::kSimpleScalarSeparateChaining, + std::vector(1, &long_non_null_type), + 10, + storage_manager_.get())); + + AggregationStateHashTable *destination_hash_table_derived = + static_cast *>( + destination_hash_table.get()); + + AggregationStateHashTable *source_hash_table_derived = + static_cast *>( + source_hash_table.get()); + + AggregationHandleSum *aggregation_handle_sum_derived = + static_cast(aggregation_handle_sum_.get()); + // We create three keys: first is present in both the hash tables, second key + // is present only in the source hash table while the third key is present + // the destination hash table only. 
+ std::vector common_key; + common_key.emplace_back(static_cast(0)); + std::vector exclusive_source_key, exclusive_destination_key; + exclusive_source_key.emplace_back(static_cast(1)); + exclusive_destination_key.emplace_back(static_cast(2)); + + const std::int64_t common_key_source_sum = 3000; + TypedValue common_key_source_sum_val(common_key_source_sum); + + const std::int64_t common_key_destination_sum = 4000; + TypedValue common_key_destination_sum_val(common_key_destination_sum); + + const std::int64_t merged_common_key = common_key_source_sum + common_key_destination_sum; + TypedValue common_key_merged_val(merged_common_key); + + const std::int64_t exclusive_key_source_sum = 100; + TypedValue exclusive_key_source_sum_val(exclusive_key_source_sum); + + const std::int64_t exclusive_key_destination_sum = 200; + TypedValue exclusive_key_destination_sum_val(exclusive_key_destination_sum); + + std::unique_ptr common_key_source_state( + static_cast( + aggregation_handle_sum_->createInitialState())); + std::unique_ptr common_key_destination_state( + static_cast( + aggregation_handle_sum_->createInitialState())); + std::unique_ptr exclusive_key_source_state( + static_cast( + aggregation_handle_sum_->createInitialState())); + std::unique_ptr exclusive_key_destination_state( + static_cast( + aggregation_handle_sum_->createInitialState())); + + // Create sum value states for keys. 
+ aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(), + common_key_source_sum_val); + std::int64_t actual_val = aggregation_handle_sum_->finalize(*common_key_source_state) + .getLiteral(); + EXPECT_EQ(common_key_source_sum_val.getLiteral(), actual_val); + + aggregation_handle_sum_derived->iterateUnaryInl( + common_key_destination_state.get(), common_key_destination_sum_val); + actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state) + .getLiteral(); + EXPECT_EQ(common_key_destination_sum_val.getLiteral(), actual_val); + + aggregation_handle_sum_derived->iterateUnaryInl( + exclusive_key_destination_state.get(), exclusive_key_destination_sum_val); + actual_val = + aggregation_handle_sum_->finalize(*exclusive_key_destination_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral(), actual_val); + + aggregation_handle_sum_derived->iterateUnaryInl( + exclusive_key_source_state.get(), exclusive_key_source_sum_val); + actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state) + .getLiteral(); + EXPECT_EQ(exclusive_key_source_sum_val.getLiteral(), actual_val); + + // Add the key-state pairs to the hash tables. 
+ source_hash_table_derived->putCompositeKey(common_key, + *common_key_source_state); + destination_hash_table_derived->putCompositeKey( + common_key, *common_key_destination_state); + source_hash_table_derived->putCompositeKey(exclusive_source_key, + *exclusive_key_source_state); + destination_hash_table_derived->putCompositeKey( + exclusive_destination_key, *exclusive_key_destination_state); + + EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); + EXPECT_EQ(2u, source_hash_table_derived->numEntries()); + + aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table, + destination_hash_table.get()); + + EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); + + CheckSumValue( + common_key_merged_val.getLiteral(), + *aggregation_handle_sum_derived, + *(destination_hash_table_derived->getSingleCompositeKey(common_key))); + CheckSumValue(exclusive_key_destination_sum_val.getLiteral(), + *aggregation_handle_sum_derived, + *(destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key))); + CheckSumValue(exclusive_key_source_sum_val.getLiteral(), + *aggregation_handle_sum_derived, + *(source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key))); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index 9440fae..7d5628d 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -216,7 +216,7 @@ class QueryContext { * * @param id The BloomFilter id. * - * @return The constant pointer to BloomFilter that is + * @return The constant pointer to BloomFilter that is * already created in the constructor. 
**/ inline const BloomFilter* getBloomFilter(const bloom_filter_id id) const { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index d209ceb..4878cf1 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -92,11 +92,12 @@ AggregationOperationState::AggregationOperationState( arguments_.push_back({}); is_distinct_.emplace_back(false); - group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable( - hash_table_impl_type, - group_by_types, - estimated_num_entries, - storage_manager_)); + group_by_hashtable_pools_.emplace_back(std::unique_ptr( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + handles_.back().get(), + storage_manager))); } else { // Set up each individual aggregate in this operation. std::vector::const_iterator agg_func_it @@ -124,12 +125,13 @@ AggregationOperationState::AggregationOperationState( handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); if (!group_by_list_.empty()) { - // Aggregation with GROUP BY: create a HashTable for per-group states. - group_by_hashtables_.emplace_back(handles_.back()->createGroupByHashTable( - hash_table_impl_type, - group_by_types, - estimated_num_entries, - storage_manager_)); + // Aggregation with GROUP BY: create a HashTable pool for per-group states. + group_by_hashtable_pools_.emplace_back(std::unique_ptr( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + handles_.back().get(), + storage_manager))); } else { // Aggregation without GROUP BY: create a single global state. 
single_states_.emplace_back(handles_.back()->createInitialState()); @@ -408,17 +410,17 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // Call StorageBlock::aggregateGroupBy() to aggregate this block's values // directly into the (threadsafe) shared global HashTable for this // aggregate. - // - // TODO(shoban): Implement optional code path for using local hash table per - // block, which can be merged with global hash table for all blocks - // aggregated on. + DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr); + AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); + DCHECK(agg_hash_table != nullptr); block->aggregateGroupBy(*handles_[agg_idx], arguments_[agg_idx], group_by_list_, predicate_.get(), - group_by_hashtables_[agg_idx].get(), + agg_hash_table, &reuse_matches, &reuse_group_by_vectors); + group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table); } } } @@ -447,19 +449,65 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest // group (which is also the prefix of the finalized Tuple for that group). std::vector> group_by_keys; + // TODO(harshad) - The merge phase may be slower when each hash table contains + // large number of entries. We should find ways in which we can perform a + // parallel merge. + + // TODO(harshad) - Find heuristics for faster merge, even in a single thread. + // e.g. Keep merging entries from smaller hash tables to larger. + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + if (hash_tables->size() > 1) { + for (int hash_table_index = 0; + hash_table_index < static_cast(hash_tables->size() - 1); + ++hash_table_index) { + // Merge each hash table to the last hash table. 
+ handles_[agg_idx]->mergeGroupByHashTables( + (*(*hash_tables)[hash_table_index]), + hash_tables->back().get()); + } + } + } + // Collect per-aggregate finalized values. std::vector> final_values; for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { + DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr); + auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + DCHECK(hash_tables != nullptr); + if (hash_tables->empty()) { + // We may have a case where hash_tables is empty, e.g. no input blocks. + // However for aggregateOnDistinctifyHashTableForGroupBy to work + // correctly, we should create an empty group by hash table. + AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); + group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table); + hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + } + DCHECK(hash_tables->back() != nullptr); + AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); + DCHECK(agg_hash_table != nullptr); handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( *distinctify_hashtables_[agg_idx], - group_by_hashtables_[agg_idx].get()); + agg_hash_table); } + auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + DCHECK(hash_tables != nullptr); + if (hash_tables->empty()) { + // We may have a case where hash_tables is empty, e.g. no input blocks. + // However for aggregateOnDistinctifyHashTableForGroupBy to work + // correctly, we should create an empty group by hash table. 
+ AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); + group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table); + hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + } + AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); + DCHECK(agg_hash_table != nullptr); ColumnVector* agg_result_col = - handles_[agg_idx]->finalizeHashTable(*group_by_hashtables_[agg_idx], + handles_[agg_idx]->finalizeHashTable(*agg_hash_table, &group_by_keys); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index c3a1278..0199749 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -31,6 +31,7 @@ #include "expressions/scalar/Scalar.hpp" #include "storage/AggregationOperationState.pb.h" #include "storage/HashTableBase.hpp" +#include "storage/HashTablePool.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" @@ -209,6 +210,9 @@ class AggregationOperationState { // hash table to prevent multiple lookups. std::vector> group_by_hashtables_; + // A vector of group by hash table pools, one for each group by clause. 
+ std::vector> group_by_hashtable_pools_; + StorageManager *storage_manager_; DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index a3093df..87a5e54 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -187,6 +187,7 @@ add_library(quickstep_storage_HashTable_proto ${storage_HashTable_proto_srcs}) add_library(quickstep_storage_HashTableBase ../empty_src.cpp HashTableBase.hpp) add_library(quickstep_storage_HashTableFactory HashTableFactory.cpp HashTableFactory.hpp) add_library(quickstep_storage_HashTableKeyManager ../empty_src.cpp HashTableKeyManager.hpp) +add_library(quickstep_storage_HashTablePool ../empty_src.cpp HashTablePool.hpp) add_library(quickstep_storage_IndexSubBlock ../empty_src.cpp IndexSubBlock.hpp) add_library(quickstep_storage_IndexSubBlockDescriptionFactory ../empty_src.cpp IndexSubBlockDescriptionFactory.hpp) add_library(quickstep_storage_InsertDestination InsertDestination.cpp InsertDestination.hpp) @@ -252,6 +253,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory + quickstep_storage_HashTablePool quickstep_storage_InsertDestination quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo @@ -662,6 +664,13 @@ target_link_libraries(quickstep_storage_HashTableKeyManager quickstep_types_TypedValue quickstep_types_operations_comparisons_ComparisonUtil quickstep_utility_Macros) +target_link_libraries(quickstep_storage_HashTablePool + glog + quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_HashTableBase + quickstep_threading_SpinMutex + quickstep_utility_Macros + quickstep_utility_StringUtil) target_link_libraries(quickstep_storage_IndexSubBlock 
quickstep_catalog_CatalogTypedefs quickstep_expressions_predicate_PredicateCost @@ -1012,6 +1021,7 @@ target_link_libraries(quickstep_storage quickstep_storage_HashTableBase quickstep_storage_HashTableFactory quickstep_storage_HashTableKeyManager + quickstep_storage_HashTablePool quickstep_storage_IndexSubBlock quickstep_storage_IndexSubBlockDescriptionFactory quickstep_storage_InsertDestination http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/HashTablePool.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp new file mode 100644 index 0000000..c16d0f1 --- /dev/null +++ b/storage/HashTablePool.hpp @@ -0,0 +1,166 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_ +#define QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_ + +#include +#include +#include +#include + +#include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/HashTableBase.hpp" +#include "threading/SpinMutex.hpp" +#include "utility/Macros.hpp" +#include "utility/StringUtil.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class StorageManager; +class Type; + +/** \addtogroup Storage + * @{ + */ + +/** + * @brief A pool of HashTables used for a single aggregation handle. 
This class + * has similar functionality as InsertDestination, but for checking out + * HashTables. A worker thread can check out a hash table for insertion, + * perform the insertions and return the hash table to the pool. While + * one thread is using a hash table, no other thread can access it. + **/ +class HashTablePool { + public: + /** + * @brief Constructor. + * + * @param estimated_num_entries The maximum number of entries in a hash table. + * @param hash_table_impl_type The type of hash table implementation. + * @param group_by_types A vector of pointer of types which form the group by + * key. + * @param agg_handle The aggregation handle. + * @param storage_manager A pointer to the storage manager. + * + * @note The estimate of number of entries is quite inaccurate at this time. + * If we go by the current estimate, each hash table demands much + * larger space than it actually needs, which causes the system to + * either trigger evictions or worse - run out of memory. To fix this + * issue, we divide the estimate by 100. The division will not affect + * correctness, however it may allocate some hash tables smaller space + * than their requirement, causing them to be resized during build + * phase, which has a performance penalty. + **/ + HashTablePool(const std::size_t estimated_num_entries, + const HashTableImplType hash_table_impl_type, + const std::vector &group_by_types, + AggregationHandle *agg_handle, + StorageManager *storage_manager) + : estimated_num_entries_(reduceEstimatedCardinality(estimated_num_entries)), + hash_table_impl_type_(hash_table_impl_type), + group_by_types_(group_by_types), + agg_handle_(DCHECK_NOTNULL(agg_handle)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + /** + * @brief Check out a hash table for insertion. + * + * @return A hash table pointer. 
+   **/
+  AggregationStateHashTableBase* getHashTable() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!hash_tables_.empty()) {
+        std::unique_ptr<AggregationStateHashTableBase> ret_hash_table(
+            std::move(hash_tables_.back()));
+        hash_tables_.pop_back();
+        DCHECK(ret_hash_table != nullptr);
+        return ret_hash_table.release();
+      }
+    }
+    return createNewHashTable();
+  }
+
+  /**
+   * @brief Return a previously checked out hash table.
+   *
+   * @param hash_table A pointer to the checked out hash table.
+   **/
+  void returnHashTable(AggregationStateHashTableBase *hash_table) {
+    SpinMutexLock lock(mutex_);
+    hash_tables_.push_back(
+        std::unique_ptr<AggregationStateHashTableBase>(hash_table));
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash table
+   *          is being checked in or checked out from the pool. In other words,
+   *          the hash table pool must be in a read-only state.
+   *
+   * @return All the hash tables in the pool.
+   *
+   **/
+  const std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() {
+    return &hash_tables_;
+  }
+
+ private:
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+                                               group_by_types_,
+                                               estimated_num_entries_,
+                                               storage_manager_);
+  }
+
+  inline std::size_t reduceEstimatedCardinality(
+      const std::size_t original_estimate) const {
+    if (original_estimate < kEstimateReductionFactor) {
+      return original_estimate;
+    } else {
+      DCHECK_GT(kEstimateReductionFactor, 0u);
+      return original_estimate / kEstimateReductionFactor;
+    }
+  }
+
+  static constexpr std::size_t kEstimateReductionFactor = 100;
+
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  const std::size_t estimated_num_entries_;
+  const HashTableImplType hash_table_impl_type_;
+
+  const std::vector<const Type *> group_by_types_;
+
+  AggregationHandle *agg_handle_;
+  StorageManager *storage_manager_;
+
+  SpinMutex mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashTablePool);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
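
Note on the check-out/return protocol in HashTablePool.hpp above: the following is a minimal standalone sketch of the same pattern, not code from this commit. ToyTable and ToyTablePool are hypothetical stand-ins for AggregationStateHashTableBase and HashTablePool, and std::mutex is assumed in place of Quickstep's SpinMutex so the sketch compiles without the Quickstep tree.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for AggregationStateHashTableBase: group key -> count.
struct ToyTable {
  explicit ToyTable(std::size_t capacity_hint) { counts.reserve(capacity_hint); }
  std::unordered_map<int, long> counts;
};

// Hypothetical pool following the same protocol as HashTablePool.
class ToyTablePool {
 public:
  explicit ToyTablePool(std::size_t estimated_num_entries)
      : capacity_hint_(Reduce(estimated_num_entries)) {}

  // Same rule as reduceEstimatedCardinality(): estimates below the factor
  // are used as-is, larger ones are divided by the factor.
  static std::size_t Reduce(std::size_t estimate) {
    return estimate < kFactor ? estimate : estimate / kFactor;
  }

  // Hand out an idle table if there is one; otherwise create a new table
  // outside the critical section.
  std::unique_ptr<ToyTable> checkOut() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (!tables_.empty()) {
        std::unique_ptr<ToyTable> table = std::move(tables_.back());
        tables_.pop_back();
        return table;
      }
    }
    return std::unique_ptr<ToyTable>(new ToyTable(capacity_hint_));
  }

  // Put a checked-out table back so another worker can reuse it.
  void giveBack(std::unique_ptr<ToyTable> table) {
    assert(table != nullptr);
    std::lock_guard<std::mutex> lock(mutex_);
    tables_.push_back(std::move(table));
  }

  // Takes no lock: only safe once no table is being checked in or out.
  const std::vector<std::unique_ptr<ToyTable>>& all() const { return tables_; }

 private:
  static constexpr std::size_t kFactor = 100;

  const std::size_t capacity_hint_;
  std::vector<std::unique_ptr<ToyTable>> tables_;
  std::mutex mutex_;
};
```

A worker would call checkOut(), insert into its private table without further locking, then call giveBack(); a final merge step would walk all() after every worker has finished. Unlike the interface in the commit, which hands out raw pointers, this sketch passes std::unique_ptr in both directions so that a table a worker never returns is still freed. That is a design choice of the sketch, not something the commit does.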

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2ddb67bf/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index dfc95b8..5d91052 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -183,7 +183,8 @@ StorageManager::~StorageManager() {
        it != blocks_.end();
        ++it) {
     if (it->second.block->isDirty()) {
-      LOG(WARNING) << "Block with ID " << BlockIdUtil::ToString(it->first)
+      LOG(WARNING) << (it->second.block->isBlob() ? "Blob " : "Block ")
+                   << "with ID " << BlockIdUtil::ToString(it->first)
                    << " is dirty during StorageManager shutdown";
     }
     delete it->second.block;