From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Tue, 26 Jul 2016 01:31:55 -0000
Subject: [1/3] incubator-quickstep git commit: - Supported ROWS mode for AVG window aggregation. - Created WindowAggregateFunctions in expressions/window_aggregation. - Created WindowAggregationHandle for AVG to actually do the calculation. - Other functions will [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/policy-enforcer-dist a7d2355be -> 9a9be3dea (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..c044a98
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * limitations under the License.
+ **/
+
+#include
+#include
+#include
+#include
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+constexpr int kNumTuples = 100;
+constexpr int kNumTuplesPerPartition = 8;
+constexpr int kNullInterval = 25;
+constexpr int kNumPreceding = 2;
+constexpr int kNumFollowing = 2;
+
+}  // namespace
+
+// Attribute value could be null if set true.
+class WindowAggregationHandleAvgTest : public::testing::Test {
+ protected:
+  // Handle initialization.
+  void initializeHandle(const Type &argument_type) {
+    const WindowAggregateFunction &function =
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+    std::vector partition_key_types(1, &TypeFactory::GetType(kInt, false));
+    handle_avg_.reset(function.createHandle(std::vector(1, &argument_type),
+                                            std::move(partition_key_types)));
+  }
+
+  // Test canApplyToTypes().
+  static bool CanApplyToTypesTest(TypeID typeID) {
+    const Type &type = (typeID == kChar || typeID == kVarChar) ?
+        TypeFactory::GetType(typeID, static_cast(10)) :
+        TypeFactory::GetType(typeID);
+
+    return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).canApplyToTypes(
+        std::vector(1, &type));
+  }
+
+  // Test resultTypeForArgumentTypes().
+  static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+                                             TypeID output_type_id) {
+    const Type *result_type
+        = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).resultTypeForArgumentTypes(
+            std::vector(1, &TypeFactory::GetType(input_type_id)));
+    return (result_type->getTypeID() == output_type_id);
+  }
+
+  template
+  static void CheckAvgValues(
+      const std::vector &expected,
+      const ColumnVector *actual) {
+    EXPECT_TRUE(actual->isNative());
+    const NativeColumnVector *native = static_cast(actual);
+
+    EXPECT_EQ(expected.size(), native->size());
+    for (std::size_t i = 0; i < expected.size(); ++i) {
+      if (expected[i] == nullptr) {
+        EXPECT_TRUE(native->getTypedValue(i).isNull());
+      } else {
+        EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral());
+      }
+    }
+  }
+
+  // Static templated methods for setting a meaningful value to data types.
+  template
+  static void SetDataType(int value, CppType *data) {
+    *data = value;
+  }
+
+  template
+  void checkWindowAggregationAvgGeneric() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+
+    // Create argument, partition key and cpptype vectors.
+    std::vector argument_cpp_vector;
+    argument_cpp_vector.reserve(kNumTuples);
+    ColumnVector *argument_type_vector =
+        createArgumentGeneric(&argument_cpp_vector);
+    NativeColumnVector *partition_key_vector =
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+
+    for (int i = 0; i < kNumTuples; ++i) {
+      partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+    }
+
+    // Create tuple ValueAccessor.
+    ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
+    tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(argument_type_vector);
+
+    // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+    checkAccumulate(tuple_accessor,
+                    argument_type_vector,
+                    argument_cpp_vector);
+    // Test kNumPreceding PRECEDING AND kNumFollowing FOLLOWING.
+    checkSlidingWindow(tuple_accessor,
+                       argument_type_vector,
+                       argument_cpp_vector);
+  }
+
+  template
+  ColumnVector *createArgumentGeneric(
+      std::vector *argument_cpp_vector) {
+    const GenericType &type = GenericType::Instance(true);
+    NativeColumnVector *column = new NativeColumnVector(type, kNumTuples);
+
+    for (int i = 0; i < kNumTuples; ++i) {
+      // Insert a NULL every kNullInterval tuples.
+      if (i % kNullInterval == 0) {
+        argument_cpp_vector->push_back(nullptr);
+        column->appendTypedValue(type.makeNullValue());
+        continue;
+      }
+
+      typename GenericType::cpptype *val = new typename GenericType::cpptype;
+
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, val);
+      } else {
+        SetDataType(static_cast(i - 10) / 10, val);
+      }
+
+      column->appendTypedValue(type.makeValue(val));
+      argument_cpp_vector->push_back(val);
+    }
+
+    return column;
+  }
+
+  template
+  void checkAccumulate(ColumnVectorsValueAccessor *tuple_accessor,
+                       ColumnVector *argument_type_vector,
+                       const std::vector &argument_cpp_vector) {
+    std::vector arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector partition_key(1, 0);
+
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true  /* is_row */,
+                               -1  /* num_preceding: UNBOUNDED PRECEDING */,
+                               0  /* num_following: CURRENT ROW */);
+
+    // Get the cpptype result.
+    std::vector result_cpp_vector;
+    typename GenericType::cpptype sum;
+    int count;
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      // Start of new partition
+      if (i % kNumTuplesPerPartition == 0) {
+        SetDataType(0, &sum);
+        count = 0;
+      }
+
+      typename GenericType::cpptype *value = argument_cpp_vector[i];
+      if (value != nullptr) {
+        sum += *value;
+        count++;
+      }
+
+      if (count == 0) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast(sum) / count;
+        result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+  }
+
+  template
+  void checkSlidingWindow(ColumnVectorsValueAccessor *tuple_accessor,
+                          ColumnVector *argument_type_vector,
+                          const std::vector &argument_cpp_vector) {
+    std::vector arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector partition_key(1, 0);
+
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true  /* is_row */,
+                               kNumPreceding,
+                               kNumFollowing);
+
+    // Get the cpptype result.
+    // For each value, calculate all surrounding values in the window.
+    std::vector result_cpp_vector;
+
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      typename GenericType::cpptype sum;
+      SetDataType(0, &sum);
+      int count = 0;
+
+      if (argument_cpp_vector[i] != nullptr) {
+        sum += *argument_cpp_vector[i];
+        count++;
+      }
+
+      for (std::size_t precede = 1; precede <= kNumPreceding; ++precede) {
+        // Not the same partition (check i < precede first to avoid unsigned underflow).
+        if (i < precede ||
+            i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition) {
+          break;
+        }
+
+        if (argument_cpp_vector[i - precede] != nullptr) {
+          sum += *argument_cpp_vector[i - precede];
+          count++;
+        }
+      }
+
+      for (int follow = 1; follow <= kNumFollowing; ++follow) {
+        // Not the same partition.
+        if (i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+            i + follow >= kNumTuples) {
+          break;
+        }
+
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          count++;
+        }
+      }
+
+      if (count == 0) {
+        result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast(sum) / count;
+        result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+  }
+
+  std::unique_ptr handle_avg_;
+};
+
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValues(
+    const std::vector &expected,
+    const ColumnVector *actual) {
+  EXPECT_TRUE(actual->isNative());
+  const NativeColumnVector *native = static_cast(actual);
+
+  EXPECT_EQ(expected.size(), native->size());
+  for (std::size_t i = 0; i < expected.size(); ++i) {
+    if (expected[i] == nullptr) {
+      EXPECT_TRUE(native->getTypedValue(i).isNull());
+    } else {
+      EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral());
+    }
+  }
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType(
+    int value, DatetimeIntervalLit *data) {
+  data->interval_ticks = value;
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType(
+    int value, YearMonthIntervalLit *data) {
+  data->months = value;
+}
+
+typedef WindowAggregationHandleAvgTest WindowAggregationHandleAvgDeathTest;
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+  checkWindowAggregationAvgGeneric();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+  checkWindowAggregationAvgGeneric();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+  checkWindowAggregationAvgGeneric();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+  checkWindowAggregationAvgGeneric();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {
+  checkWindowAggregationAvgGeneric();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {
+  checkWindowAggregationAvgGeneric();
+}
+
+#ifdef QUICKSTEP_DEBUG
+TEST_F(WindowAggregationHandleAvgDeathTest, CharTypeTest) {
+  const Type &type = CharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, VarTypeTest) {
+  const Type &type = VarCharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type), "");
+}
+#endif
+
+TEST_F(WindowAggregationHandleAvgDeathTest, canApplyToTypeTest) {
+  EXPECT_TRUE(CanApplyToTypesTest(kInt));
+  EXPECT_TRUE(CanApplyToTypesTest(kLong));
+  EXPECT_TRUE(CanApplyToTypesTest(kFloat));
+  EXPECT_TRUE(CanApplyToTypesTest(kDouble));
+  EXPECT_FALSE(CanApplyToTypesTest(kChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kVarChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kDatetime));
+  EXPECT_TRUE(CanApplyToTypesTest(kDatetimeInterval));
+  EXPECT_TRUE(CanApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, ResultTypeForArgumentTypeTest) {
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kInt, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kLong, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kFloat, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDouble, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto
                       quickstep_queryoptimizer_ExecutionHeuristics

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryContext.pb.h"
 #include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
   // Get input.
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
-  window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+  window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());

   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
   const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new WindowAggregationOperator(query_handle_->query_id(),
+                                        *input_relation_info->relation,
                                         *output_relation,
                                         window_aggr_state_index,
                                         insert_destination_index));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       glog
-                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_Expression

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
 #include
 #include

-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
 }

 WindowAggregateFunctionPtr WindowAggregateFunction::Create(
-    const ::quickstep::AggregateFunction &window_aggregate,
+    const ::quickstep::WindowAggregateFunction &window_aggregate,
     const std::vector &arguments,
     const WindowInfo &window_info,
     const std::string &window_name,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..0cc80df 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,8 +33,8 @@

 namespace quickstep {

-class AggregateFunction;
 class Type;
+class WindowAggregateFunction;

 namespace optimizer {
 namespace expressions {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
    * @return The WindowAggregateFunction singleton (from the expression system)
    *         for this node.
    **/
-  inline const ::quickstep::AggregateFunction& window_aggregate() const {
+  inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
     return window_aggregate_;
   }

@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
    * @param is_distinct Whether this is a DISTINCT aggregation.
    * @return A new AggregateFunctionPtr.
    **/
-  static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+  static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
                                            const std::vector &arguments,
                                            const WindowInfo &window_info,
                                            const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
    * @param window_info The window info of the window aggregate function.
    * @param is_distinct Indicates whether this is a DISTINCT aggregation.
    */
-  WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+  WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
                           const std::vector &arguments,
                           const WindowInfo &window_info,
                           const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
   // window_aggregate_. If it really needs to be seperated from the
   // AggregationFunction, a new class for WindowAggregationFunction should be
   // created as quickstep::WindowAggregateFunction.
-  const ::quickstep::AggregateFunction &window_aggregate_;
+  const ::quickstep::WindowAggregateFunction &window_aggregate_;
   std::vector arguments_;
   const WindowInfo window_info_;
   const std::string window_name_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_expressions_tablegenerator_GeneratorFunction
                       quickstep_expressions_tablegenerator_GeneratorFunctionFactory
                       quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+                      quickstep_expressions_windowaggregation_WindowAggregateFunction
+                      quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_parser_ParseAssignment
                       quickstep_parser_ParseBasicExpressions
                       quickstep_parser_ParseBlockProperties

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..c224388 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
 #include "expressions/table_generator/GeneratorFunction.hpp"
 #include "expressions/table_generator/GeneratorFunctionFactory.hpp"
 #include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "parser/ParseAssignment.hpp"
 #include "parser/ParseBasicExpressions.hpp"
 #include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         << "COUNT aggregate has both star (*) and non-star arguments.";
   }

-  // Try to look up the AggregateFunction by name using
-  // AggregateFunctionFactory.
-  const ::quickstep::AggregateFunction *aggregate
-      = AggregateFunctionFactory::GetByName(function_name);
-  if (aggregate == nullptr) {
+  // Try to look up the AggregateFunction/WindowAggregationFunction by name.
+  // TODO(Shixuan): We might want to create a new abstract class Function to
+  // include both AggregateFunction and WindowAggregateFunction, which will make
+  // this part of code cleaner.
+  const ::quickstep::AggregateFunction *aggregate = nullptr;
+  const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
+  if (parse_function_call.isWindow()) {
+    window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+  } else {
+    aggregate = AggregateFunctionFactory::GetByName(function_name);
+  }
+
+  if (aggregate == nullptr && window_aggregate == nullptr) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Unrecognized function name \""
         << parse_function_call.name()->value()
@@ -2656,11 +2666,14 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }

   // Make sure a naked COUNT() with no arguments wasn't specified.
-  if ((aggregate->getAggregationID() == AggregationID::kCount)
-      && (resolved_arguments.empty())
-      && (!count_star)) {
-    THROW_SQL_ERROR_AT(&parse_function_call)
-        << "COUNT aggregate requires an argument (either scalar or star (*))";
+  if ((aggregate != nullptr &&
+       aggregate->getAggregationID() == AggregationID::kCount) ||
+      (window_aggregate != nullptr &&
+       window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+    if ((resolved_arguments.empty()) && !count_star) {
+      THROW_SQL_ERROR_AT(&parse_function_call)
+          << "COUNT aggregate requires an argument (either scalar or star (*))";
+    }
   }

   // Resolve each of the Scalar arguments to the aggregate.
@@ -2670,7 +2683,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   }

   // Make sure that the aggregate can apply to the specified argument(s).
-  if (!aggregate->canApplyToTypes(argument_types)) {
+  if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+      || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
     THROW_SQL_ERROR_AT(&parse_function_call)
         << "Aggregate function " << aggregate->getName()
         << " can not apply to the given argument(s).";
@@ -2679,7 +2693,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   if (parse_function_call.isWindow()) {
     return resolveWindowAggregateFunction(parse_function_call,
                                           expression_resolution_info,
-                                          aggregate,
+                                          window_aggregate,
                                           resolved_arguments);
   }

@@ -2705,7 +2719,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
 E::ScalarPtr Resolver::resolveWindowAggregateFunction(
     const ParseFunctionCall &parse_function_call,
     ExpressionResolutionInfo *expression_resolution_info,
-    const ::quickstep::AggregateFunction *window_aggregate,
+    const ::quickstep::WindowAggregateFunction *window_aggregate,
     const std::vector &resolved_arguments) {
   // A window aggregate function might be defined OVER a window name or a window.
   E::WindowAggregateFunctionPtr window_aggregate_function;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
 #include
 #include

-#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
    * @param expression_resolution_info Resolution info that contains the name
    *                                   resolver and info to be updated after
    *                                   resolution.
-   * @param aggregate The aggregate function.
+   * @param aggregate The window aggregate function.
    * @param resolved_arguments The resolved arguments.
    * @return An expression in the query optimizer.
    */
   expressions::ScalarPtr resolveWindowAggregateFunction(
       const ParseFunctionCall &parse_function_call,
       ExpressionResolutionInfo *expression_resolution_info,
-      const ::quickstep::AggregateFunction *aggregate,
+      const ::quickstep::WindowAggregateFunction *aggregate,
       const std::vector &resolved_arguments);

   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 16127cc..30a3c39 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -953,19 +953,79 @@ WHERE double_col < 0
 ==

 # Window Aggregation Test.
-# Currently this is not supported, an empty table will be returned.
-SELECT avg(int_col) OVER w FROM test
+SELECT char_col, long_col, avg(long_col) OVER w FROM test
 WINDOW w AS
-(PARTITION BY char_col
- ORDER BY long_col DESC NULLS LAST
+(ORDER BY char_col DESC NULLS LAST
  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
 --
-+------------------------+
-|avg(int_col)            |
-+------------------------+
-+------------------------+
++--------------------+--------------------+------------------------+
+|char_col            |long_col            |avg(long_col)           |
++--------------------+--------------------+------------------------+
+|          8 2.828427|                  64|                      64|
+|          6 2.449490|                  36|                      50|
+|          4 2.000000|                  16|      38.666666666666664|
+|         24 4.898979|                 576|                     173|
+|         22 4.690416|                 484|      235.19999999999999|
+|         20 4.472136|                 400|      262.66666666666669|
+|          2 1.414214|                   4|      225.71428571428572|
+|         18 4.242641|                 324|                     238|
+|         16 4.000000|                 256|                     240|
+|         14 3.741657|                 196|      235.59999999999999|
+|         12 3.464102|                 144|      227.27272727272728|
+|         10 3.162278|                 100|      216.66666666666666|
+|          0 0.000000|                   0|                     200|
+|         -9 3.000000|                  81|                   191.5|
+|         -7 2.645751|                  49|                     182|
+|         -5 2.236068|                  25|                172.1875|
+|         -3 1.732051|                   9|      162.58823529411765|
+|        -23 4.795832|                 529|      182.94444444444446|
+|        -21 4.582576|                 441|      196.52631578947367|
+|        -19 4.358899|                 361|                  204.75|
+|        -17 4.123106|                 289|      208.76190476190476|
+|        -15 3.872983|                 225|                   209.5|
+|        -13 3.605551|                 169|       207.7391304347826|
+|        -11 3.316625|                 121|                 204.125|
+|         -1 1.000000|                   1|                     196|
++--------------------+--------------------+------------------------+
 ==
+SELECT long_col, int_col, avg(int_col) OVER w FROM test
+WINDOW w AS
+(ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
+--
++--------------------+-----------+------------------------+
+|long_col            |int_col    |avg(int_col)            |
++--------------------+-----------+------------------------+
+|                 576|         24|       7.666666666666667|
+|                 529|        -23|                     0.5|
+|                 484|         22|                     0.5|
+|                 441|        -21|                  -10.25|
+|                 400|       NULL|                       0|
+|                 361|        -19|                   -9.75|
+|                 324|         18|                    -0.5|
+|                 289|        -17|     -3.3999999999999999|
+|                 256|         16|      3.2000000000000002|
+|                 225|        -15|                      -3|
+|                 196|         14|      2.7999999999999998|
+|                 169|        -13|     -2.6000000000000001|
+|                 144|         12|                     0.5|
+|                 121|        -11|                   -5.25|
+|                 100|       NULL|                       0|
+|                  81|         -9|                   -4.75|
+|                  64|          8|                    -0.5|
+|                  49|         -7|     -1.3999999999999999|
+|                  36|          6|                     1.2|
+|                  25|         -5|                      -1|
+|                  16|          4|     0.80000000000000004|
+|                   9|         -3|    -0.59999999999999998|
+|                   4|          2|                     0.5|
+|                   1|         -1|    -0.66666666666666663|
+|                   0|       NULL|                     0.5|
++--------------------+-----------+------------------------+
+==
+
+# Currently this is not supported, an empty table will be returned.

 SELECT int_col, sum(float_col) OVER
 (PARTITION BY char_col, long_col
  ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
@@ -987,5 +1047,5 @@ WINDOW w AS
 +------------------------+
 |sum(avg(int_col))       |
 +------------------------+
-|                    NULL|
+|                     -18|
 +------------------------+

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
+                      quickstep_storage_WindowAggregationOperationState
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..3149864 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@

 #include

+#include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"

 #include "tmb/id_typedefs.h"

@@ -40,10 +42,14 @@ bool WindowAggregationOperator::getAllWorkOrders(
   DCHECK(query_context != nullptr);

   if (blocking_dependencies_met_ && !generated_) {
+    std::vector relation_blocks =
+        input_relation_.getBlocksSnapshot();
+
     container->addNormalWorkOrder(
         new WindowAggregationWorkOrder(
             query_id_,
             query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+            std::move(relation_blocks),
             query_context->getInsertDestination(output_destination_index_)),
         op_index_);
     generated_ = true;
@@ -67,6 +73,11 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
                       window_aggregation_state_index_);
+
+  const std::vector relation_blocks = input_relation_.getBlocksSnapshot();
+  for (const block_id bid : relation_blocks) {
+    proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+  }
   proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
                       output_destination_index_);

@@ -75,8 +86,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {


 void WindowAggregationWorkOrder::execute() {
-  std::cout << "Window aggregation is not supported yet.\n"
-      << "An empty table is returned\n";
+  state_->windowAggregateBlocks(output_destination_,
+                                block_ids_);
 }

 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..bd83248 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
    *
    * @param query_id The ID of this query.
    * @param input_relation The relation to perform aggregation over.
+   * @param output_relation The relation for output.
    * @param window_aggregation_state_index The index of WindowAggregationState
    *                                       in QueryContext.
    * @param output_destination_index The index of InsertDestination in
    *                                 QueryContext for the output.
    **/
   WindowAggregationOperator(const std::size_t query_id,
+                            const CatalogRelation &input_relation,
                             const CatalogRelation &output_relation,
                             const QueryContext::window_aggregation_state_id window_aggregation_state_index,
                             const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
+        input_relation_(input_relation),
         output_relation_(output_relation),
         window_aggregation_state_index_(window_aggregation_state_index),
         output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
    **/
   serialization::WorkOrder* createWorkOrderProto();

+  const CatalogRelation &input_relation_;
   const CatalogRelation &output_relation_;
   const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
   const QueryContext::insert_destination_id output_destination_index_;
@@ -117,43 +121,25 @@ class WindowAggregationWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of this query.
    * @param state The WindowAggregationOperatorState to use.
+   * @param block_ids The IDs of the input relation's blocks.
    * @param output_destination The InsertDestination for output.
 **/
 WindowAggregationWorkOrder(const std::size_t query_id,
 WindowAggregationOperationState *state,
+ std::vector &&block_ids,
 InsertDestination *output_destination)
 : WorkOrder(query_id),
 state_(state),
+ block_ids_(std::move(block_ids)),
 output_destination_(output_destination) {}
 ~WindowAggregationWorkOrder() override {}
- /**
- * @brief Get the pointer to WindowAggregationOperationState.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the window aggregation operation state.
- **/
- WindowAggregationOperationState* state() {
- return state_;
- }
-
- /**
- * @brief Get the pointer to output destination.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the output destination.
- **/
- InsertDestination* output_destination() {
- return output_destination_;
- }
-
 void execute() override;
 private:
 WindowAggregationOperationState *state_;
+ const std::vector block_ids_;
 InsertDestination *output_destination_;
 DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
 extend WorkOrder {
 // All required
 optional uint32 window_aggr_state_index = 336;
- optional int32 insert_destination_index = 337;
+ repeated fixed64 block_ids = 337;
+ optional int32 insert_destination_index = 338;
 }
 }
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..582effd 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,26 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
 quickstep_catalog_CatalogTypedefs
 quickstep_expressions_ExpressionFactories
 quickstep_expressions_Expressions_proto
- quickstep_expressions_aggregation_AggregateFunction
- quickstep_expressions_aggregation_AggregateFunctionFactory
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationID
 quickstep_expressions_scalar_Scalar
 quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_storage_InsertDestination
 quickstep_storage_StorageBlockInfo
 quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
 quickstep_storage_WindowAggregationOperationState_proto
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorUtil
+ quickstep_types_containers_ColumnVectorsValueAccessor
 quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
- quickstep_expressions_aggregation_AggregateFunction_proto
 quickstep_expressions_Expressions_proto
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
 ${PROTOBUF_LIBRARY})
 # Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..0cdfc1a 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,14 +31,21 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/ExpressionFactories.hpp"
 #include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/ColumnVectorUtil.hpp"
 #include "glog/logging.h"
@@ -46,23 +53,21 @@ namespace quickstep {
 WindowAggregationOperationState::WindowAggregationOperationState(
 const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
 std::vector> &&arguments,
- std::vector> &&partition_by_attributes,
+ const std::vector> &partition_by_attributes,
 const bool is_row,
 const std::int64_t num_preceding,
 const std::int64_t num_following,
 StorageManager *storage_manager)
 : input_relation_(input_relation),
 arguments_(std::move(arguments)),
- partition_by_attributes_(std::move(partition_by_attributes)),
 is_row_(is_row),
 num_preceding_(num_preceding),
 num_following_(num_following),
 storage_manager_(storage_manager) {
 // Get the Types of this window aggregate's arguments so that we can create an
 // AggregationHandle.
- // TODO(Shixuan): Next step: New handles for window aggregation function.
 std::vector argument_types;
 for (const std::unique_ptr &argument : arguments_) {
 argument_types.emplace_back(&argument->getType());
@@ -71,28 +76,18 @@ WindowAggregationOperationState::WindowAggregationOperationState(
 // Check if window aggregate function could apply to the arguments.
 DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+ // IDs and types of partition keys.
+ std::vector partition_by_types;
+ for (const std::unique_ptr &partition_by_attribute : partition_by_attributes) {
+ partition_by_ids_.push_back(
+ partition_by_attribute->getAttributeIdForValueAccessor());
+ partition_by_types.push_back(&partition_by_attribute->getType());
+ }
+
 // Create the handle and initial state.
 window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(argument_types));
- window_aggregation_state_.reset(
- window_aggregation_handle_->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this window aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy elision
- // when actually performing the window aggregation.
- arguments_as_attributes_.reserve(arguments_.size());
- for (const std::unique_ptr &argument : arguments_) {
- const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- arguments_as_attributes_.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
- arguments_as_attributes_.push_back(argument_id);
- }
- }
-#endif
+ window_aggregate_function->createHandle(std::move(argument_types),
+ std::move(partition_by_types)));
 }
 WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -101,10 +96,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
 StorageManager *storage_manager) {
 DCHECK(ProtoIsValid(proto, database));
- // Rebuild contructor arguments from their representation in 'proto'.
- const AggregateFunction *aggregate_function
- = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
-
 std::vector> arguments;
 arguments.reserve(proto.arguments_size());
 for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
@@ -126,10 +117,10 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
 const std::int64_t num_preceding = proto.num_preceding();
 const std::int64_t num_following = proto.num_following();
- return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
- aggregate_function,
+ return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+ &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
 std::move(arguments),
- std::move(partition_by_attributes),
+ partition_by_attributes,
 is_row,
 num_preceding,
 num_following,
@@ -139,11 +130,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
 bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
 const CatalogDatabaseLite &database) {
 if (!proto.IsInitialized() ||
- !database.hasRelationWithId(proto.relation_id())) {
+ !database.hasRelationWithId(proto.input_relation_id())) {
 return false;
 }
- if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+ if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
 return false;
 }
@@ -176,4 +167,122 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
 return true;
 }
+void WindowAggregationOperationState::windowAggregateBlocks(
+ InsertDestination *output_destination,
+ const std::vector &block_ids) {
+ // TODO(Shixuan): This is a quick fix for currently unsupported functions in
+ // order to pass the query_optimizer test.
+ if (window_aggregation_handle_.get() == nullptr) {
+ std::cout << "The function will be supported in the near future :)\n";
+ return;
+ }
+
+ // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
+ // work. This will need Order Key to be passed so that we know where the
+ // window should start and end.
+ if (!is_row_) {
+ std::cout << "Currently we don't support RANGE frame mode :(\n";
+ return;
+ }
+
+ // Get the total number of tuples.
+ int num_tuples = 0;
+ for (const block_id block_idx : block_ids) {
+ num_tuples +=
+ storage_manager_->getBlock(block_idx, input_relation_)->getNumTuples();
+ }
+
+ // Construct column vectors for attributes.
+ std::vector attribute_vecs;
+ for (std::size_t attr_id = 0; attr_id < input_relation_.size(); ++attr_id) {
+ const CatalogAttribute *attr = input_relation_.getAttributeById(attr_id);
+ const Type &type = attr->getType();
+
+ if (NativeColumnVector::UsableForType(type)) {
+ attribute_vecs.push_back(new NativeColumnVector(type, num_tuples));
+ } else {
+ attribute_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+ }
+ }
+
+ // Construct column vectors for arguments.
+ std::vector argument_vecs;
+ for (const std::unique_ptr &argument : arguments_) {
+ const Type &type = argument->getType();
+
+ if (NativeColumnVector::UsableForType(type)) {
+ argument_vecs.push_back(new NativeColumnVector(type, num_tuples));
+ } else {
+ argument_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+ }
+ }
+
+ // TODO(Shixuan): Add Support for Vector Copy Elision Selection.
+ // Add tuples and arguments into ColumnVectors.
+ for (const block_id block_idx : block_ids) {
+ BlockReference block = storage_manager_->getBlock(block_idx, input_relation_);
+ const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+ SubBlocksReference sub_block_ref(tuple_block,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
+ ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+ for (const std::unique_ptr &argument : arguments_) {
+ argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
+ &sub_block_ref));
+ }
+
+ InvokeOnAnyValueAccessor(tuple_accessor,
+ [&] (auto *tuple_accessor) -> void { // NOLINT(build/c++11)
+ tuple_accessor->beginIteration();
+ argument_accessor->beginIteration();
+
+ while (tuple_accessor->next() && argument_accessor->next()) {
+ for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
+ ColumnVector *attr_vec = attribute_vecs[attr_id];
+ if (attr_vec->isNative()) {
+ static_cast(attr_vec)->appendTypedValue(
+ tuple_accessor->getTypedValue(attr_id));
+ } else {
+ static_cast(attr_vec)->appendTypedValue(
+ tuple_accessor->getTypedValue(attr_id));
+ }
+ }
+
+ for (std::size_t argument_idx = 0;
+ argument_idx < argument_vecs.size();
+ ++argument_idx) {
+ ColumnVector *argument = argument_vecs[argument_idx];
+ if (argument->isNative()) {
+ static_cast(argument)->appendTypedValue(
+ argument_accessor->getTypedValue(argument_idx));
+ } else {
+ static_cast(argument)->appendTypedValue(
+ argument_accessor->getTypedValue(argument_idx));
+ }
+ }
+ }
+ });
+ }
+
+ // Construct the value accessor for tuples in all blocks
+ ColumnVectorsValueAccessor *all_blocks_accessor
+ = new ColumnVectorsValueAccessor();
+ for (ColumnVector *attr_vec : attribute_vecs) {
+ all_blocks_accessor->addColumn(attr_vec);
+ }
+
+ // Do actual calculation in handle.
+ ColumnVector *window_aggregates =
+ window_aggregation_handle_->calculate(all_blocks_accessor,
+ std::move(argument_vecs),
+ partition_by_ids_,
+ is_row_,
+ num_preceding_,
+ num_following_);
+
+ all_blocks_accessor->addColumn(window_aggregates);
+ output_destination->bulkInsertTuples(all_blocks_accessor);
+}
+
 } // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..9792a99 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,20 +25,20 @@
 #include
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
 #include "utility/Macros.hpp"
 namespace quickstep {
-class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WindowAggregateFunction;
 /** \addtogroup Storage
 * @{
@@ -63,13 +63,12 @@ class WindowAggregationOperationState {
 * current row. -1 means UNBOUNDED PRECEDING.
 * @param num_following The number of rows/range for the tuples following the
 * current row. -1 means UNBOUNDED FOLLOWING.
- * @param storage_manager The StorageManager to use for allocating hash
- * tables.
+ * @param storage_manager The StorageManager to get block references.
 */
 WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
 std::vector> &&arguments,
- std::vector> &&partition_by_attributes,
+ const std::vector> &partition_by_attributes,
 const bool is_row,
 const std::int64_t num_preceding,
 const std::int64_t num_following,
@@ -107,66 +106,29 @@ class WindowAggregationOperationState {
 const CatalogDatabaseLite &database);
 /**
- * @brief Get the is_row info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return True if the frame mode is ROW, false if it is RANGE.
- **/
- const bool is_row() const { return is_row_; }
-
- /**
- * @brief Get the num_preceding info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return The number of rows/range that precedes the current row.
- **/
- const std::int64_t num_preceding() const { return num_preceding_; }
-
- /**
- * @brief Get the num_following info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
+ * @brief Compute window aggregates on the tuples of the given relation.
 *
- * @return The number of rows/range that follows the current row.
+ * @param output_destination The output destination for the computed window
+ * aggregate.
+ * @param block_ids The id of the blocks to be computed.
 **/
- const std::int64_t num_following() const { return num_following_; }
-
- /**
- * @brief Get the pointer to StorageManager.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the storage manager.
- **/
- StorageManager *storage_manager() { return storage_manager_; }
+ void windowAggregateBlocks(InsertDestination *output_destination,
+ const std::vector &block_ids);
 private:
 const CatalogRelationSchema &input_relation_;
-
- // TODO(Shixuan): Handle and State for window aggregation will be needed for
- // actual calculation.
- std::unique_ptr window_aggregation_handle_;
- std::unique_ptr window_aggregation_state_;
+ const std::vector block_ids_;
+ std::unique_ptr window_aggregation_handle_;
 std::vector> arguments_;
+ std::vector partition_by_ids_;
- // We don't add order_by_attributes here since it is not needed after sorting.
- std::vector> partition_by_attributes_;
-
- // Window framing information.
+ // Frame info.
 const bool is_row_;
 const std::int64_t num_preceding_;
 const std::int64_t num_following_;
 StorageManager *storage_manager_;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all an aggregate's argument expressions are simply attributes in
- // 'input_relation_', then this caches the attribute IDs of those arguments.
- std::vector arguments_as_attributes_;
-#endif
-
 DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
 };
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..d888461 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,12 +19,12 @@ syntax = "proto2";
 package quickstep.serialization;
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
 import "expressions/Expressions.proto";
 message WindowAggregationOperationState {
- required int32 relation_id = 1;
- required AggregateFunction function = 2;
+ required int32 input_relation_id = 1;
+ required WindowAggregateFunction function = 2;
 repeated Scalar arguments = 3;
 repeated Scalar partition_by_attributes = 4;
 required bool is_row = 5;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
 #include "catalog/CatalogDatabase.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
 #include "storage/WindowAggregationOperationState.hpp"
 #include "storage/WindowAggregationOperationState.pb.h"
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
 serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(kInvalidTableId);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(kInvalidTableId);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
 proto.set_is_row(true);
 proto.set_num_preceding(kValidNum);
 proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
 TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
 serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
 proto.set_is_row(true);
 proto.set_num_preceding(kInvalidNum);
 proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
 TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
 serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
 proto.set_is_row(true);
 proto.set_num_preceding(kValidNum);
 proto.set_num_following(kValidNum);
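In this commit, `windowAggregateBlocks()` hands the handle the materialized columns, `partition_by_ids_`, `is_row_`, `num_preceding_` and `num_following_`. Per the header comments, -1 means UNBOUNDED. The body of `WindowAggregationHandleAvg::calculate()` is not shown in this part of the diff. The standalone C++17 function below is a rough sketch of what a ROWS-mode AVG has to compute, not Quickstep's implementation. `RowsFrameAvg` is a hypothetical name. The sketch makes three assumptions: rows are already sorted by partition key and then by order key; SQL NULL is modeled as `std::nullopt`; and NULLs are skipped the way SQL AVG skips them. It reproduces values from the expected-output tables in the test file above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical sketch (C++17), not Quickstep's WindowAggregationHandleAvg.
// For every row i, averages the non-NULL values in the ROWS frame
// [i - num_preceding, i + num_following], clipped to the partition of row i.
// A bound of -1 means UNBOUNDED. Rows must already be sorted by
// (partition key, order key); NULL is modeled as std::nullopt.
std::vector<std::optional<double>> RowsFrameAvg(
    const std::vector<std::optional<double>> &values,
    const std::vector<int> &partition_keys,
    const std::int64_t num_preceding,
    const std::int64_t num_following) {
  assert(values.size() == partition_keys.size());
  const std::int64_t n = static_cast<std::int64_t>(values.size());

  // Prefix sums: sum[k] and count[k] cover the non-NULL values in rows [0, k).
  std::vector<double> sum(n + 1, 0.0);
  std::vector<std::int64_t> count(n + 1, 0);
  for (std::int64_t k = 0; k < n; ++k) {
    sum[k + 1] = sum[k] + values[k].value_or(0.0);
    count[k + 1] = count[k] + (values[k].has_value() ? 1 : 0);
  }

  std::vector<std::optional<double>> result(values.size());
  std::int64_t part_begin = 0;
  while (part_begin < n) {
    // The partition is the maximal run of rows sharing row part_begin's key.
    std::int64_t part_end = part_begin + 1;
    while (part_end < n && partition_keys[part_end] == partition_keys[part_begin]) {
      ++part_end;
    }
    for (std::int64_t i = part_begin; i < part_end; ++i) {
      // Frame [lo, hi) in row indices, never crossing the partition boundary.
      std::int64_t lo = (num_preceding == -1) ? part_begin : i - num_preceding;
      std::int64_t hi = (num_following == -1) ? part_end : i + num_following + 1;
      if (lo < part_begin) lo = part_begin;
      if (hi > part_end) hi = part_end;
      const std::int64_t non_null = count[hi] - count[lo];
      if (non_null > 0) {
        result[i] = (sum[hi] - sum[lo]) / static_cast<double>(non_null);
      }  // Otherwise every value in the frame is NULL, so AVG stays NULL.
    }
    part_begin = part_end;
  }
  return result;
}
```

The sketch uses prefix sums so that each frame costs O(1) after one linear pass, instead of re-summing up to `num_preceding + num_following + 1` values per row. Whether the committed handle uses the same approach is not visible here.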
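Before calling the handle, `windowAggregateBlocks()` materializes the whole input relation in two passes. The first pass sums `getNumTuples()` over `block_ids` to size one column vector per attribute. The second walks each block's tuples and appends their values column by column, in block order. The toy version below is self-contained. `ToyBlock` and `GatherColumns` are hypothetical: a block is modeled as row-major `int` tuples, and `std::vector<int>` stands in for Quickstep's `NativeColumnVector`.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a storage block: row-major tuples of int attributes.
struct ToyBlock {
  std::vector<std::vector<int>> tuples;
};

// Materializes every block into one column per attribute, preserving block
// order. Pass 1 counts tuples so each column is allocated once; pass 2
// appends each tuple's attributes column by column.
std::vector<std::vector<int>> GatherColumns(const std::vector<ToyBlock> &blocks,
                                            const std::size_t num_attributes) {
  std::size_t num_tuples = 0;
  for (const ToyBlock &block : blocks) {
    num_tuples += block.tuples.size();
  }

  std::vector<std::vector<int>> columns(num_attributes);
  for (std::vector<int> &column : columns) {
    column.reserve(num_tuples);
  }

  for (const ToyBlock &block : blocks) {
    for (const std::vector<int> &tuple : block.tuples) {
      assert(tuple.size() == num_attributes);
      for (std::size_t attr = 0; attr < num_attributes; ++attr) {
        columns[attr].push_back(tuple[attr]);
      }
    }
  }
  return columns;
}
```

Copying every block into one set of columns lets a window frame span block boundaries without extra bookkeeping. The cost is an in-memory copy of the entire input relation for each window aggregation.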