From: shixuan@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Mon, 01 Aug 2016 14:38:59 -0000
Subject: [6/6] incubator-quickstep git commit: RANGE mode and computation optimization.

RANGE mode and computation optimization.

- Supported RANGE mode for window aggregation.
- Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.
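As a rough illustration of the O(nk) to O(n) claim in the commit message, the sketch below keeps a running sum and two frame pointers for a ROWS-mode window over a single partition. It is not the code from this commit: the function name SlidingAvgRows and the use of plain doubles are assumptions made for the example, and only the convention that -1 means UNBOUNDED is taken from the diff.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper, not part of Quickstep: AVG over a sliding ROWS frame
// for one partition. A bound of -1 means UNBOUNDED (assumed to mirror the
// convention used by num_preceding / num_following in the diff).
std::vector<double> SlidingAvgRows(const std::vector<double> &values,
                                   const std::int64_t num_preceding,
                                   const std::int64_t num_following) {
  const std::int64_t n = static_cast<std::int64_t>(values.size());
  std::vector<double> result;
  result.reserve(values.size());

  double sum = 0.0;        // Sum of the rows currently inside the frame.
  std::int64_t start = 0;  // First row inside the frame.
  std::int64_t end = 0;    // One past the last row inside the frame.

  for (std::int64_t current = 0; current < n; ++current) {
    // Frame boundaries for the current row, clamped to the partition.
    const std::int64_t want_start =
        (num_preceding < 0 || num_preceding > current)
            ? 0
            : current - num_preceding;
    const std::int64_t want_end =
        (num_following < 0 || num_following >= n - 1 - current)
            ? n
            : current + num_following + 1;

    // Both pointers only move forward, so each row is added once and
    // subtracted at most once over the whole scan: O(n) total work,
    // instead of re-summing up to k rows for every tuple.
    while (end < want_end) {
      sum += values[static_cast<std::size_t>(end++)];
    }
    while (start < want_start) {
      sum -= values[static_cast<std::size_t>(start++)];
    }

    // The current row is always inside its own frame, so end - start >= 1.
    result.push_back(sum / static_cast<double>(end - start));
  }
  return result;
}
```

Calling SlidingAvgRows with the values 1 through 5 and a frame of 1 PRECEDING to 1 FOLLOWING averages at most three neighbouring rows per tuple. A real implementation would also need to reset the state at partition boundaries and handle NULLs, and repeated floating-point subtraction can accumulate rounding error that recomputing each frame from scratch would avoid.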
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0172fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0172fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0172fde

Branch: refs/heads/SQL-window-aggregation
Commit: d0172fde0cffbf10bc858090e11169e11834be89
Parents: e53186e
Author: shixuan
Authored: Tue Jul 26 11:49:07 2016 -0500
Committer: shixuan
Committed: Mon Aug 1 09:38:34 2016 -0500

----------------------------------------------------------------------
 expressions/window_aggregation/CMakeLists.txt |  15 +-
 .../WindowAggregateFunction.hpp               |  19 +-
 .../WindowAggregateFunctionAvg.cpp            |  14 +-
 .../WindowAggregateFunctionAvg.hpp            |   6 +-
 .../WindowAggregateFunctionCount.cpp          |   6 +-
 .../WindowAggregateFunctionCount.hpp          |   6 +-
 .../WindowAggregateFunctionMax.cpp            |   6 +-
 .../WindowAggregateFunctionMax.hpp            |   6 +-
 .../WindowAggregateFunctionMin.cpp            |   6 +-
 .../WindowAggregateFunctionMin.hpp            |   6 +-
 .../WindowAggregateFunctionSum.cpp            |   6 +-
 .../WindowAggregateFunctionSum.hpp            |   6 +-
 .../WindowAggregationHandle.cpp               | 186 ++++++++++++++++
 .../WindowAggregationHandle.hpp               | 100 ++++++---
 .../WindowAggregationHandleAvg.cpp            | 201 ++++++-----------
 .../WindowAggregationHandleAvg.hpp            |  35 ++-
 .../WindowAggregationHandleAvg_unittest.cpp   | 220 +++++++++++++++----
 query_optimizer/ExecutionGenerator.cpp        |  11 +-
 query_optimizer/resolver/Resolver.cpp         |  19 +-
 .../tests/execution_generator/Select.test     |  41 +++-
 storage/WindowAggregationOperationState.cpp   |  69 +++---
 storage/WindowAggregationOperationState.hpp   |   9 +-
 storage/WindowAggregationOperationState.proto |   1 +
 23 files changed, 692 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/CMakeLists.txt
---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt index 6a16fcc..3a79b7e 100644 --- a/expressions/window_aggregation/CMakeLists.txt +++ b/expressions/window_aggregation/CMakeLists.txt @@ -44,7 +44,7 @@ add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum WindowAggregateFunctionSum.cpp WindowAggregateFunctionSum.hpp) add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle - ../../empty_src.cpp + WindowAggregationHandle.cpp WindowAggregationHandle.hpp) add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg WindowAggregationHandleAvg.cpp @@ -130,10 +130,17 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH glog quickstep_catalog_CatalogRelationSchema quickstep_catalog_CatalogTypedefs + quickstep_expressions_scalar_Scalar quickstep_storage_StorageBlockInfo + quickstep_types_Type + quickstep_types_TypeFactory + quickstep_types_TypeID quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor + quickstep_types_operations_binaryoperations_BinaryOperation + quickstep_types_operations_binaryoperations_BinaryOperationFactory + quickstep_types_operations_binaryoperations_BinaryOperationID quickstep_types_operations_comparisons_Comparison quickstep_types_operations_comparisons_ComparisonFactory quickstep_types_operations_comparisons_ComparisonID @@ -141,8 +148,6 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg glog quickstep_catalog_CatalogTypedefs - quickstep_expressions_scalar_Scalar - quickstep_expressions_scalar_ScalarAttribute quickstep_expressions_windowaggregation_WindowAggregationHandle quickstep_storage_ValueAccessor quickstep_types_Type @@ -179,11 +184,13 @@ 
add_executable(WindowAggregationHandle_tests target_link_libraries(WindowAggregationHandle_tests gtest gtest_main + quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogTypedefs + quickstep_expressions_scalar_Scalar + quickstep_expressions_scalar_ScalarAttribute quickstep_expressions_windowaggregation_WindowAggregateFunction quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory quickstep_expressions_windowaggregation_WindowAggregationHandle - quickstep_expressions_windowaggregation_WindowAggregationHandleAvg quickstep_expressions_windowaggregation_WindowAggregationID quickstep_storage_ValueAccessor quickstep_types_CharType http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunction.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp index e40479b..7ffc4ae 100644 --- a/expressions/window_aggregation/WindowAggregateFunction.hpp +++ b/expressions/window_aggregation/WindowAggregateFunction.hpp @@ -20,6 +20,7 @@ #ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_ #define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_ +#include #include #include @@ -32,6 +33,7 @@ namespace quickstep { class CatalogRelationSchema; +class Scalar; class Type; class WindowAggregationHandle; @@ -120,16 +122,23 @@ class WindowAggregateFunction { * * @param argument_types A list of zero or more Types (in order) for * arguments to this WindowAggregateFunction. - * @param partition_key_types A list or zero or more Types for partition keys - * to this WindowAggregateFunction. + * @param partition_by_attributes A list of attributes used as partition key. + * @param order_by_attributes A list of attributes used as order key. + * @param is_row True if the frame mode is ROWS, false if RANGE. 
+ * @param num_preceding The number of rows/range that precedes the current row. + * @param num_following The number of rows/range that follows the current row. * * @return A new WindowAggregationHandle that can be used to compute this - * WindowAggregateFunction over the specified argument_types. Caller - * is responsible for deleting the returned object. + * WindowAggregateFunction over the specified window definition. + * Caller is responsible for deleting the returned object. **/ virtual WindowAggregationHandle* createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const = 0; + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const = 0; protected: explicit WindowAggregateFunction(const WindowAggregationID win_agg_id) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp index bc31a53..beb1c7a 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp +++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp @@ -73,13 +73,21 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes( WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const { + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const { DCHECK(canApplyToTypes(argument_types)) << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)" << " that AVG can not be applied to."; - 
return new WindowAggregationHandleAvg(partition_key_types, - *argument_types.front()); + return new WindowAggregationHandleAvg(partition_by_attributes, + order_by_attributes, + is_row, + num_preceding, + num_following, + argument_types[0]); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp index 32fd9d5..0e50415 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp +++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp @@ -58,7 +58,11 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction { WindowAggregationHandle* createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const override; + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const override; private: WindowAggregateFunctionAvg() http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp index 504e000..ccd81ac 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp +++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp @@ -47,7 +47,11 @@ const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes( WindowAggregationHandle* WindowAggregateFunctionCount::createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const { 
+ const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const { DCHECK(canApplyToTypes(argument_types)) << "Attempted to create a WindowAggregationHandleCount for argument Types " << "that COUNT can not be applied to (> 1 argument)."; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp index 1b40fdd..2e5506a 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp +++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp @@ -58,7 +58,11 @@ class WindowAggregateFunctionCount : public WindowAggregateFunction { WindowAggregationHandle* createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const override; + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const override; private: WindowAggregateFunctionCount() http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp index f3997c7..acfce82 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp +++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp @@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes( WindowAggregationHandle* 
WindowAggregateFunctionMax::createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const { + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const { DCHECK(canApplyToTypes(argument_types)) << "Attempted to create a WindowAggregationHandleMax for argument Type(s) " << "that MAX can not be applied to."; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp index 00c788e..a215703 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp +++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp @@ -58,7 +58,11 @@ class WindowAggregateFunctionMax : public WindowAggregateFunction { WindowAggregationHandle* createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const override; + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const override; private: WindowAggregateFunctionMax() http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp index a13e28e..cd845bd 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp +++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp @@ -55,7 +55,11 @@ const Type* 
WindowAggregateFunctionMin::resultTypeForArgumentTypes( WindowAggregationHandle* WindowAggregateFunctionMin::createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const { + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const { DCHECK(canApplyToTypes(argument_types)) << "Attempted to create a WindowAggregationHandleMin for argument Type(s) " << "that MIN can not be applied to."; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp index aeba539..fab88a8 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp +++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp @@ -58,7 +58,11 @@ class WindowAggregateFunctionMin : public WindowAggregateFunction { WindowAggregationHandle* createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const override; + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const override; private: WindowAggregateFunctionMin() http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp index 636c53a..e2aeb60 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp +++ 
b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp @@ -71,7 +71,11 @@ const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes( WindowAggregationHandle* WindowAggregateFunctionSum::createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const { + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const { DCHECK(canApplyToTypes(argument_types)) << "Attempted to create a WindowAggregationHandleSum for argument Type(s) " << "that SUM can not be applied to."; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp index 047113c..8d7d61d 100644 --- a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp +++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp @@ -58,7 +58,11 @@ class WindowAggregateFunctionSum : public WindowAggregateFunction { WindowAggregationHandle* createHandle( const std::vector &argument_types, - const std::vector &partition_key_types) const override; + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) const override; private: WindowAggregateFunctionSum() http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandle.cpp b/expressions/window_aggregation/WindowAggregationHandle.cpp new file mode 100644 index 0000000..835eaff --- 
/dev/null +++ b/expressions/window_aggregation/WindowAggregationHandle.cpp @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "expressions/window_aggregation/WindowAggregationHandle.hpp" + +#include +#include +#include + +#include "catalog/CatalogTypedefs.hpp" +#include "expressions/scalar/Scalar.hpp" +#include "types/Type.hpp" +#include "types/TypeFactory.hpp" +#include "types/TypeID.hpp" +#include "types/TypedValue.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "types/operations/binary_operations/BinaryOperation.hpp" +#include "types/operations/binary_operations/BinaryOperationFactory.hpp" +#include "types/operations/binary_operations/BinaryOperationID.hpp" +#include "types/operations/comparisons/Comparison.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +WindowAggregationHandle::WindowAggregationHandle( + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following) + : is_row_(is_row), + num_preceding_(num_preceding), + num_following_(num_following) { + // IDs and types of partition keys. 
+ std::vector partition_key_types; + for (const std::unique_ptr &partition_by_attribute : partition_by_attributes) { + partition_key_ids_.push_back( + partition_by_attribute->getAttributeIdForValueAccessor()); + partition_key_types.push_back(&partition_by_attribute->getType()); + } + + // Comparison operators for checking if two tuples belong to the same partition. + for (const Type *partition_key_type : partition_key_types) { + partition_equal_comparators_.emplace_back( + ComparisonFactory::GetComparison(ComparisonID::kEqual) + .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type)); + } + + // IDs and types of order keys. + const Type *first_order_key_type = nullptr; + for (const std::unique_ptr &order_by_attribute : order_by_attributes) { + order_key_ids_.push_back( + order_by_attribute->getAttributeIdForValueAccessor()); + if (first_order_key_type == nullptr) { + first_order_key_type = &order_by_attribute->getType(); + } + } + + // ID and type of the first order key if in RANGE mode. + if (!is_row) { + DCHECK(first_order_key_type != nullptr); + + // Comparators and operators to check window frame in RANGE mode. + const Type &long_type = TypeFactory::GetType(kLong, false); + range_compare_type_ = + TypeFactory::GetUnifyingType(*first_order_key_type, long_type); + + range_add_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) + .makeUncheckedBinaryOperatorForTypes(*first_order_key_type, long_type)); + range_comparator_.reset( + ComparisonFactory::GetComparison(ComparisonID::kLessOrEqual) + .makeUncheckedComparatorForTypes(*range_compare_type_, *range_compare_type_)); + } +} + +bool WindowAggregationHandle::samePartition( + const ColumnVectorsValueAccessor *tuple_accessor, + const tuple_id test_tuple_id) const { + // If test tuple does not exist. + if (test_tuple_id < 0 || + test_tuple_id >= tuple_accessor->getNumTuples()) { + return false; + } + + // Check all partition by attributes. 
+ for (std::size_t partition_by_index = 0; + partition_by_index < partition_key_ids_.size(); + ++partition_by_index) { + if (!partition_equal_comparators_[partition_by_index]->compareTypedValues( + tuple_accessor->getTypedValue(partition_key_ids_[partition_by_index]), + tuple_accessor->getTypedValueAtAbsolutePosition( + partition_key_ids_[partition_by_index], test_tuple_id))) { + return false; + } + } + + return true; +} + +bool WindowAggregationHandle::inWindow( + const ColumnVectorsValueAccessor *tuple_accessor, + const tuple_id test_tuple_id) const { + // If test tuple does not exist. + if (!samePartition(tuple_accessor, test_tuple_id)) { + return false; + } + + tuple_id current_tuple_id = tuple_accessor->getCurrentPosition(); + + // If test tuple is the current tuple, then it is in the window. + if (test_tuple_id == current_tuple_id) { + return true; + } + + // In ROWS mode, check the difference of tuple_id. + if (is_row_) { + if (num_preceding_ != -1 && + test_tuple_id < current_tuple_id - num_preceding_) { + return false; + } + + if (num_following_ != -1 && + test_tuple_id > current_tuple_id + num_following_) { + return false; + } + } else { + // In RANGE mode, check the difference of first order key value. + // Get the test value. + const Type &long_type = TypeFactory::GetType(kLong, false); + TypedValue test_value = + range_add_operator_->applyToTypedValues( + tuple_accessor->getTypedValueAtAbsolutePosition(order_key_ids_[0], test_tuple_id), + long_type.makeZeroValue()); + + // NULL will be considered not in range. + if (test_value.isNull() || + tuple_accessor->getTypedValue(order_key_ids_[0]).isNull()) { + return false; + } + + // Get the boundary value if it is not UNBOUNDED. + if (num_preceding_ > -1) { + // num_preceding needs to be negated for calculation. 
+ std::int64_t neg_num_preceding = -num_preceding_; + TypedValue start_boundary_value = + range_add_operator_->applyToTypedValues( + tuple_accessor->getTypedValue(order_key_ids_[0]), + long_type.makeValue(&neg_num_preceding)); + if (!range_comparator_->compareTypedValues(start_boundary_value, test_value)) { + return false; + } + } + + if (num_following_ > -1) { + TypedValue end_boundary_value = + range_add_operator_->applyToTypedValues( + tuple_accessor->getTypedValue(order_key_ids_[0]), + long_type.makeValue(&num_following_)); + if (!range_comparator_->compareTypedValues(test_value, end_boundary_value)) { + return false; + } + } + } + + return true; +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp index 65f95d9..41d1d96 100644 --- a/expressions/window_aggregation/WindowAggregationHandle.hpp +++ b/expressions/window_aggregation/WindowAggregationHandle.hpp @@ -27,19 +27,23 @@ #include "catalog/CatalogRelationSchema.hpp" #include "catalog/CatalogTypedefs.hpp" #include "storage/StorageBlockInfo.hpp" +#include "types/Type.hpp" +#include "types/TypeFactory.hpp" +#include "types/TypeID.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "types/operations/comparisons/Comparison.hpp" #include "types/operations/comparisons/ComparisonFactory.hpp" #include "types/operations/comparisons/ComparisonID.hpp" +#include "types/operations/binary_operations/BinaryOperation.hpp" +#include "types/operations/binary_operations/BinaryOperationFactory.hpp" +#include "types/operations/binary_operations/BinaryOperationID.hpp" #include "utility/Macros.hpp" namespace quickstep { -class 
InsertDestinationInterface; class Scalar; -class StorageManager; class Type; class ValueAccessor; @@ -55,27 +59,29 @@ class ValueAccessor; * * A WindowAggregationHandle is created by calling * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object - * provides methods that are used to actually compute the window aggregate, - * storing intermediate results in WindowAggregationState objects. + * provides methods that are used to actually compute the window aggregate. * * The work flow for computing a window aggregate is: - * 1. Create an initial state by createInitialState(). - * 2. One thread will handle all the computation, iterating from the first + * 1. One thread will handle all the computation, iterating from the first * tuple to the last tuple. Note there will be two modes that could be * used upon different situations: * a. If the window aggregate is defined as accumulative, which are: * i. Functions applied to whole partition, such as rank(), ntile() - * and dense_rank(). + * and dense_rank(). (Not implemented yet). * ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING * AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED * FOLLOWING". * Then, for functions except median, we could store some global - * values in the state without keeping all the tuple values around. + * values without keeping all the tuple values around. For simplicity, + * in avg(), count() and sum(), we treat the accumulative one as + * sliding window since the time complexity does not vary. * b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND - * 3 FOLLOWING", we have to store all the tuples in the state so that + * 3 FOLLOWING", we have to store all the tuples in the state (at + * least two pointers to the start tuple and end tuple), so that * we could know which values should be dropped as the window slides. - * For each computed value, generate a tuple store in the column vector. - * 3. 
Insert the new column into the original relation and return. + * For each computed value, generate a TypedValue and store it into a + * ColumnVector for window aggregate values. + * 2. Return the result ColumnVector. * * TODO(Shixuan): Currently we don't support parallelization. The basic idea for * parallelization is to calculate the partial result inside each block. Each @@ -96,37 +102,67 @@ class WindowAggregationHandle { * * @param block_accessors A pointer to the value accessor of block attributes. * @param arguments The ColumnVectors of arguments - * @param partition_by_ids The ids of partition keys. - * @param is_row True if the frame mode is ROWS, false if it is RANGE. - * @param num_preceding The number of rows/range that precedes the current row. - * @param num_following The number of rows/range that follows the current row. * * @return A ColumnVector of the calculated window aggregates. **/ virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors, - std::vector &&arguments, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following) const = 0; + const std::vector &arguments) const = 0; protected: /** * @brief Constructor. * - * @param partition_key_types The Types of the partition key. + * @param partition_by_attributes A list of attributes used as partition key. + * @param order_by_attributes A list of attributes used as order key. + * @param is_row True if the frame mode is ROWS, false if RANGE. + * @param num_preceding The number of rows/range that precedes the current row. + * @param num_following The number of rows/range that follows the current row. 
+ **/ + WindowAggregationHandle( + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following); + + /** + * @brief Check if test tuple is in the same partition as the current + * tuple in the accessor. + * + * @param tuple_accessor The ValueAccessor for tuples. + * @param test_tuple_id The id of the test tuple. + * + * @return True if test tuple is in the same partition as the current tuple in + * the accessor, false if not. **/ - explicit WindowAggregationHandle( - const std::vector &partition_key_types) { - // Comparison operators for checking if two tuples belong to the same partition. - for (const Type *partition_key_type : partition_key_types) { - equal_comparators_.emplace_back( - ComparisonFactory::GetComparison(ComparisonID::kEqual) - .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type)); - } - } - - std::vector> equal_comparators_; + bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor, + const tuple_id test_tuple_id) const; + + /** + * @brief Check if test tuple is in the defined range. + * + * @param tuple_accessor The ValueAccessor for tuples. + * @param test_tuple_id The id of the test tuple. + * + * @return True if test tuple is in the defined window, false if not. + **/ + bool inWindow(const ColumnVectorsValueAccessor *tuple_accessor, + const tuple_id test_tuple_id) const; + + // IDs and comparators for partition keys. + std::vector partition_key_ids_; + std::vector> partition_equal_comparators_; + + // IDs, type, Comparator and operator for frame boundary check in RANGE mode. + std::vector order_key_ids_; + std::unique_ptr range_add_operator_; + std::unique_ptr range_comparator_; // Less than or Equal + const Type* range_compare_type_; + + // Window frame information. 
+ const bool is_row_; + const std::int64_t num_preceding_; + const std::int64_t num_following_; private: DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp index a6a10d4..e6a4b3f 100644 --- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp +++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp @@ -24,8 +24,7 @@ #include #include "catalog/CatalogTypedefs.hpp" -#include "expressions/scalar/Scalar.hpp" -#include "expressions/scalar/ScalarAttribute.hpp" +#include "expressions/window_aggregation/WindowAggregationHandle.hpp" #include "storage/ValueAccessor.hpp" #include "types/Type.hpp" #include "types/TypeFactory.hpp" @@ -42,14 +41,21 @@ namespace quickstep { WindowAggregationHandleAvg::WindowAggregationHandleAvg( - const std::vector &partition_key_types, - const Type &type) - : WindowAggregationHandle(partition_key_types), - argument_type_(type) { + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, + const bool is_row, + const std::int64_t num_preceding, + const std::int64_t num_following, + const Type *argument_type) + : WindowAggregationHandle(partition_by_attributes, + order_by_attributes, + is_row, + num_preceding, + num_following) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. 
TypeID type_id; - switch (type.getTypeID()) { + switch (argument_type->getTypeID()) { case kInt: case kLong: type_id = kLong; @@ -59,7 +65,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg( type_id = kDouble; break; default: - type_id = type.getTypeID(); + type_id = argument_type->getTypeID(); break; } @@ -76,7 +82,13 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg( // Add operator for summing argument values. fast_add_operator_.reset( BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) - .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_)); + .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type)); + + // Subtract operator for dropping argument values off the window. + fast_subtract_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kSubtract) + .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type)); + // Divide operator for dividing sum by count to get final average. divide_operator_.reset( BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide) @@ -85,11 +97,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg( ColumnVector* WindowAggregationHandleAvg::calculate( ColumnVectorsValueAccessor *tuple_accessor, - std::vector &&arguments, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following) const { + const std::vector &arguments) const { DCHECK_EQ(1u, arguments.size()); DCHECK(arguments[0]->isNative()); DCHECK_EQ(static_cast(tuple_accessor->getNumTuples()), @@ -98,144 +106,69 @@ ColumnVector* WindowAggregationHandleAvg::calculate( // Initialize the output column and argument accessor. 
NativeColumnVector *window_aggregates = new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples()); - ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor(); + ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor(); argument_accessor->addColumn(arguments[0]); + // Initialize the information about the window. + TypedValue sum = sum_type_->makeZeroValue(); + std::uint64_t count = 0; + tuple_id start_tuple_id = 0; // The id of the first tuple in the window. + tuple_id end_tuple_id = 0; // The id of the tuple that just passed the last + // tuple in the window. + // Create a window for each tuple and calculate the window aggregate. tuple_accessor->beginIteration(); argument_accessor->beginIteration(); while (tuple_accessor->next() && argument_accessor->next()) { - const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor, - argument_accessor, - partition_by_ids, - is_row, - num_preceding, - num_following); - window_aggregates->appendTypedValue(window_aggregate); - } - - return window_aggregates; -} - -TypedValue WindowAggregationHandleAvg::calculateOneWindow( - ColumnVectorsValueAccessor *tuple_accessor, - ColumnVectorsValueAccessor *argument_accessor, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following) const { - // Initialize. - TypedValue sum = sum_type_->makeZeroValue(); - TypedValue current_value = argument_accessor->getTypedValue(0); - std::uint64_t count = 0; - - // Ignore the value if null. - if (!current_value.isNull()) { - sum = fast_add_operator_->applyToTypedValues(sum, current_value); - count++; - } - - // Get the partition key for the current row. - std::vector current_row_partition_key; - for (attribute_id partition_by_id : partition_by_ids) { - current_row_partition_key.push_back( - tuple_accessor->getTypedValue(partition_by_id)); - } - - // Get current position. 
- tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual(); - - // Find preceding tuples. - int count_preceding = 0; - tuple_id preceding_tuple_id = current_tuple_id; - while (num_preceding == -1 || count_preceding < num_preceding) { - preceding_tuple_id--; - - // No more preceding tuples. - if (preceding_tuple_id < 0) { - break; + tuple_id current_tuple_id = tuple_accessor->getCurrentPosition(); + + // If current tuple is not in the same partition as the previous tuple, + // reset the window. + if (!samePartition(tuple_accessor, current_tuple_id - 1)) { + start_tuple_id = current_tuple_id; + end_tuple_id = current_tuple_id; + count = 0; + sum = sum_type_->makeZeroValue(); } - // Get the partition keys and compare. If not the same partition as the - // current row, stop searching preceding tuples. - if (!samePartition(tuple_accessor, - current_row_partition_key, - preceding_tuple_id, - partition_by_ids)) { - break; + // Drop tuples that will be out of the window from the beginning. + while (!inWindow(tuple_accessor, start_tuple_id)) { + TypedValue start_value = + argument_accessor->getTypedValueAtAbsolutePosition(0, start_tuple_id); + // Ignore the value if NULL. + if (!start_value.isNull()) { + sum = fast_subtract_operator_->applyToTypedValues(sum, start_value); + count--; + } + + start_tuple_id++; } - // Actually count the element and do the calculation. - count_preceding++; - TypedValue preceding_value = - argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id); + // Add tuples that will be included by the window at the end. + while (inWindow(tuple_accessor, end_tuple_id)) { + TypedValue end_value = + argument_accessor->getTypedValueAtAbsolutePosition(0, end_tuple_id); - // Ignore the value if null. - if (!preceding_value.isNull()) { - sum = fast_add_operator_->applyToTypedValues(sum, preceding_value); - count++; - } - } - - // Find following tuples. 
- int count_following = 0; - tuple_id following_tuple_id = current_tuple_id; - while (num_following == -1 || count_following < num_following) { - following_tuple_id++; + // Ignore the value if NULL. + if (!end_value.isNull()) { + sum = fast_add_operator_->applyToTypedValues(sum, end_value); + count++; + } - // No more following tuples. - if (following_tuple_id == tuple_accessor->getNumTuples()) { - break; + end_tuple_id++; } - // Get the partition keys and compare. If not the same partition as the - // current row, stop searching preceding tuples. - if (!samePartition(tuple_accessor, - current_row_partition_key, - following_tuple_id, - partition_by_ids)) { - break; + // If all values are NULLs, return NULL; Otherwise, return the quotient. + if (count == 0) { + window_aggregates->appendTypedValue(result_type_->makeNullValue()); + } else { + window_aggregates->appendTypedValue( + divide_operator_->applyToTypedValues(sum, TypedValue(static_cast(count)))); } - - // Actually count the element and do the calculation. - count_following++; - TypedValue following_value = - argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id); - - // Ignore the value if null. - if (!following_value.isNull()) { - sum = fast_add_operator_->applyToTypedValues(sum, following_value); - count++; - } - } - - // If all values are NULLs, return NULL; Otherwise, return the quotient. 
- if (count == 0) { - return result_type_->makeNullValue(); - } else { - return divide_operator_->applyToTypedValues(sum, - TypedValue(static_cast(count))); } -} -bool WindowAggregationHandleAvg::samePartition( - const ColumnVectorsValueAccessor *tuple_accessor, - const std::vector ¤t_row_partition_key, - const tuple_id boundary_tuple_id, - const std::vector &partition_by_ids) const { - for (std::size_t partition_by_index = 0; - partition_by_index < partition_by_ids.size(); - ++partition_by_index) { - if (!equal_comparators_[partition_by_index]->compareTypedValues( - current_row_partition_key[partition_by_index], - tuple_accessor->getTypedValueAtAbsolutePosition( - partition_by_ids[partition_by_index], boundary_tuple_id))) { - return false; - } - } - - return true; + return window_aggregates; } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp index 5b41779..f7f2e4d 100644 --- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp +++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp @@ -31,7 +31,6 @@ #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/operations/binary_operations/BinaryOperation.hpp" -#include "types/operations/comparisons/Comparison.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -54,11 +53,7 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle { ~WindowAggregationHandleAvg() override {} ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors, - std::vector &&arguments, - const std::vector &partition_by_ids, - const bool is_row, - const std::int64_t num_preceding, - const std::int64_t num_following) const override; + const std::vector 
&arguments) const override; private: friend class WindowAggregateFunctionAvg; @@ -66,29 +61,25 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle { /** * @brief Constructor. * - * @param partition_key_types The Types of the partition key. - * @param type Type of the avg value. + * @param partition_by_attributes A list of attributes used as partition key. + * @param order_by_attributes A list of attributes used as order key. + * @param is_row True if the frame mode is ROWS, false if RANGE. + * @param num_preceding The number of rows/range that precedes the current row. + * @param num_following The number of rows/range that follows the current row. + * @param argument_type Type of the argument. **/ - WindowAggregationHandleAvg(const std::vector &partition_key_types, - const Type &type); - - TypedValue calculateOneWindow( - ColumnVectorsValueAccessor *tuple_accessor, - ColumnVectorsValueAccessor *argument_accessor, - const std::vector &partition_by_ids, + WindowAggregationHandleAvg( + const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, const bool is_row, const std::int64_t num_preceding, - const std::int64_t num_following) const; - - bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor, - const std::vector ¤t_row_partition_key, - const tuple_id boundary_tuple_id, - const std::vector &partition_by_ids) const; + const std::int64_t num_following, + const Type *argument_type); - const Type &argument_type_; const Type *sum_type_; const Type *result_type_; std::unique_ptr fast_add_operator_; + std::unique_ptr fast_subtract_operator_; std::unique_ptr divide_operator_; DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp ---------------------------------------------------------------------- diff --git 
a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp index c044a98..cb58083 100644 --- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp +++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp @@ -23,11 +23,13 @@ #include #include +#include "catalog/CatalogAttribute.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "expressions/scalar/Scalar.hpp" +#include "expressions/scalar/ScalarAttribute.hpp" #include "expressions/window_aggregation/WindowAggregateFunction.hpp" #include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp" #include "expressions/window_aggregation/WindowAggregationHandle.hpp" -#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp" #include "expressions/window_aggregation/WindowAggregationID.hpp" #include "storage/ValueAccessor.hpp" #include "types/CharType.hpp" @@ -58,6 +60,9 @@ namespace { constexpr int kNullInterval = 25; constexpr int kNumPreceding = 2; constexpr int kNumFollowing = 2; + constexpr int kPartitionKeyIndex = 0; + constexpr int kOrderKeyIndex = 1; + constexpr int kNumTuplesPerOrderKey = 2; } // namespace @@ -65,12 +70,27 @@ namespace { class WindowAggregationHandleAvgTest : public::testing::Test { protected: // Handle initialization. 
- void initializeHandle(const Type &argument_type) { + WindowAggregationHandle* initializeHandle(const Type &argument_type, + const bool is_row = true, + const std::int64_t num_preceding = -1, + const std::int64_t num_following = 0) { const WindowAggregateFunction &function = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg); + const Type &int_type = TypeFactory::GetType(kInt, false); + std::vector> partition_by_attributes; + std::vector> order_by_attributes; + partition_by_attributes.emplace_back( + new ScalarAttribute(CatalogAttribute(nullptr, "partition_key", int_type, kPartitionKeyIndex))); + order_by_attributes.emplace_back( + new ScalarAttribute(CatalogAttribute(nullptr, "order_key", int_type, kOrderKeyIndex))); std::vector partition_key_types(1, &TypeFactory::GetType(kInt, false)); - handle_avg_.reset(function.createHandle(std::vector(1, &argument_type), - std::move(partition_key_types))); + + return function.createHandle(std::vector(1, &argument_type), + partition_by_attributes, + order_by_attributes, + is_row, + num_preceding, + num_following); } // Test canApplyToTypes(). @@ -117,24 +137,25 @@ class WindowAggregationHandleAvgTest : public::testing::Test { template void checkWindowAggregationAvgGeneric() { - const GenericType &type = GenericType::Instance(true); - initializeHandle(type); - // Create argument, partition key and cpptype vectors. 
std::vector argument_cpp_vector; argument_cpp_vector.reserve(kNumTuples); ColumnVector *argument_type_vector = createArgumentGeneric(&argument_cpp_vector); NativeColumnVector *partition_key_vector = - new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2); + new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples); + NativeColumnVector *order_key_vector = + new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples); for (int i = 0; i < kNumTuples; ++i) { partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition)); + order_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerOrderKey)); } // Create tuple ValueAccessor. ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor(); tuple_accessor->addColumn(partition_key_vector); + tuple_accessor->addColumn(order_key_vector); tuple_accessor->addColumn(argument_type_vector); // Test UNBOUNDED PRECEDING AND CURRENT ROW. @@ -182,45 +203,95 @@ class WindowAggregationHandleAvgTest : public::testing::Test { const std::vector &argument_cpp_vector) { std::vector arguments; arguments.push_back(argument_type_vector); - // The partition key index is 0. - std::vector partition_key(1, 0); - ColumnVector *result = - handle_avg_->calculate(tuple_accessor, - std::move(arguments), - partition_key, - true /* is_row */, - -1 /* num_preceding: UNBOUNDED PRECEDING */, - 0 /* num_following: CURRENT ROW */); + // Check ROWS mode. + WindowAggregationHandle *rows_handle = + initializeHandle(GenericType::Instance(true), + true /* is_row */, + -1 /* num_preceding: UNBOUNDED PRECEDING */, + 0 /* num_following: CURRENT ROW */); + ColumnVector *rows_result = + rows_handle->calculate(tuple_accessor, arguments); // Get the cpptype result. 
- std::vector result_cpp_vector; - typename GenericType::cpptype sum; - int count; + std::vector rows_result_cpp_vector; + typename GenericType::cpptype rows_sum; + int rows_count; for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) { // Start of new partition if (i % kNumTuplesPerPartition == 0) { - SetDataType(0, &sum); - count = 0; + SetDataType(0, &rows_sum); + rows_count = 0; } typename GenericType::cpptype *value = argument_cpp_vector[i]; if (value != nullptr) { - sum += *value; - count++; + rows_sum += *value; + rows_count++; } - if (count == 0) { - result_cpp_vector.push_back(nullptr); + if (rows_count == 0) { + rows_result_cpp_vector.push_back(nullptr); } else { typename OutputType::cpptype *result_cpp_value = new typename OutputType::cpptype; - *result_cpp_value = static_cast(sum) / count; - result_cpp_vector.push_back(result_cpp_value); + *result_cpp_value = static_cast(rows_sum) / rows_count; + rows_result_cpp_vector.push_back(result_cpp_value); + } + } + + CheckAvgValues(rows_result_cpp_vector, rows_result); + + // Check RANGE mode. + WindowAggregationHandle *range_handle = + initializeHandle(GenericType::Instance(true), + false /* is_row */, + -1 /* num_preceding: UNBOUNDED PRECEDING */, + 0 /* num_following: CURRENT ROW */); + ColumnVector *range_result = + range_handle->calculate(tuple_accessor, arguments); + + // Get the cpptype result. + std::vector range_result_cpp_vector; + typename GenericType::cpptype range_sum; + int range_count; + std::size_t current_tuple = 0; + while (current_tuple < kNumTuples) { + // Start of new partition + if (current_tuple % kNumTuplesPerPartition == 0) { + SetDataType(0, &range_sum); + range_count = 0; + } + + // We have to consider following tuples with the same order key value. 
+ std::size_t next_tuple = current_tuple; + while (next_tuple < kNumTuples && + next_tuple / kNumTuplesPerPartition == current_tuple / kNumTuplesPerPartition && + next_tuple / kNumTuplesPerOrderKey == current_tuple / kNumTuplesPerOrderKey) { + typename GenericType::cpptype *value = argument_cpp_vector[next_tuple]; + if (value != nullptr) { + range_sum += *value; + range_count++; + } + + next_tuple++; + } + + // Calculate the result cpp value. + typename OutputType::cpptype *result_cpp_value = nullptr; + if (range_count != 0) { + result_cpp_value = new typename OutputType::cpptype; + *result_cpp_value = static_cast(range_sum) / range_count; + } + + // Add the result values to the tuples with in the same order key value. + while (current_tuple != next_tuple) { + range_result_cpp_vector.push_back(result_cpp_value); + current_tuple++; } } - CheckAvgValues(result_cpp_vector, result); + CheckAvgValues(range_result_cpp_vector, range_result); } template @@ -229,20 +300,19 @@ class WindowAggregationHandleAvgTest : public::testing::Test { const std::vector &argument_cpp_vector) { std::vector arguments; arguments.push_back(argument_type_vector); - // The partition key index is 0. - std::vector partition_key(1, 0); - ColumnVector *result = - handle_avg_->calculate(tuple_accessor, - std::move(arguments), - partition_key, - true /* is_row */, - kNumPreceding, - kNumFollowing); + // Check ROWS mode. + WindowAggregationHandle *rows_handle = + initializeHandle(GenericType::Instance(true), + true /* is_row */, + kNumPreceding, + kNumFollowing); + ColumnVector *rows_result = + rows_handle->calculate(tuple_accessor, arguments); // Get the cpptype result. // For each value, calculate all surrounding values in the window. 
- std::vector result_cpp_vector; + std::vector rows_result_cpp_vector; for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) { typename GenericType::cpptype sum; @@ -281,19 +351,81 @@ class WindowAggregationHandleAvgTest : public::testing::Test { } if (count == 0) { - result_cpp_vector.push_back(nullptr); + rows_result_cpp_vector.push_back(nullptr); } else { typename OutputType::cpptype *result_cpp_value = new typename OutputType::cpptype; *result_cpp_value = static_cast(sum) / count; - result_cpp_vector.push_back(result_cpp_value); + rows_result_cpp_vector.push_back(result_cpp_value); } } - CheckAvgValues(result_cpp_vector, result); - } + CheckAvgValues(rows_result_cpp_vector, rows_result); + + // Check RANGE mode. + WindowAggregationHandle *range_handle = + initializeHandle(GenericType::Instance(true), + false /* is_row */, + kNumPreceding, + kNumFollowing); + ColumnVector *range_result = + range_handle->calculate(tuple_accessor, arguments); + + // Get the cpptype result. + // For each value, calculate all surrounding values in the window. + std::vector range_result_cpp_vector; + + for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) { + typename GenericType::cpptype sum; + SetDataType(0, &sum); + int count = 0; + + if (argument_cpp_vector[i] != nullptr) { + sum += *argument_cpp_vector[i]; + count++; + } + + int preceding_bound = i / kNumTuplesPerOrderKey - kNumPreceding; + for (std::size_t precede = 1; precede <= kNumTuples; ++precede) { + // Not in range or the same partition. + if (i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition || + static_cast((i - precede) / kNumTuplesPerOrderKey) < preceding_bound) { + break; + } + + if (argument_cpp_vector[i - precede] != nullptr) { + sum += *argument_cpp_vector[i - precede]; + count++; + } + } + + int following_bound = i / kNumTuplesPerOrderKey + kNumFollowing; + for (int follow = 1; follow <= kNumTuples; ++follow) { + // Not in range or the same partition. 
+        if (i + follow >= kNumTuples ||
+            i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+            static_cast((i + follow) / kNumTuplesPerOrderKey) > following_bound) {
+          break;
+        }
+
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          count++;
+        }
+      }
+
+      if (count == 0) {
+        range_result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast(sum) / count;
+        range_result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
 
-  std::unique_ptr handle_avg_;
+    CheckAvgValues(range_result_cpp_vector, range_result);
+  }
 };
 
 template <>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..88103df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1663,7 +1663,7 @@ void ExecutionGenerator::convertWindowAggregate(
       std::static_pointer_cast(
           named_window_aggregate_expression->expression());
 
-  // Set the AggregateFunction.
+  // Set the WindowAggregateFunction.
   window_aggr_state_proto->mutable_function()->MergeFrom(
       window_aggregate_function->window_aggregate().getProto());
 
@@ -1683,6 +1683,15 @@ void ExecutionGenerator::convertWindowAggregate(
         ->MergeFrom(concretized_partition_by_attribute->getProto());
   }
 
+  // Set order keys.
+  for (const E::ScalarPtr &order_by_attribute
+      : window_info.order_by_attributes) {
+    unique_ptr concretized_order_by_attribute(
+        order_by_attribute->concretize(attribute_substitution_map_));
+    window_aggr_state_proto->add_order_by_attributes()
+        ->MergeFrom(concretized_order_by_attribute->getProto());
+  }
+
   // Set window frame info.
   if (window_info.frame_info == nullptr) {
     // If the frame is not specified, use the default setting:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c224388..46808bf 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -814,9 +814,9 @@ L::LogicalPtr Resolver::resolveInsertSelection(
         cast_expressions.emplace_back(selection_attributes[aid]);
       } else {
         // TODO(jianqiao): implement Cast operation for non-numeric types.
-        if (destination_type.getSuperTypeID() == Type::kNumeric
-            && selection_type.getSuperTypeID() == Type::kNumeric
-            && destination_type.isSafelyCoercibleFrom(selection_type)) {
+        if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+            selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+            destination_type.isSafelyCoercibleFrom(selection_type)) {
           // Add cast operation
           const E::AttributeReferencePtr attr = selection_attributes[aid];
           const E::ExpressionPtr cast_expr =
@@ -1691,6 +1691,19 @@ E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
   // Resolve window frame
   if (parse_window.frame_info() != nullptr) {
     const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+    // For RANGE mode, the first attribute in ORDER BY must be numeric.
+    // TODO(Shixuan): Time-related types should also be supported. To handle
+    // this, some changes in the parser need to be done since the time range
+    // should be specified with time units. Also, UNBOUNDED flags might be
+    // needed because -1 might not make sense in this case.
+    if (!parse_frame_info->is_row &&
+        (order_by_attributes.empty() ||
+         order_by_attributes[0]->getValueType().getSuperTypeID() != Type::SuperTypeID::kNumeric)) {
+      THROW_SQL_ERROR_AT(&parse_window)
+          << "A numeric attribute should be specified as the first ORDER BY "
+          << "attribute in RANGE mode";
+    }
+
     frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
                                         parse_frame_info->num_preceding,
                                         parse_frame_info->num_following);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 30a3c39..6bada6c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -1025,17 +1025,40 @@ WINDOW w AS
 +--------------------+-----------+------------------------+
 ==
 
-# Currently this is not supported, an empty table will be returned.
-SELECT int_col, sum(float_col) OVER
-(PARTITION BY char_col, long_col
- ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
- RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+SELECT float_col, double_col, avg(double_col) OVER
+(ORDER BY float_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING)
 FROM test;
 --
-+-----------+------------------------+
-|int_col    |sum(float_col)          |
-+-----------+------------------------+
-+-----------+------------------------+
++---------------+------------------------+------------------------+
+|float_col      |double_col              |avg(double_col)         |
++---------------+------------------------+------------------------+
+|     4.89897966|      117.57550765359254|     -5.2010907233390986|
+|     4.79583168|     -110.30412503619254|     -3.3458568752518572|
+|     4.69041586|      103.18914671611546|     -3.3458568752518572|
+|      4.5825758|     -96.234089594072643|     -4.2942570191393745|
+|     4.47213602|                    NULL|     -4.2942570191393745|
+|     4.35889912|      -82.81907992727281|     -3.1771278735018194|
+|      4.2426405|      76.367532368147124|     -3.1771278735018194|
+|     4.12310553|     -70.092795635500224|     -3.6217507631683268|
+|              4|                      64|     -3.0100796703699935|
+|     3.87298346|     -58.094750193111253|     -3.0100796703699935|
+|      3.7416575|       52.38320341483518|     -3.0100796703699935|
+|     3.60555124|     -46.872166581031856|     -3.1193833079868254|
+|     3.46410155|      41.569219381653056|     -3.1193833079868254|
+|     3.31662488|       -36.4828726939094|     -2.8361542397614437|
+|      3.1622777|                    NULL|     -2.8361542397614437|
+|              3|                     -27|     -2.7526926834086507|
+|     2.82842708|      22.627416997969522|     -8.4826069851706123|
+|     2.64575124|     -18.520259177452136|     -9.0010404404476727|
+|     2.44948983|      14.696938456699067|     -4.1547599319129516|
+|     2.23606801|     -11.180339887498949|     -4.2708832009567148|
+|              2|                       8|     0.11724429467951912|
+|     1.73205078|      -5.196152422706632|     -4.7108157334609286|
+|     1.41421354|      2.8284271247461903|     -5.1226841602152344|
+|              1|                      -1|      -1.638218767582549|
+|              0|                    NULL|      1.1580686755098886|
++---------------+------------------------+------------------------+
== SELECT sum(avg(int_col) OVER w) FROM test http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp index 0cdfc1a..49fa44d 100644 --- a/storage/WindowAggregationOperationState.cpp +++ b/storage/WindowAggregationOperationState.cpp @@ -56,15 +56,13 @@ WindowAggregationOperationState::WindowAggregationOperationState( const WindowAggregateFunction *window_aggregate_function, std::vector> &&arguments, const std::vector> &partition_by_attributes, + const std::vector> &order_by_attributes, const bool is_row, const std::int64_t num_preceding, const std::int64_t num_following, StorageManager *storage_manager) : input_relation_(input_relation), arguments_(std::move(arguments)), - is_row_(is_row), - num_preceding_(num_preceding), - num_following_(num_following), storage_manager_(storage_manager) { // Get the Types of this window aggregate's arguments so that we can create an // AggregationHandle. @@ -76,18 +74,14 @@ WindowAggregationOperationState::WindowAggregationOperationState( // Check if window aggregate function could apply to the arguments. DCHECK(window_aggregate_function->canApplyToTypes(argument_types)); - // IDs and types of partition keys. - std::vector partition_by_types; - for (const std::unique_ptr &partition_by_attribute : partition_by_attributes) { - partition_by_ids_.push_back( - partition_by_attribute->getAttributeIdForValueAccessor()); - partition_by_types.push_back(&partition_by_attribute->getType()); - } - // Create the handle and initial state. 
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(std::move(argument_types),
-                                              std::move(partition_by_types)));
+      window_aggregate_function->createHandle(argument_types,
+                                              partition_by_attributes,
+                                              order_by_attributes,
+                                              is_row,
+                                              num_preceding,
+                                              num_following));
 }
 
 WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -113,6 +107,15 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
         database));
   }
 
+  std::vector> order_by_attributes;
+  for (int attribute_idx = 0;
+       attribute_idx < proto.order_by_attributes_size();
+       ++attribute_idx) {
+    order_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.order_by_attributes(attribute_idx),
+        database));
+  }
+
   const bool is_row = proto.is_row();
   const std::int64_t num_preceding = proto.num_preceding();
   const std::int64_t num_following = proto.num_following();
@@ -121,6 +124,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
       &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
       std::move(arguments),
       partition_by_attributes,
+      order_by_attributes,
       is_row,
       num_preceding,
       num_following,
@@ -160,6 +164,15 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
     }
   }
 
+  for (int attribute_idx = 0;
+       attribute_idx < proto.order_by_attributes_size();
+       ++attribute_idx) {
+    if (!ScalarFactory::ProtoIsValid(proto.order_by_attributes(attribute_idx),
+                                     database)) {
+      return false;
+    }
+  }
+
   if (proto.num_preceding() < -1 || proto.num_following() < -1) {
     return false;
   }
@@ -177,14 +190,6 @@ void WindowAggregationOperationState::windowAggregateBlocks(
     return;
   }
 
-  // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
-  // work. This will need Order Key to be passed so that we know where the
-  // window should start and end.
-  if (!is_row_) {
-    std::cout << "Currently we don't support RANGE frame mode :(\n";
-    return;
-  }
-
   // Get the total number of tuples.
   int num_tuples = 0;
   for (const block_id block_idx : block_ids) {
@@ -226,7 +231,11 @@ void WindowAggregationOperationState::windowAggregateBlocks(
                                   block->getIndices(),
                                   block->getIndicesConsistent());
     ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
-    ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+    ColumnVectorsValueAccessor *argument_accessor = nullptr;
+    if (!arguments_.empty()) {
+      argument_accessor = new ColumnVectorsValueAccessor();
+    }
+
     for (const std::unique_ptr &argument : arguments_) {
       argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
                                                           &sub_block_ref));
@@ -235,9 +244,15 @@ void WindowAggregationOperationState::windowAggregateBlocks(
     InvokeOnAnyValueAccessor(tuple_accessor,
                              [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
       tuple_accessor->beginIteration();
-      argument_accessor->beginIteration();
+      if (argument_accessor != nullptr) {
+        argument_accessor->beginIteration();
+      }
+
+      while (tuple_accessor->next()) {
+        if (argument_accessor != nullptr) {
+          argument_accessor->next();
+        }
 
-      while (tuple_accessor->next() && argument_accessor->next()) {
         for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
           ColumnVector *attr_vec = attribute_vecs[attr_id];
           if (attr_vec->isNative()) {
@@ -275,11 +290,7 @@ void WindowAggregationOperationState::windowAggregateBlocks(
   // Do actual calculation in handle.
   ColumnVector *window_aggregates =
       window_aggregation_handle_->calculate(all_blocks_accessor,
-                                            std::move(argument_vecs),
-                                            partition_by_ids_,
-                                            is_row_,
-                                            num_preceding_,
-                                            num_following_);
+                                            argument_vecs);
 
   all_blocks_accessor->addColumn(window_aggregates);
   output_destination->bulkInsertTuples(all_blocks_accessor);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 9792a99..726b102 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -57,6 +57,7 @@ class WindowAggregationOperationState {
    * computed.
    * @param arguments A list of argument expressions to that aggregate.
    * @param partition_by_attributes A list of window partition key.
+   * @param order_by_attributes A list of window order key.
    * @param is_row True if the window frame is calculated by ROW, false if it is
    *        calculated by RANGE.
    * @param num_preceding The number of rows/range for the tuples preceding the
@@ -69,6 +70,7 @@ class WindowAggregationOperationState {
       const WindowAggregateFunction *window_aggregate_function,
       std::vector> &&arguments,
       const std::vector> &partition_by_attributes,
+      const std::vector> &order_by_attributes,
       const bool is_row,
       const std::int64_t num_preceding,
       const std::int64_t num_following,
@@ -120,13 +122,6 @@ class WindowAggregationOperationState {
   const std::vector block_ids_;
   std::unique_ptr window_aggregation_handle_;
   std::vector> arguments_;
-  std::vector partition_by_ids_;
-
-  // Frame info.
-  const bool is_row_;
-  const std::int64_t num_preceding_;
-  const std::int64_t num_following_;
-
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index d888461..f879713 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -30,4 +30,5 @@ message WindowAggregationOperationState {
   required bool is_row = 5;
   required int64 num_preceding = 6;  // -1 means UNBOUNDED PRECEDING.
   required int64 num_following = 7;  // -1 means UNBOUNDED FOLLOWING.
+  repeated Scalar order_by_attributes = 8;
 }
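
Note for readers of this diff: in RANGE mode the frame bounds are offsets on the ORDER BY key value rather than row counts, which is why the order keys now have to reach the handle. The sketch below is not the Quickstep implementation. It is a minimal standalone illustration of one way to compute AVG over such a frame in a single pass with a running sum, under these assumptions: a single partition, one numeric order key already sorted in descending order, NULL arguments skipped, and -1 meaning UNBOUNDED as in the .proto comments. The names RangeAvgDescending and WindowAvg, and all parameter names, are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical result cell: is_null is true when the frame holds no non-NULL value.
struct WindowAvg {
  bool is_null;
  double value;
};

// Sketch only (not Quickstep code). Assumes one partition and keys sorted in
// descending order; is_null[i] marks a NULL argument, which AVG skips.
// A bound of -1 means UNBOUNDED, mirroring the .proto comments.
std::vector<WindowAvg> RangeAvgDescending(const std::vector<double> &keys,
                                          const std::vector<double> &values,
                                          const std::vector<bool> &is_null,
                                          const std::int64_t num_preceding,
                                          const std::int64_t num_following) {
  const std::size_t n = keys.size();
  std::vector<WindowAvg> result(n);
  std::size_t lo = 0;  // First row inside the frame.
  std::size_t hi = 0;  // One past the last row inside the frame.
  double sum = 0.0;
  std::size_t count = 0;
  for (std::size_t i = 0; i < n; ++i) {
    // FOLLOWING rows have smaller keys: admit rows while
    // key >= keys[i] - num_following.
    while (hi < n &&
           (num_following < 0 ||
            keys[hi] >= keys[i] - static_cast<double>(num_following))) {
      if (!is_null[hi]) {
        sum += values[hi];
        ++count;
      }
      ++hi;
    }
    // PRECEDING rows have larger keys: evict rows whose
    // key > keys[i] + num_preceding.
    while (num_preceding >= 0 && lo < hi &&
           keys[lo] > keys[i] + static_cast<double>(num_preceding)) {
      if (!is_null[lo]) {
        sum -= values[lo];
        --count;
      }
      ++lo;
    }
    result[i].is_null = (count == 0);
    result[i].value = (count == 0) ? 0.0 : sum / static_cast<double>(count);
  }
  return result;
}
```

Both frame edges only move forward, so each row is added to and removed from the running sum at most once. The trade-off is that repeated additions and subtractions can accumulate floating-point error on long inputs, which a per-frame recomputation would not.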