Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DA47B200B3C for ; Wed, 13 Jul 2016 23:27:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D6653160A6A; Wed, 13 Jul 2016 21:27:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3AAE1160A62 for ; Wed, 13 Jul 2016 23:27:03 +0200 (CEST) Received: (qmail 51300 invoked by uid 500); 13 Jul 2016 21:27:02 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 51288 invoked by uid 99); 13 Jul 2016 21:27:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2016 21:27:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id D5495C06F1 for ; Wed, 13 Jul 2016 21:27:01 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id OK9zqZCcaWEJ for ; Wed, 13 Jul 2016 21:26:57 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 7CF555F476 for ; Wed, 13 Jul 2016 21:26:55 +0000 (UTC) Received: (qmail 51228 invoked by uid 99); 13 Jul 2016 21:26:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Jul 2016 21:26:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42FA5E04BE; Wed, 13 Jul 2016 21:26:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shixuan@apache.org To: commits@quickstep.incubator.apache.org Message-Id: <97bce58a5bbb44c78c441c11ea6e918a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-quickstep git commit: Removed finalize() Date: Wed, 13 Jul 2016 21:26:54 +0000 (UTC) archived-at: Wed, 13 Jul 2016 21:27:05 -0000 Repository: incubator-quickstep Updated Branches: refs/heads/SQL-window-aggregation c50ce5139 -> 0f5c41d15 Removed finalize() Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0f5c41d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0f5c41d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0f5c41d1 Branch: refs/heads/SQL-window-aggregation Commit: 0f5c41d1531032065e747b642b9013802f74ac06 Parents: c50ce51 Author: shixuan-fan Authored: Wed Jul 13 21:26:29 2016 +0000 Committer: shixuan-fan Committed: Wed Jul 13 21:26:29 2016 +0000 ---------------------------------------------------------------------- .../WindowAggregationHandle.hpp | 16 +- .../WindowAggregationHandleAvg.cpp | 47 +- .../WindowAggregationHandleAvg.hpp | 18 +- .../WindowAggregationHandleAvg_unittest.cpp | 600 +++---------------- storage/WindowAggregationOperationState.cpp | 19 +- 5 files changed, 140 insertions(+), 560 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp index 8511b9e..831bcbf 100644 --- a/expressions/window_aggregation/WindowAggregationHandle.hpp +++ b/expressions/window_aggregation/WindowAggregationHandle.hpp @@ -102,14 +102,12 @@ class WindowAggregationHandle { * NULL if all arguments are attributes. * @param output_destination The destination for output. **/ - virtual void calculate(ColumnVectorsValueAccessor* block_accessors, - std::vector &&arguments, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following) = 0; - - virtual ValueAccessor* finalize() = 0; + virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors, + std::vector &&arguments, + const std::vector &partition_by_ids, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) = 0; protected: /** @@ -133,8 +131,6 @@ class WindowAggregationHandle { } } - std::unique_ptr tuple_accessor_; - std::unique_ptr window_aggregates_; const CatalogRelationSchema &relation_; std::vector> equal_comparators_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp index 7daaddf..14fc1d9 100644 --- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp +++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp @@ -88,22 +88,21 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg( .makeUncheckedBinaryOperatorForTypes(*sum_type_, TypeFactory::GetType(kDouble))); } -void WindowAggregationHandleAvg::calculate(ColumnVectorsValueAccessor *tuple_accessor, - std::vector &&arguments, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following) { +ColumnVector* WindowAggregationHandleAvg::calculate( + ColumnVectorsValueAccessor *tuple_accessor, + std::vector &&arguments, + const std::vector &partition_by_ids, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) { DCHECK(arguments.size() == 1); DCHECK(arguments[0]->isNative()); DCHECK(static_cast(tuple_accessor->getNumTuples()) == static_cast(arguments[0])->size()); - - tuple_accessor_.reset(tuple_accessor); // Initialize the output column and argument accessor. - window_aggregates_.reset( - new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples())); + NativeColumnVector *window_aggregates = + new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples()); ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor(); argument_accessor->addColumn(arguments[0]); @@ -111,22 +110,21 @@ void WindowAggregationHandleAvg::calculate(ColumnVectorsValueAccessor *tuple_acc tuple_accessor->beginIteration(); argument_accessor->beginIteration(); - while (tuple_accessor_->next() && argument_accessor->next()) { - const TypedValue window_aggregate = this->calculateOneWindow(argument_accessor, + while (tuple_accessor->next() && argument_accessor->next()) { + const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor, + argument_accessor, partition_by_ids, is_row, num_preceding, num_following); - window_aggregates_->appendTypedValue(window_aggregate); + window_aggregates->appendTypedValue(window_aggregate); } -} -ValueAccessor* WindowAggregationHandleAvg::finalize() { - tuple_accessor_->addColumn(window_aggregates_.release()); - return tuple_accessor_.get(); + return window_aggregates; } TypedValue WindowAggregationHandleAvg::calculateOneWindow( + ColumnVectorsValueAccessor *tuple_accessor, ColumnVectorsValueAccessor *argument_accessor, const std::vector &partition_by_ids, const bool is_row, @@ -149,11 +147,11 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow( std::vector current_row_partition_key; for (attribute_id partition_by_id : partition_by_ids) { current_row_partition_key.push_back( - tuple_accessor_->getTypedValue(partition_by_id)); + tuple_accessor->getTypedValue(partition_by_id)); } // Get current position. - tuple_id current_tuple_id = tuple_accessor_->getCurrentPositionVirtual(); + tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual(); // Find preceding tuples. int count_preceding = 0; @@ -168,7 +166,8 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow( // Get the partition keys and compare. If not the same partition as the // current row, stop searching preceding tuples. - if (!samePartition(current_row_partition_key, + if (!samePartition(tuple_accessor, + current_row_partition_key, preceding_tuple_id, partition_by_ids)) { break; @@ -196,13 +195,14 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow( following_tuple_id++; // No more following tuples. - if (following_tuple_id == tuple_accessor_->getNumTuples()) { + if (following_tuple_id == tuple_accessor->getNumTuples()) { break; } // Get the partition keys and compare. If not the same partition as the // current row, stop searching preceding tuples. - if (!samePartition(current_row_partition_key, + if (!samePartition(tuple_accessor, + current_row_partition_key, following_tuple_id, partition_by_ids)) { break; @@ -229,6 +229,7 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow( } bool WindowAggregationHandleAvg::samePartition( + const ColumnVectorsValueAccessor *tuple_accessor, const std::vector ¤t_row_partition_key, const tuple_id boundary_tuple_id, const std::vector &partition_by_ids) const { @@ -237,7 +238,7 @@ bool WindowAggregationHandleAvg::samePartition( ++partition_by_index) { if (!equal_comparators_[partition_by_index]->compareTypedValues( current_row_partition_key[partition_by_index], - tuple_accessor_->getTypedValueAtAbsolutePosition( + tuple_accessor->getTypedValueAtAbsolutePosition( partition_by_ids[partition_by_index], boundary_tuple_id))) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp index 4eb0846..72076fa 100644 --- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp +++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp @@ -55,14 +55,12 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle { public: ~WindowAggregationHandleAvg() override {} - void calculate(ColumnVectorsValueAccessor* block_accessors, - std::vector &&arguments, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following); - - ValueAccessor* finalize() override; + ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors, + std::vector &&arguments, + const std::vector &partition_by_ids, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following); private: friend class WindowAggregateFunctionAvg; @@ -83,13 +81,15 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle { const Type &type); TypedValue calculateOneWindow( + ColumnVectorsValueAccessor *tuple_accessor, ColumnVectorsValueAccessor *argument_accessor, const std::vector &partition_by_ids, const bool is_row, const std::int64_t num_preceding, const std::int64_t num_following) const; - bool samePartition(const std::vector ¤t_row_partition_key, + bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor, + const std::vector ¤t_row_partition_key, const tuple_id boundary_tuple_id, const std::vector &partition_by_ids) const; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp index 8fd3c8a..58c8019 100644 --- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp +++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp @@ -28,7 +28,6 @@ #include "expressions/window_aggregation/WindowAggregationHandle.hpp" #include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp" #include "expressions/window_aggregation/WindowAggregationID.hpp" -#include "storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DateOperatorOverloads.hpp" #include "types/DatetimeIntervalType.hpp" @@ -51,141 +50,15 @@ namespace quickstep { namespace { - constexpr int kNumTuplesPerBlock = 100; - constexpr int kNumBlocks = 5; + constexpr int kNumTuples = 100; constexpr int kNumTuplesPerPartition = 8; + constexpr int kNullInterval = 25; } // namespace // Attribute value could be null if set true. class WindowAggregationHandleAvgTest : public::testing::TestWithParam { - protected: - virtual void SetUp() { - // Initialize relation and storage manager. - relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId)); - storage_manager_.reset(new StorageManager("TestAvg")); - - // Add All kinds of TypedValues. - CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(), - "int_attr", - TypeFactory::GetType(kInt, GetParam())); - - relation_->addAttribute(int_attr); - - CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(), - "float_attr", - TypeFactory::GetType(kFloat, GetParam())); - relation_->addAttribute(float_attr); - - CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(), - "long_attr", - TypeFactory::GetType(kLong, GetParam())); - relation_->addAttribute(long_attr); - - CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(), - "double_attr", - TypeFactory::GetType(kDouble, GetParam())); - relation_->addAttribute(double_attr); - - CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(), - "char_attr", - TypeFactory::GetType(kChar, 4, GetParam())); - relation_->addAttribute(char_attr); - - CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(), - "varchar_attr", - TypeFactory::GetType(kVarChar, 32, GetParam())); - relation_->addAttribute(varchar_attr); - - // Records the 'base_value' of a tuple used in createSampleTuple. - CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(), - "partition_value", - TypeFactory::GetType(kInt, false)); - relation_->addAttribute(partition_value); - - StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true); - - // Initialize blocks. - for (int i = 0; i < kNumBlocks; ++i) { - block_id bid = storage_manager_->createBlock(relation_, layout); - relation_->addBlock(bid); - insertTuples(bid); - } - } - - // Insert kNumTuplesPerBlock tuples into the block. - void insertTuples(block_id bid) { - MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_); - for (int i = 0; i < kNumTuplesPerBlock; ++i) { - Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i); - block->insertTuple(*tuple); - } - } - - Tuple* createTuple(int base_value) { - std::vector attrs; - - // int_attr. - if (GetParam() && base_value % 10 == 0) { - // Throw in a NULL integer for every ten values. - attrs.emplace_back(kInt); - } else { - attrs.emplace_back(base_value); - } - - // float_attr. - if (GetParam() && base_value % 10 == 1) { - attrs.emplace_back(kFloat); - } else { - attrs.emplace_back(static_cast(0.4 * base_value)); - } - - // long_attr. - if (GetParam() && base_value % 10 == 2) { - attrs.emplace_back(kLong); - } else { - attrs.emplace_back(static_cast(base_value)); - } - - // double_attr. - if (GetParam() && base_value % 10 == 3) { - attrs.emplace_back(kDouble); - } else { - attrs.emplace_back(static_cast(0.25 * base_value)); - } - - // char_attr - if (GetParam() && base_value % 10 == 4) { - attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue()); - } else { - std::ostringstream char_buffer; - char_buffer << base_value; - std::string string_literal(char_buffer.str()); - attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue( - string_literal.c_str(), - string_literal.size() > 3 ? 4 - : string_literal.size() + 1)); - attrs.back().ensureNotReference(); - } - - // varchar_attr - if (GetParam() && base_value % 10 == 5) { - attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue()); - } else { - std::ostringstream char_buffer; - char_buffer << "Here are some numbers: " << base_value; - std::string string_literal(char_buffer.str()); - attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue( - string_literal.c_str(), - string_literal.size() + 1)); - attrs.back().ensureNotReference(); - } - - // base_value - attrs.emplace_back(base_value / kNumTuplesPerPartition); - return new Tuple(std::move(attrs)); - } - + protected: // Handle initialization. void initializeHandle(const Type &argument_type, const std::vector &partition_key_types) { @@ -217,14 +90,18 @@ class WindowAggregationHandleAvgTest : public::testing::TestWithParam { template static void CheckAvgValues( - std::vector expected, + std::vector expected, const ColumnVector *actual) { EXPECT_TRUE(actual->isNative()); NativeColumnVector *native = static_cast(actual); EXPECT_EQ(expected.size(), actual->size()); for (std::size_t i = 0; i < expected.size(); ++i) { - EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral()); + if (expected[i] == nullptr) { + EXPECT_TRUE(actual->getTypedValue(i).isNull()); + } else { + EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral()); + } } } @@ -238,405 +115,110 @@ class WindowAggregationHandleAvgTest : public::testing::TestWithParam { void checkAggregationAvgGeneric() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_avg_->finalize(relation_, storage_manager_).empty()); - - aggregation_handle_avg_->calculate(relation_.getBlocksSnapshot(), - std::vectorfinalize()->getNumTuplesVirtual()); + + // Create argument, partition key and cpptype vectors. + std::vector argument_cpp_vector; + argument_cpp_vector.reserve(kNumTuples); + ColumnVector *argument_type_vector = + createArgumentGeneric(&argument_cpp_vector); + const IntType &int_type = ; + NativeColumnVector *partition_key_vector = + new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2); + + for (int i = 0; i < kNumTuples; ++i) { + partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition)); + } - std::vector result_vector; - typename GenericType::cpptype val; - typename GenericType::cpptype sum; - SetDataType(0, &sum); + // Create tuple ValueAccessor + ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor(); + tuple_accessor->addColumn(partition_key_vector); + tuple_accessor->addColumn(argument_type_vector); - for (int i = 0; i < kNumSamples; ++i) { - if (type.getTypeID() == kInt || type.getTypeID() == kLong) { - SetDataType(i - 10, &val); - } else { - SetDataType(static_cast(i - 10)/10, &val); - } - iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val)); - sum += val; - } - iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue()); - CheckAvgValue(static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *aggregation_handle_avg_state_); + // Test UNBOUNDED PRECEDING AND CURRENT ROW. + calculateAccumulative(tuple_accessor, + argument_type_vector, + argument_cpp_vector); } template - ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) { - NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3); - - typename GenericType::cpptype val; - SetDataType(0, sum); + ColumnVector *createArgumentGeneric( + std::vector *argument_cpp_vector) { + const GenericType &type = GenericType::Instance(true); + NativeColumnVector *column = new NativeColumnVector(type, kNumSamples); column->appendTypedValue(type.makeNullValue()); for (int i = 0; i < kNumSamples; ++i) { + // Insert a NULL every kNullInterval tuples. + if (i % kNullInterval == 0) { + argument_cpp_vector->push_back(nullptr); + column->appendTypedValue(type.makeNullValue()); + continue; + } + + typename GenericType::cpptype val = new GenericType::cpptype; + if (type.getTypeID() == kInt || type.getTypeID() == kLong) { - SetDataType(i - 10, &val); + SetDataType(i - 10, val); } else { - SetDataType(static_cast(i - 10)/10, &val); - } - column->appendTypedValue(type.makeValue(&val)); - *sum += val; - // One NULL in the middle. - if (i == kNumSamples/2) { - column->appendTypedValue(type.makeNullValue()); + SetDataType(static_cast(i - 10) / 10, val); } + + column->appendTypedValue(type.makeValue(val)); + argument_cpp_vector->push_back(val); } - column->appendTypedValue(type.makeNullValue()); return column; } - template - void checkAggregationAvgGenericColumnVector() { - const GenericType &type = GenericType::Instance(true); - initializeHandle(type); - EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull()); - - typename GenericType::cpptype sum; - SetDataType(0, &sum); - std::vector> column_vectors; - column_vectors.emplace_back(createColumnVectorGeneric(type, &sum)); - - std::unique_ptr cv_state( - aggregation_handle_avg_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckAvgValue( - static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *cv_state); - - aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get()); - CheckAvgValue( - static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *aggregation_handle_avg_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template - void checkAggregationAvgGenericValueAccessor() { - const GenericType &type = GenericType::Instance(true); - initializeHandle(type); - EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull()); - + template + void calculateAccumulate(ValueAccessor *tuple_accessor, + ColumnVector *argument_type_vector, + const std::vector &argument_cpp_vector) { + std::vector arguments; + arguments.push_back(argument_type_vector); + // The partition key index is 0. + std::vector partition_key(1, 0); + + ColumnVector *result = + handle_avg_->calculate(tuple_accessor, + std::move(arguments), + partition_key, + true /* is_row */, + -1 /* num_preceding: UNBOUNDED PRECEDING */, + 0 /* num_following: CURRENT ROW */); + + // Get the cpptype result. + std::vector result_cpp_vector; + bool is_null; typename GenericType::cpptype sum; - SetDataType(0, &sum); - std::unique_ptr accessor(new ColumnVectorsValueAccessor()); - accessor->addColumn(createColumnVectorGeneric(type, &sum)); + int count; + for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) { + // Start of new partition + if (i % kNumTuplesPerPartition == 0) { + is_null = false; + SetDataType(0, &sum); + count = 0; + } - std::unique_ptr va_state( - aggregation_handle_avg_->accumulateValueAccessor(accessor.get(), - std::vector(1, 0))); + typename GenericType::cpptype *value = argument_cpp_vector[i]; + if (value == nullptr) { + is_null = true; + } - // Test the state generated directly by accumulateValueAccessor(), and also - // test after merging back. - CheckAvgValue( - static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *va_state); + if (is_null) { + result_cpp_vector.push_back(nullptr); + } else { + sum += *value; + count++; + result_cpp_vector.push_back(static_cast(sum) / count); + } + } - aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get()); - CheckAvgValue( - static_cast(sum) / kNumSamples, - *aggregation_handle_avg_, - *aggregation_handle_avg_state_); + CheckAvgValues(result_cpp_vector, result); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - std::unique_ptr aggregation_handle_avg_; - std::unique_ptr aggregation_handle_avg_state_; - std::unique_ptr storage_manager_; - std::unique_ptr relation_; + std::unique_ptr handle_avg_; }; -const int AggregationHandleAvgTest::kNumSamples; - -template <> -void AggregationHandleAvgTest::CheckAvgValue( - double expected, - const AggregationHandle &handle, - const AggregationState &state) { - EXPECT_DOUBLE_EQ(expected, handle.finalize(state).getLiteral()); -} - -template <> -void AggregationHandleAvgTest::SetDataType(int value, DatetimeIntervalLit *data) { - data->interval_ticks = value; -} - -template <> -void AggregationHandleAvgTest::SetDataType(int value, YearMonthIntervalLit *data) { - data->months = value; -} - -typedef AggregationHandleAvgTest AggregationHandleAvgDeathTest; - -TEST_F(AggregationHandleAvgTest, IntTypeTest) { - checkAggregationAvgGeneric(); -} - -TEST_F(AggregationHandleAvgTest, LongTypeTest) { - checkAggregationAvgGeneric(); -} - -TEST_F(AggregationHandleAvgTest, FloatTypeTest) { - checkAggregationAvgGeneric(); -} - -TEST_F(AggregationHandleAvgTest, DoubleTypeTest) { - checkAggregationAvgGeneric(); -} - -TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeTest) { - checkAggregationAvgGeneric(); -} - -TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeTest) { - checkAggregationAvgGeneric(); -} - -TEST_F(AggregationHandleAvgTest, IntTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, LongTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, FloatTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector(); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -TEST_F(AggregationHandleAvgTest, IntTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor(); -} - -TEST_F(AggregationHandleAvgTest, LongTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor(); -} - -TEST_F(AggregationHandleAvgTest, FloatTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor(); -} - -TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor(); -} - -TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor(); -} - -TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor(); -} -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - -#ifdef QUICKSTEP_DEBUG -TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) { - const Type &type = CharType::Instance(true, 10); - EXPECT_DEATH(initializeHandle(type), ""); -} - -TEST_F(AggregationHandleAvgDeathTest, VarTypeTest) { - const Type &type = VarCharType::Instance(true, 10); - EXPECT_DEATH(initializeHandle(type), ""); -} - -TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) { - const Type &int_non_null_type = IntType::Instance(false); - const Type &long_type = LongType::Instance(true); - const Type &double_type = DoubleType::Instance(true); - const Type &float_type = FloatType::Instance(true); - const Type &char_type = CharType::Instance(true, 10); - const Type &varchar_type = VarCharType::Instance(true, 10); - - initializeHandle(IntType::Instance(true)); - int int_val = 0; - std::int64_t long_val = 0; - double double_val = 0; - float float_val = 0; - - iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val)); - - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), ""); - - // Test mergeStates() with incorrectly typed handles. - std::unique_ptr aggregation_handle_avg_double( - AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle( - std::vector(1, &double_type))); - std::unique_ptr aggregation_state_avg_merge_double( - aggregation_handle_avg_double->createInitialState()); - static_cast(*aggregation_handle_avg_double).iterateUnaryInl( - static_cast(aggregation_state_avg_merge_double.get()), - double_type.makeValue(&double_val)); - EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double, - aggregation_handle_avg_state_.get()), - ""); - - std::unique_ptr aggregation_handle_avg_float( - AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle( - std::vector(1, &float_type))); - std::unique_ptr aggregation_state_avg_merge_float( - aggregation_handle_avg_float->createInitialState()); - static_cast(*aggregation_handle_avg_float).iterateUnaryInl( - static_cast(aggregation_state_avg_merge_float.get()), - float_type.makeValue(&float_val)); - EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float, - aggregation_handle_avg_state_.get()), - ""); -} -#endif - -TEST_F(AggregationHandleAvgTest, canApplyToTypeTest) { - EXPECT_TRUE(ApplyToTypesTest(kInt)); - EXPECT_TRUE(ApplyToTypesTest(kLong)); - EXPECT_TRUE(ApplyToTypesTest(kFloat)); - EXPECT_TRUE(ApplyToTypesTest(kDouble)); - EXPECT_FALSE(ApplyToTypesTest(kChar)); - EXPECT_FALSE(ApplyToTypesTest(kVarChar)); - EXPECT_FALSE(ApplyToTypesTest(kDatetime)); - EXPECT_TRUE(ApplyToTypesTest(kDatetimeInterval)); - EXPECT_TRUE(ApplyToTypesTest(kYearMonthInterval)); -} - -TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) { - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kInt, kDouble)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); -} - -TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { - const Type &long_non_null_type = LongType::Instance(false); - initializeHandle(long_non_null_type); - storage_manager_.reset(new StorageManager("./test_avg_data")); - std::unique_ptr source_hash_table( - aggregation_handle_avg_->createGroupByHashTable( - HashTableImplType::kSimpleScalarSeparateChaining, - std::vector(1, &long_non_null_type), - 10, - storage_manager_.get())); - std::unique_ptr destination_hash_table( - aggregation_handle_avg_->createGroupByHashTable( - HashTableImplType::kSimpleScalarSeparateChaining, - std::vector(1, &long_non_null_type), - 10, - storage_manager_.get())); - - AggregationStateHashTable *destination_hash_table_derived = - static_cast *>( - destination_hash_table.get()); - - AggregationStateHashTable *source_hash_table_derived = - static_cast *>( - source_hash_table.get()); - - AggregationHandleAvg *aggregation_handle_avg_derived = - static_cast(aggregation_handle_avg_.get()); - // We create three keys: first is present in both the hash tables, second key - // is present only in the source hash table while the third key is present - // the destination hash table only. - std::vector common_key; - common_key.emplace_back(static_cast(0)); - std::vector exclusive_source_key, exclusive_destination_key; - exclusive_source_key.emplace_back(static_cast(1)); - exclusive_destination_key.emplace_back(static_cast(2)); - - const std::int64_t common_key_source_avg = 355; - TypedValue common_key_source_avg_val(common_key_source_avg); - - const std::int64_t common_key_destination_avg = 295; - TypedValue common_key_destination_avg_val(common_key_destination_avg); - - const std::int64_t exclusive_key_source_avg = 1; - TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg); - - const std::int64_t exclusive_key_destination_avg = 1; - TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg); - - std::unique_ptr common_key_source_state( - static_cast( - aggregation_handle_avg_->createInitialState())); - std::unique_ptr common_key_destination_state( - static_cast( - aggregation_handle_avg_->createInitialState())); - std::unique_ptr exclusive_key_source_state( - static_cast( - aggregation_handle_avg_->createInitialState())); - std::unique_ptr exclusive_key_destination_state( - static_cast( - aggregation_handle_avg_->createInitialState())); - - // Create avg value states for keys. - aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(), - common_key_source_avg_val); - - aggregation_handle_avg_derived->iterateUnaryInl( - common_key_destination_state.get(), common_key_destination_avg_val); - - aggregation_handle_avg_derived->iterateUnaryInl( - exclusive_key_destination_state.get(), exclusive_key_destination_avg_val); - - aggregation_handle_avg_derived->iterateUnaryInl( - exclusive_key_source_state.get(), exclusive_key_source_avg_val); - - // Add the key-state pairs to the hash tables. - source_hash_table_derived->putCompositeKey(common_key, - *common_key_source_state); - destination_hash_table_derived->putCompositeKey( - common_key, *common_key_destination_state); - source_hash_table_derived->putCompositeKey(exclusive_source_key, - *exclusive_key_source_state); - destination_hash_table_derived->putCompositeKey( - exclusive_destination_key, *exclusive_key_destination_state); - - EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); - EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - - aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table, - destination_hash_table.get()); - - EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); - - CheckAvgValue( - (common_key_destination_avg_val.getLiteral() + - common_key_source_avg_val.getLiteral()) / static_cast(2), - *aggregation_handle_avg_derived, - *(destination_hash_table_derived->getSingleCompositeKey(common_key))); - CheckAvgValue(exclusive_key_destination_avg_val.getLiteral(), - *aggregation_handle_avg_derived, - *(destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key))); - CheckAvgValue(exclusive_key_source_avg_val.getLiteral(), - *aggregation_handle_avg_derived, - *(source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key))); -} - } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/storage/WindowAggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp index de8cfeb..ea522b8 100644 --- a/storage/WindowAggregationOperationState.cpp +++ b/storage/WindowAggregationOperationState.cpp @@ -280,15 +280,16 @@ void WindowAggregationOperationState::windowAggregateBlocks( } // Do actual calculation in handle. - window_aggregation_handle_->calculate(all_blocks_accessor, - std::move(argument_vecs), - partition_by_ids_, - is_row_, - num_preceding_, - num_following_); - - ValueAccessor* output_accessor = window_aggregation_handle_->finalize(); - output_destination->bulkInsertTuples(output_accessor); + ColumnVector *window_aggregates = + window_aggregation_handle_->calculate(all_blocks_accessor, + std::move(argument_vecs), + partition_by_ids_, + is_row_, + num_preceding_, + num_following_); + + all_blocks_accessor->addColumn(window_aggregates); + output_destination->bulkInsertTuples(all_blocks_accessor); } } // namespace quickstep