quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shix...@apache.org
Subject [6/6] incubator-quickstep git commit: RANGE mode and computation optimization. - Supported RANGE mode for window aggregation. - Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.
Date Mon, 01 Aug 2016 14:38:59 GMT
RANGE mode and computation optimization.
- Supported RANGE mode for window aggregation.
- Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0172fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0172fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0172fde

Branch: refs/heads/SQL-window-aggregation
Commit: d0172fde0cffbf10bc858090e11169e11834be89
Parents: e53186e
Author: shixuan <shixuan@apache.org>
Authored: Tue Jul 26 11:49:07 2016 -0500
Committer: shixuan <shixuan@apache.org>
Committed: Mon Aug 1 09:38:34 2016 -0500

----------------------------------------------------------------------
 expressions/window_aggregation/CMakeLists.txt   |  15 +-
 .../WindowAggregateFunction.hpp                 |  19 +-
 .../WindowAggregateFunctionAvg.cpp              |  14 +-
 .../WindowAggregateFunctionAvg.hpp              |   6 +-
 .../WindowAggregateFunctionCount.cpp            |   6 +-
 .../WindowAggregateFunctionCount.hpp            |   6 +-
 .../WindowAggregateFunctionMax.cpp              |   6 +-
 .../WindowAggregateFunctionMax.hpp              |   6 +-
 .../WindowAggregateFunctionMin.cpp              |   6 +-
 .../WindowAggregateFunctionMin.hpp              |   6 +-
 .../WindowAggregateFunctionSum.cpp              |   6 +-
 .../WindowAggregateFunctionSum.hpp              |   6 +-
 .../WindowAggregationHandle.cpp                 | 186 ++++++++++++++++
 .../WindowAggregationHandle.hpp                 | 100 ++++++---
 .../WindowAggregationHandleAvg.cpp              | 201 ++++++-----------
 .../WindowAggregationHandleAvg.hpp              |  35 ++-
 .../WindowAggregationHandleAvg_unittest.cpp     | 220 +++++++++++++++----
 query_optimizer/ExecutionGenerator.cpp          |  11 +-
 query_optimizer/resolver/Resolver.cpp           |  19 +-
 .../tests/execution_generator/Select.test       |  41 +++-
 storage/WindowAggregationOperationState.cpp     |  69 +++---
 storage/WindowAggregationOperationState.hpp     |   9 +-
 storage/WindowAggregationOperationState.proto   |   1 +
 23 files changed, 692 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
index 6a16fcc..3a79b7e 100644
--- a/expressions/window_aggregation/CMakeLists.txt
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -44,7 +44,7 @@ add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
             WindowAggregateFunctionSum.cpp
             WindowAggregateFunctionSum.hpp)
 add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
-            ../../empty_src.cpp
+            WindowAggregationHandle.cpp
             WindowAggregationHandle.hpp)
 add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
             WindowAggregationHandleAvg.cpp
@@ -130,10 +130,17 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
                       glog
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_scalar_Scalar
                       quickstep_storage_StorageBlockInfo
+                      quickstep_types_Type
+                      quickstep_types_TypeFactory
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
                       quickstep_types_operations_comparisons_Comparison
                       quickstep_types_operations_comparisons_ComparisonFactory
                       quickstep_types_operations_comparisons_ComparisonID
@@ -141,8 +148,6 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
 target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
                       glog
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_scalar_Scalar
-                      quickstep_expressions_scalar_ScalarAttribute
                       quickstep_expressions_windowaggregation_WindowAggregationHandle
                       quickstep_storage_ValueAccessor
                       quickstep_types_Type
@@ -179,11 +184,13 @@ add_executable(WindowAggregationHandle_tests
 target_link_libraries(WindowAggregationHandle_tests
                       gtest
                       gtest_main
+                      quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarAttribute
                       quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_expressions_windowaggregation_WindowAggregationHandle
-                      quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
                       quickstep_expressions_windowaggregation_WindowAggregationID
                       quickstep_storage_ValueAccessor
                       quickstep_types_CharType

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index e40479b..7ffc4ae 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
 #define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
 
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -32,6 +33,7 @@
 namespace quickstep {
 
 class CatalogRelationSchema;
+class Scalar;
 class Type;
 class WindowAggregationHandle;
 
@@ -120,16 +122,23 @@ class WindowAggregateFunction {
    *
    * @param argument_types A list of zero or more Types (in order) for
    *        arguments to this WindowAggregateFunction.
-   * @param partition_key_types A list or zero or more Types for partition keys
-   *                            to this WindowAggregateFunction.
+   * @param partition_by_attributes A list of attributes used as partition key.
+   * @param order_by_attributes A list of attributes used as order key.
+   * @param is_row True if the frame mode is ROWS, false if RANGE.
+   * @param num_preceding The number of rows/range that precedes the current row.
+   * @param num_following The number of rows/range that follows the current row.
    * 
    * @return A new WindowAggregationHandle that can be used to compute this
-   *         WindowAggregateFunction over the specified argument_types. Caller
-   *         is responsible for deleting the returned object.
+   *         WindowAggregateFunction over the specified window definition.
+   *         Caller is responsible for deleting the returned object.
    **/
   virtual WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const = 0;
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const = 0;
 
  protected:
   explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index bc31a53..beb1c7a 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -73,13 +73,21 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
       << " that AVG can not be applied to.";
 
-  return new WindowAggregationHandleAvg(partition_key_types,
-                                        *argument_types.front());
+  return new WindowAggregationHandleAvg(partition_by_attributes,
+                                        order_by_attributes,
+                                        is_row,
+                                        num_preceding,
+                                        num_following,
+                                        argument_types[0]);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 32fd9d5..0e50415 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const override;
 
  private:
   WindowAggregateFunctionAvg()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
index 504e000..ccd81ac 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
@@ -47,7 +47,11 @@ const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionCount::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleCount for argument Types "
       << "that COUNT can not be applied to (> 1 argument).";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
index 1b40fdd..2e5506a 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionCount : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const override;
 
  private:
   WindowAggregateFunctionCount()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
index f3997c7..acfce82 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
@@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionMax::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleMax for argument Type(s) "
       << "that MAX can not be applied to.";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
index 00c788e..a215703 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionMax : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const override;
 
  private:
   WindowAggregateFunctionMax()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
index a13e28e..cd845bd 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
@@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMin::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionMin::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleMin for argument Type(s) "
       << "that MIN can not be applied to.";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
index aeba539..fab88a8 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionMin : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const override;
 
  private:
   WindowAggregateFunctionMin()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
index 636c53a..e2aeb60 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
@@ -71,7 +71,11 @@ const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes(
 
 WindowAggregationHandle* WindowAggregateFunctionSum::createHandle(
     const std::vector<const Type*> &argument_types,
-    const std::vector<const Type*> &partition_key_types) const {
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
   DCHECK(canApplyToTypes(argument_types))
       << "Attempted to create a WindowAggregationHandleSum for argument Type(s) "
       << "that SUM can not be applied to.";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
index 047113c..8d7d61d 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionSum : public WindowAggregateFunction {
 
   WindowAggregationHandle* createHandle(
       const std::vector<const Type*> &argument_types,
-      const std::vector<const Type*> &partition_key_types) const override;
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const override;
 
  private:
   WindowAggregateFunctionSum()

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.cpp b/expressions/window_aggregation/WindowAggregationHandle.cpp
new file mode 100644
index 0000000..835eaff
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.cpp
@@ -0,0 +1,186 @@
+/**
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationHandle::WindowAggregationHandle(
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following)
+    : is_row_(is_row),
+      num_preceding_(num_preceding),
+      num_following_(num_following) {
+  // IDs and types of partition keys.
+  std::vector<const Type*> partition_key_types;
+  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+    partition_key_ids_.push_back(
+        partition_by_attribute->getAttributeIdForValueAccessor());
+    partition_key_types.push_back(&partition_by_attribute->getType());
+  }
+
+  // Comparison operators for checking if two tuples belong to the same partition.
+  for (const Type *partition_key_type : partition_key_types) {
+    partition_equal_comparators_.emplace_back(
+        ComparisonFactory::GetComparison(ComparisonID::kEqual)
+            .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
+  }
+
+  // IDs and types of order keys.
+  const Type *first_order_key_type = nullptr;
+  for (const std::unique_ptr<const Scalar> &order_by_attribute : order_by_attributes) {
+    order_key_ids_.push_back(
+        order_by_attribute->getAttributeIdForValueAccessor());
+    if (first_order_key_type == nullptr) {
+      first_order_key_type = &order_by_attribute->getType();
+    }
+  }
+
+  // ID and type of the first order key if in RANGE mode.
+  if (!is_row) {
+    DCHECK(first_order_key_type != nullptr);
+
+    // Comparators and operators to check window frame in RANGE mode.
+    const Type &long_type = TypeFactory::GetType(kLong, false);
+    range_compare_type_ =
+        TypeFactory::GetUnifyingType(*first_order_key_type, long_type);
+
+    range_add_operator_.reset(
+        BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+            .makeUncheckedBinaryOperatorForTypes(*first_order_key_type, long_type));
+    range_comparator_.reset(
+        ComparisonFactory::GetComparison(ComparisonID::kLessOrEqual)
+            .makeUncheckedComparatorForTypes(*range_compare_type_, *range_compare_type_));
+  }
+}
+
+bool WindowAggregationHandle::samePartition(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const tuple_id test_tuple_id) const {
+  // If test tuple does not exist.
+  if (test_tuple_id < 0 ||
+      test_tuple_id >= tuple_accessor->getNumTuples()) {
+    return false;
+  }
+
+  // Check all partition by attributes.
+  for (std::size_t partition_by_index = 0;
+       partition_by_index < partition_key_ids_.size();
+       ++partition_by_index) {
+    if (!partition_equal_comparators_[partition_by_index]->compareTypedValues(
+            tuple_accessor->getTypedValue(partition_key_ids_[partition_by_index]),
+            tuple_accessor->getTypedValueAtAbsolutePosition(
+                partition_key_ids_[partition_by_index], test_tuple_id))) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool WindowAggregationHandle::inWindow(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const tuple_id test_tuple_id) const {
+  // If test tuple does not exist.
+  if (!samePartition(tuple_accessor, test_tuple_id)) {
+    return false;
+  }
+
+  tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+  // If test tuple is the current tuple, then it is in the window.
+  if (test_tuple_id == current_tuple_id) {
+    return true;
+  }
+
+  // In ROWS mode, check the difference of tuple_id.
+  if (is_row_) {
+    if (num_preceding_ != -1 &&
+        test_tuple_id < current_tuple_id - num_preceding_) {
+      return false;
+    }
+
+    if (num_following_ != -1 &&
+        test_tuple_id > current_tuple_id + num_following_) {
+      return false;
+    }
+  } else {
+    // In RANGE mode, check the difference of first order key value.
+    // Get the test value.
+    const Type &long_type = TypeFactory::GetType(kLong, false);
+    TypedValue test_value =
+        range_add_operator_->applyToTypedValues(
+            tuple_accessor->getTypedValueAtAbsolutePosition(order_key_ids_[0], test_tuple_id),
+            long_type.makeZeroValue());
+
+    // NULL will be considered not in range.
+    if (test_value.isNull() ||
+        tuple_accessor->getTypedValue(order_key_ids_[0]).isNull()) {
+      return false;
+    }
+
+    // Get the boundary value if it is not UNBOUNDED.
+    if (num_preceding_ > -1) {
+      // num_preceding needs to be negated for calculation.
+      std::int64_t neg_num_preceding = -num_preceding_;
+      TypedValue start_boundary_value =
+          range_add_operator_->applyToTypedValues(
+              tuple_accessor->getTypedValue(order_key_ids_[0]),
+              long_type.makeValue(&neg_num_preceding));
+      if (!range_comparator_->compareTypedValues(start_boundary_value, test_value)) {
+        return false;
+      }
+    }
+
+    if (num_following_ > -1) {
+      TypedValue end_boundary_value =
+          range_add_operator_->applyToTypedValues(
+              tuple_accessor->getTypedValue(order_key_ids_[0]),
+              long_type.makeValue(&num_following_));
+      if (!range_comparator_->compareTypedValues(test_value, end_boundary_value)) {
+        return false;
+      }
+    }
+  }
+
+  return true;
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 65f95d9..41d1d96 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -27,19 +27,23 @@
 #include "catalog/CatalogRelationSchema.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
-class InsertDestinationInterface;
 class Scalar;
-class StorageManager;
 class Type;
 class ValueAccessor;
 
@@ -55,27 +59,29 @@ class ValueAccessor;
  *
  * A WindowAggregationHandle is created by calling
  * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
- * provides methods that are used to actually compute the window aggregate,
- * storing intermediate results in WindowAggregationState objects.
+ * provides methods that are used to actually compute the window aggregate.
  *
  * The work flow for computing a window aggregate is:
- *     1. Create an initial state by createInitialState().
- *     2. One thread will handle all the computation, iterating from the first
+ *     1. One thread will handle all the computation, iterating from the first
  *        tuple to the last tuple. Note there will be two modes that could be
  *        used upon different situations:
  *        a. If the window aggregate is defined as accumulative, which are:
  *           i.  Functions applied to whole partition, such as rank(), ntile()
- *               and dense_rank().
+ *               and dense_rank(). (Not implemented yet).
  *           ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
  *               AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
  *               FOLLOWING".
  *           Then, for functions except median, we could store some global
- *           values in the state without keeping all the tuple values around.
+ *           values without keeping all the tuple values around. For simplicity,
+ *           in avg(), count() and sum(), we treat the accumulative one as
+ *           sliding window since the time complexity does not vary.
  *        b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
- *           3 FOLLOWING", we have to store all the tuples in the state so that
+ *           3 FOLLOWING", we have to store all the tuples in the state (at
+ *           least two pointers to the start tuple and end tuple), so that
  *           we could know which values should be dropped as the window slides.
- *        For each computed value, generate a tuple store in the column vector.
- *     3. Insert the new column into the original relation and return.
+ *        For each computed value, generate a TypedValue and store it into a
+ *        ColumnVector for window aggregate values.
+ *     2. Return the result ColumnVector.
  *
  * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
  * parallelization is to calculate the partial result inside each block. Each
@@ -96,37 +102,67 @@ class WindowAggregationHandle {
    *
    * @param block_accessors A pointer to the value accessor of block attributes.
    * @param arguments The ColumnVectors of arguments
-   * @param partition_by_ids The ids of partition keys.
-   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
-   * @param num_preceding The number of rows/range that precedes the current row.
-   * @param num_following The number of rows/range that follows the current row.
    *
    * @return A ColumnVector of the calculated window aggregates.
    **/
   virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
-                                  std::vector<ColumnVector*> &&arguments,
-                                  const std::vector<attribute_id> &partition_by_ids,
-                                  const bool is_row,
-                                  const std::int64_t num_preceding,
-                                  const std::int64_t num_following) const = 0;
+                                  const std::vector<ColumnVector*> &arguments) const = 0;
 
  protected:
   /**
    * @brief Constructor.
    *
-   * @param partition_key_types The Types of the partition key.
+   * @param partition_by_attributes A list of attributes used as partition key.
+   * @param order_by_attributes A list of attributes used as order key.
+   * @param is_row True if the frame mode is ROWS, false if RANGE.
+   * @param num_preceding The number of rows/range that precedes the current row.
+   * @param num_following The number of rows/range that follows the current row.
+   **/
+  WindowAggregationHandle(
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following);
+
+  /**
+   * @brief Check if test tuple is in the same partition as the current
+   *        tuple in the accessor.
+   *
+   * @param tuple_accessor The ValueAccessor for tuples.
+   * @param test_tuple_id The id of the test tuple.
+   *
+   * @return True if test tuple is in the same partition as the current tuple in
+   *         the accessor, false if not.
    **/
-  explicit WindowAggregationHandle(
-      const std::vector<const Type*> &partition_key_types) {
-    // Comparison operators for checking if two tuples belong to the same partition.
-    for (const Type *partition_key_type : partition_key_types) {
-      equal_comparators_.emplace_back(
-          ComparisonFactory::GetComparison(ComparisonID::kEqual)
-              .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
-    }
-  }
-
-  std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+                     const tuple_id test_tuple_id) const;
+
+  /**
+   * @brief Check if test tuple is in the defined range.
+   *
+   * @param tuple_accessor The ValueAccessor for tuples.
+   * @param test_tuple_id The id of the test tuple.
+   *
+   * @return True if test tuple is in the defined window, false if not.
+   **/
+  bool inWindow(const ColumnVectorsValueAccessor *tuple_accessor,
+                const tuple_id test_tuple_id) const;
+
+  // IDs and comparators for partition keys.
+  std::vector<attribute_id> partition_key_ids_;
+  std::vector<std::unique_ptr<UncheckedComparator>> partition_equal_comparators_;
+
+  // IDs, type, Comparator and operator for frame boundary check in RANGE mode.
+  std::vector<attribute_id> order_key_ids_;
+  std::unique_ptr<UncheckedBinaryOperator> range_add_operator_;
+  std::unique_ptr<UncheckedComparator> range_comparator_;  // Less than or Equal
+  const Type* range_compare_type_;
+
+  // Window frame information.
+  const bool is_row_;
+  const std::int64_t num_preceding_;
+  const std::int64_t num_following_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index a6a10d4..e6a4b3f 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -24,8 +24,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "expressions/scalar/Scalar.hpp"
-#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -42,14 +41,21 @@
 namespace quickstep {
 
 WindowAggregationHandleAvg::WindowAggregationHandleAvg(
-    const std::vector<const Type*> &partition_key_types,
-    const Type &type)
-    : WindowAggregationHandle(partition_key_types),
-      argument_type_(type) {
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following,
+    const Type *argument_type)
+    : WindowAggregationHandle(partition_by_attributes,
+                              order_by_attributes,
+                              is_row,
+                              num_preceding,
+                              num_following) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_id;
-  switch (type.getTypeID()) {
+  switch (argument_type->getTypeID()) {
     case kInt:
     case kLong:
       type_id = kLong;
@@ -59,7 +65,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
       type_id = kDouble;
       break;
     default:
-      type_id = type.getTypeID();
+      type_id = argument_type->getTypeID();
       break;
   }
 
@@ -76,7 +82,13 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
   // Add operator for summing argument values.
   fast_add_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
-          .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+          .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));
+
+  // Subtract operator for dropping argument values off the window.
+  fast_subtract_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kSubtract)
+          .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));
+
   // Divide operator for dividing sum by count to get final average.
   divide_operator_.reset(
       BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
@@ -85,11 +97,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
 
 ColumnVector* WindowAggregationHandleAvg::calculate(
     ColumnVectorsValueAccessor *tuple_accessor,
-    std::vector<ColumnVector*> &&arguments,
-    const std::vector<attribute_id> &partition_by_ids,
-    const bool is_row,
-    const std::int64_t num_preceding,
-    const std::int64_t num_following) const {
+    const std::vector<ColumnVector*> &arguments) const {
   DCHECK_EQ(1u, arguments.size());
   DCHECK(arguments[0]->isNative());
   DCHECK_EQ(static_cast<std::size_t>(tuple_accessor->getNumTuples()),
@@ -98,144 +106,69 @@ ColumnVector* WindowAggregationHandleAvg::calculate(
   // Initialize the output column and argument accessor.
   NativeColumnVector *window_aggregates =
       new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
-  ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor();
+  ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
   argument_accessor->addColumn(arguments[0]);
 
+  // Initialize the information about the window.
+  TypedValue sum = sum_type_->makeZeroValue();
+  std::uint64_t count = 0;
+  tuple_id start_tuple_id = 0;  // The id of the first tuple in the window.
+  tuple_id end_tuple_id = 0;  // The id of the tuple that just passed the last
+                              // tuple in the window.
+
   // Create a window for each tuple and calculate the window aggregate.
   tuple_accessor->beginIteration();
   argument_accessor->beginIteration();
 
   while (tuple_accessor->next() && argument_accessor->next()) {
-    const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
-                                                                 argument_accessor,
-                                                                 partition_by_ids,
-                                                                 is_row,
-                                                                 num_preceding,
-                                                                 num_following);
-    window_aggregates->appendTypedValue(window_aggregate);
-  }
-
-  return window_aggregates;
-}
-
-TypedValue WindowAggregationHandleAvg::calculateOneWindow(
-    ColumnVectorsValueAccessor *tuple_accessor,
-    ColumnVectorsValueAccessor *argument_accessor,
-    const std::vector<attribute_id> &partition_by_ids,
-    const bool is_row,
-    const std::int64_t num_preceding,
-    const std::int64_t num_following) const {
-  // Initialize.
-  TypedValue sum = sum_type_->makeZeroValue();
-  TypedValue current_value = argument_accessor->getTypedValue(0);
-  std::uint64_t count = 0;
-
-  // Ignore the value if null.
-  if (!current_value.isNull()) {
-    sum = fast_add_operator_->applyToTypedValues(sum, current_value);
-    count++;
-  }
-
-  // Get the partition key for the current row.
-  std::vector<TypedValue> current_row_partition_key;
-  for (attribute_id partition_by_id : partition_by_ids) {
-    current_row_partition_key.push_back(
-        tuple_accessor->getTypedValue(partition_by_id));
-  }
-
-  // Get current position.
-  tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
-
-  // Find preceding tuples.
-  int count_preceding = 0;
-  tuple_id preceding_tuple_id = current_tuple_id;
-  while (num_preceding == -1 || count_preceding < num_preceding) {
-    preceding_tuple_id--;
-
-    // No more preceding tuples.
-    if (preceding_tuple_id < 0) {
-      break;
+    tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+    // If current tuple is not in the same partition as the previous tuple,
+    // reset the window.
+    if (!samePartition(tuple_accessor, current_tuple_id - 1)) {
+      start_tuple_id = current_tuple_id;
+      end_tuple_id = current_tuple_id;
+      count = 0;
+      sum = sum_type_->makeZeroValue();
     }
 
-    // Get the partition keys and compare. If not the same partition as the
-    // current row, stop searching preceding tuples.
-    if (!samePartition(tuple_accessor,
-                       current_row_partition_key,
-                       preceding_tuple_id,
-                       partition_by_ids)) {
-      break;
+    // Drop tuples that will be out of the window from the beginning.
+    while (!inWindow(tuple_accessor, start_tuple_id)) {
+      TypedValue start_value =
+          argument_accessor->getTypedValueAtAbsolutePosition(0, start_tuple_id);
+      // Ignore the value if NULL.
+      if (!start_value.isNull()) {
+        sum = fast_subtract_operator_->applyToTypedValues(sum, start_value);
+        count--;
+      }
+
+      start_tuple_id++;
     }
 
-    // Actually count the element and do the calculation.
-    count_preceding++;
-    TypedValue preceding_value =
-        argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id);
+    // Add tuples that will be included by the window at the end.
+    while (inWindow(tuple_accessor, end_tuple_id)) {
+      TypedValue end_value =
+          argument_accessor->getTypedValueAtAbsolutePosition(0, end_tuple_id);
 
-    // Ignore the value if null.
-    if (!preceding_value.isNull()) {
-      sum = fast_add_operator_->applyToTypedValues(sum, preceding_value);
-      count++;
-    }
-  }
-
-  // Find following tuples.
-  int count_following = 0;
-  tuple_id following_tuple_id = current_tuple_id;
-  while (num_following == -1 || count_following < num_following) {
-    following_tuple_id++;
+      // Ignore the value if NULL.
+      if (!end_value.isNull()) {
+        sum = fast_add_operator_->applyToTypedValues(sum, end_value);
+        count++;
+      }
 
-    // No more following tuples.
-    if (following_tuple_id == tuple_accessor->getNumTuples()) {
-      break;
+      end_tuple_id++;
     }
 
-    // Get the partition keys and compare. If not the same partition as the
-    // current row, stop searching preceding tuples.
-    if (!samePartition(tuple_accessor,
-                       current_row_partition_key,
-                       following_tuple_id,
-                       partition_by_ids)) {
-      break;
+    // If all values are NULLs, return NULL; Otherwise, return the quotient.
+    if (count == 0) {
+      window_aggregates->appendTypedValue(result_type_->makeNullValue());
+    } else {
+      window_aggregates->appendTypedValue(
+          divide_operator_->applyToTypedValues(sum, TypedValue(static_cast<double>(count))));
     }
-
-    // Actually count the element and do the calculation.
-    count_following++;
-    TypedValue following_value =
-        argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id);
-
-    // Ignore the value if null.
-    if (!following_value.isNull()) {
-      sum = fast_add_operator_->applyToTypedValues(sum, following_value);
-      count++;
-    }
-  }
-
-  // If all values are NULLs, return NULL; Otherwise, return the quotient.
-  if (count == 0) {
-    return result_type_->makeNullValue();
-  } else {
-    return divide_operator_->applyToTypedValues(sum,
-                                                TypedValue(static_cast<double>(count)));
   }
-}
 
-bool WindowAggregationHandleAvg::samePartition(
-    const ColumnVectorsValueAccessor *tuple_accessor,
-    const std::vector<TypedValue> &current_row_partition_key,
-    const tuple_id boundary_tuple_id,
-    const std::vector<attribute_id> &partition_by_ids) const {
-  for (std::size_t partition_by_index = 0;
-       partition_by_index < partition_by_ids.size();
-       ++partition_by_index) {
-    if (!equal_comparators_[partition_by_index]->compareTypedValues(
-            current_row_partition_key[partition_by_index],
-            tuple_accessor->getTypedValueAtAbsolutePosition(
-                partition_by_ids[partition_by_index], boundary_tuple_id))) {
-      return false;
-    }
-  }
-
-  return true;
+  return window_aggregates;
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 5b41779..f7f2e4d 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -31,7 +31,6 @@
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
-#include "types/operations/comparisons/Comparison.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -54,11 +53,7 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
   ~WindowAggregationHandleAvg() override {}
 
   ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
-                          std::vector<ColumnVector*> &&arguments,
-                          const std::vector<attribute_id> &partition_by_ids,
-                          const bool is_row,
-                          const std::int64_t num_preceding,
-                          const std::int64_t num_following) const override;
+                          const std::vector<ColumnVector*> &arguments) const override;
 
  private:
   friend class WindowAggregateFunctionAvg;
@@ -66,29 +61,25 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
   /**
    * @brief Constructor.
    *
-   * @param partition_key_types The Types of the partition key.
-   * @param type Type of the avg value.
+   * @param partition_by_attributes A list of attributes used as partition key.
+   * @param order_by_attributes A list of attributes used as order key.
+   * @param is_row True if the frame mode is ROWS, false if RANGE.
+   * @param num_preceding The number of rows/range that precedes the current row.
+   * @param num_following The number of rows/range that follows the current row.
+   * @param argument_type Type of the argument.
    **/
-  WindowAggregationHandleAvg(const std::vector<const Type*> &partition_key_types,
-                             const Type &type);
-
-  TypedValue calculateOneWindow(
-      ColumnVectorsValueAccessor *tuple_accessor,
-      ColumnVectorsValueAccessor *argument_accessor,
-      const std::vector<attribute_id> &partition_by_ids,
+  WindowAggregationHandleAvg(
+      const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+      const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
       const bool is_row,
       const std::int64_t num_preceding,
-      const std::int64_t num_following) const;
-
-  bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
-                     const std::vector<TypedValue> &current_row_partition_key,
-                     const tuple_id boundary_tuple_id,
-                     const std::vector<attribute_id> &partition_by_ids) const;
+      const std::int64_t num_following,
+      const Type *argument_type);
 
-  const Type &argument_type_;
   const Type *sum_type_;
   const Type *result_type_;
   std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  std::unique_ptr<UncheckedBinaryOperator> fast_subtract_operator_;
   std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index c044a98..cb58083 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -23,11 +23,13 @@
 #include <memory>
 #include <vector>
 
+#include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "expressions/window_aggregation/WindowAggregationHandle.hpp"
-#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
 #include "expressions/window_aggregation/WindowAggregationID.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "types/CharType.hpp"
@@ -58,6 +60,9 @@ namespace {
   constexpr int kNullInterval = 25;
   constexpr int kNumPreceding = 2;
   constexpr int kNumFollowing = 2;
+  constexpr int kPartitionKeyIndex = 0;
+  constexpr int kOrderKeyIndex = 1;
+  constexpr int kNumTuplesPerOrderKey = 2;
 
 }  // namespace
 
@@ -65,12 +70,27 @@ namespace {
 class WindowAggregationHandleAvgTest : public::testing::Test {
  protected:
   // Handle initialization.
-  void initializeHandle(const Type &argument_type) {
+  WindowAggregationHandle* initializeHandle(const Type &argument_type,
+                                            const bool is_row = true,
+                                            const std::int64_t num_preceding = -1,
+                                            const std::int64_t num_following = 0) {
     const WindowAggregateFunction &function =
         WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+    const Type &int_type = TypeFactory::GetType(kInt, false);
+    std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+    std::vector<std::unique_ptr<const Scalar>> order_by_attributes;
+    partition_by_attributes.emplace_back(
+        new ScalarAttribute(CatalogAttribute(nullptr, "partition_key", int_type, kPartitionKeyIndex)));
+    order_by_attributes.emplace_back(
+        new ScalarAttribute(CatalogAttribute(nullptr, "order_key", int_type, kOrderKeyIndex)));
     std::vector<const Type*> partition_key_types(1, &TypeFactory::GetType(kInt, false));
-    handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
-                                            std::move(partition_key_types)));
+
+    return function.createHandle(std::vector<const Type*>(1, &argument_type),
+                                 partition_by_attributes,
+                                 order_by_attributes,
+                                 is_row,
+                                 num_preceding,
+                                 num_following);
   }
 
   // Test canApplyToTypes().
@@ -117,24 +137,25 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
 
   template <typename GenericType, typename OutputType = DoubleType>
   void checkWindowAggregationAvgGeneric() {
-    const GenericType &type = GenericType::Instance(true);
-    initializeHandle(type);
-
     // Create argument, partition key and cpptype vectors.
     std::vector<typename GenericType::cpptype*> argument_cpp_vector;
     argument_cpp_vector.reserve(kNumTuples);
     ColumnVector *argument_type_vector =
         createArgumentGeneric<GenericType>(&argument_cpp_vector);
     NativeColumnVector *partition_key_vector =
-        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
+    NativeColumnVector *order_key_vector =
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
 
     for (int i = 0; i < kNumTuples; ++i) {
       partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+      order_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerOrderKey));
     }
 
     // Create tuple ValueAccessor.
     ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
     tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(order_key_vector);
     tuple_accessor->addColumn(argument_type_vector);
 
     // Test UNBOUNDED PRECEDING AND CURRENT ROW.
@@ -182,45 +203,95 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
                        const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
     std::vector<ColumnVector*> arguments;
     arguments.push_back(argument_type_vector);
-    // The partition key index is 0.
-    std::vector<attribute_id> partition_key(1, 0);
 
-    ColumnVector *result =
-        handle_avg_->calculate(tuple_accessor,
-                               std::move(arguments),
-                               partition_key,
-                               true  /* is_row */,
-                               -1  /* num_preceding: UNBOUNDED PRECEDING */,
-                               0  /* num_following: CURRENT ROW */);
+    // Check ROWS mode.
+    WindowAggregationHandle *rows_handle =
+        initializeHandle(GenericType::Instance(true),
+                         true  /* is_row */,
+                         -1  /* num_preceding: UNBOUNDED PRECEDING */,
+                         0  /* num_following: CURRENT ROW */);
+    ColumnVector *rows_result =
+        rows_handle->calculate(tuple_accessor, arguments);
 
     // Get the cpptype result.
-    std::vector<typename OutputType::cpptype*> result_cpp_vector;
-    typename GenericType::cpptype sum;
-    int count;
+    std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
+    typename GenericType::cpptype rows_sum;
+    int rows_count;
     for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
       // Start of new partition
       if (i % kNumTuplesPerPartition == 0) {
-        SetDataType(0, &sum);
-        count = 0;
+        SetDataType(0, &rows_sum);
+        rows_count = 0;
       }
 
       typename GenericType::cpptype *value = argument_cpp_vector[i];
       if (value != nullptr) {
-        sum += *value;
-        count++;
+        rows_sum += *value;
+        rows_count++;
       }
 
-      if (count == 0) {
-        result_cpp_vector.push_back(nullptr);
+      if (rows_count == 0) {
+        rows_result_cpp_vector.push_back(nullptr);
       } else {
         typename OutputType::cpptype *result_cpp_value =
             new typename OutputType::cpptype;
-        *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
-        result_cpp_vector.push_back(result_cpp_value);
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(rows_sum) / rows_count;
+        rows_result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
+
+    CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+    // Check RANGE mode.
+    WindowAggregationHandle *range_handle =
+        initializeHandle(GenericType::Instance(true),
+                         false  /* is_row */,
+                         -1  /* num_preceding: UNBOUNDED PRECEDING */,
+                         0  /* num_following: CURRENT ROW */);
+    ColumnVector *range_result =
+        range_handle->calculate(tuple_accessor, arguments);
+
+    // Get the cpptype result.
+    std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+    typename GenericType::cpptype range_sum;
+    int range_count;
+    std::size_t current_tuple = 0;
+    while (current_tuple < kNumTuples) {
+      // Start of new partition
+      if (current_tuple % kNumTuplesPerPartition == 0) {
+        SetDataType(0, &range_sum);
+        range_count = 0;
+      }
+
+      // We have to consider following tuples with the same order key value.
+      std::size_t next_tuple = current_tuple;
+      while (next_tuple < kNumTuples &&
+             next_tuple / kNumTuplesPerPartition == current_tuple / kNumTuplesPerPartition &&
+             next_tuple / kNumTuplesPerOrderKey == current_tuple / kNumTuplesPerOrderKey) {
+        typename GenericType::cpptype *value = argument_cpp_vector[next_tuple];
+        if (value != nullptr) {
+          range_sum += *value;
+          range_count++;
+        }
+
+        next_tuple++;
+      }
+
+      // Calculate the result cpp value.
+      typename OutputType::cpptype *result_cpp_value = nullptr;
+      if (range_count != 0) {
+        result_cpp_value = new typename OutputType::cpptype;
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(range_sum) / range_count;
+      }
+
+      // Add the result values to the tuples with in the same order key value.
+      while (current_tuple != next_tuple) {
+        range_result_cpp_vector.push_back(result_cpp_value);
+        current_tuple++;
       }
     }
 
-    CheckAvgValues(result_cpp_vector, result);
+    CheckAvgValues(range_result_cpp_vector, range_result);
   }
 
   template <typename GenericType, typename OutputType>
@@ -229,20 +300,19 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
                           const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
     std::vector<ColumnVector*> arguments;
     arguments.push_back(argument_type_vector);
-    // The partition key index is 0.
-    std::vector<attribute_id> partition_key(1, 0);
 
-    ColumnVector *result =
-        handle_avg_->calculate(tuple_accessor,
-                               std::move(arguments),
-                               partition_key,
-                               true  /* is_row */,
-                               kNumPreceding,
-                               kNumFollowing);
+    // Check ROWS mode.
+    WindowAggregationHandle *rows_handle =
+        initializeHandle(GenericType::Instance(true),
+                         true  /* is_row */,
+                         kNumPreceding,
+                         kNumFollowing);
+    ColumnVector *rows_result =
+        rows_handle->calculate(tuple_accessor, arguments);
 
     // Get the cpptype result.
     // For each value, calculate all surrounding values in the window.
-    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+    std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
 
     for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
       typename GenericType::cpptype sum;
@@ -281,19 +351,81 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
       }
 
       if (count == 0) {
-        result_cpp_vector.push_back(nullptr);
+        rows_result_cpp_vector.push_back(nullptr);
       } else {
         typename OutputType::cpptype *result_cpp_value =
             new typename OutputType::cpptype;
         *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
-        result_cpp_vector.push_back(result_cpp_value);
+        rows_result_cpp_vector.push_back(result_cpp_value);
       }
     }
 
-    CheckAvgValues(result_cpp_vector, result);
-  }
+    CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+    // Check RANGE mode.
+    WindowAggregationHandle *range_handle =
+        initializeHandle(GenericType::Instance(true),
+                         false  /* is_row */,
+                         kNumPreceding,
+                         kNumFollowing);
+    ColumnVector *range_result =
+        range_handle->calculate(tuple_accessor, arguments);
+
+    // Get the cpptype result.
+    // For each value, calculate all surrounding values in the window.
+    std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      typename GenericType::cpptype sum;
+      SetDataType(0, &sum);
+      int count = 0;
+
+      if (argument_cpp_vector[i] != nullptr) {
+        sum += *argument_cpp_vector[i];
+        count++;
+      }
+
+      int preceding_bound = i / kNumTuplesPerOrderKey - kNumPreceding;
+      for (std::size_t precede = 1; precede <= kNumTuples; ++precede) {
+        // Not in range or the same partition.
+        if (i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition ||
+            static_cast<int>((i - precede) / kNumTuplesPerOrderKey) < preceding_bound) {
+          break;
+        }
+
+        if (argument_cpp_vector[i - precede] != nullptr) {
+          sum += *argument_cpp_vector[i - precede];
+          count++;
+        }
+      }
+
+      int following_bound = i / kNumTuplesPerOrderKey + kNumFollowing;
+      for (int follow = 1; follow <= kNumTuples; ++follow) {
+        // Not in range or the same partition.
+        if (i + follow >= kNumTuples ||
+            i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+            static_cast<int>((i + follow) / kNumTuplesPerOrderKey) > following_bound) {
+          break;
+        }
+
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          count++;
+        }
+      }
+
+      if (count == 0) {
+        rows_result_cpp_vector.push_back(nullptr);
+      } else {
+        typename OutputType::cpptype *result_cpp_value =
+            new typename OutputType::cpptype;
+        *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+        range_result_cpp_vector.push_back(result_cpp_value);
+      }
+    }
 
-  std::unique_ptr<WindowAggregationHandle> handle_avg_;
+    CheckAvgValues(range_result_cpp_vector, range_result);
+  }
 };
 
 template <>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..88103df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1663,7 +1663,7 @@ void ExecutionGenerator::convertWindowAggregate(
       std::static_pointer_cast<const E::WindowAggregateFunction>(
           named_window_aggregate_expression->expression());
 
-  // Set the AggregateFunction.
+  // Set the WindowAggregateFunction.
   window_aggr_state_proto->mutable_function()->MergeFrom(
       window_aggregate_function->window_aggregate().getProto());
 
@@ -1683,6 +1683,15 @@ void ExecutionGenerator::convertWindowAggregate(
         ->MergeFrom(concretized_partition_by_attribute->getProto());
   }
 
+  // Set order keys.
+  for (const E::ScalarPtr &order_by_attribute
+      : window_info.order_by_attributes) {
+    unique_ptr<const Scalar> concretized_order_by_attribute(
+        order_by_attribute->concretize(attribute_substitution_map_));
+    window_aggr_state_proto->add_order_by_attributes()
+        ->MergeFrom(concretized_order_by_attribute->getProto());
+  }
+
   // Set window frame info.
   if (window_info.frame_info == nullptr) {
     // If the frame is not specified, use the default setting:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c224388..46808bf 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -814,9 +814,9 @@ L::LogicalPtr Resolver::resolveInsertSelection(
       cast_expressions.emplace_back(selection_attributes[aid]);
     } else {
       // TODO(jianqiao): implement Cast operation for non-numeric types.
-      if (destination_type.getSuperTypeID() == Type::kNumeric
-          && selection_type.getSuperTypeID() == Type::kNumeric
-          && destination_type.isSafelyCoercibleFrom(selection_type)) {
+      if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+          selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+          destination_type.isSafelyCoercibleFrom(selection_type)) {
         // Add cast operation
         const E::AttributeReferencePtr attr = selection_attributes[aid];
         const E::ExpressionPtr cast_expr =
@@ -1691,6 +1691,19 @@ E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
   // Resolve window frame
   if (parse_window.frame_info() != nullptr) {
     const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+    // For FRAME mode, the first attribute in ORDER BY must be numeric.
+    // TODO(Shixuan): Time-related types should also be supported. To handle
+    // this, some changes in the parser needs to be done since the time range
+    // should be specified with time units. Also, UNBOUNDED flags might be
+    // needed because -1 might not make sense in this case.
+    if (!parse_frame_info->is_row &&
+        (order_by_attributes.empty() ||
+         order_by_attributes[0]->getValueType().getSuperTypeID() != Type::SuperTypeID::kNumeric)) {
+      THROW_SQL_ERROR_AT(&parse_window)
+          << "A numeric attribute should be specified as the first ORDER BY "
+          << "attribute in FRAME mode";
+    }
+
     frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
                                         parse_frame_info->num_preceding,
                                         parse_frame_info->num_following);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 30a3c39..6bada6c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -1025,17 +1025,40 @@ WINDOW w AS
 +--------------------+-----------+------------------------+
 ==
 
-# Currently this is not supported, an empty table will be returned.
-SELECT int_col, sum(float_col) OVER
-(PARTITION BY char_col, long_col
- ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
- RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+SELECT float_col, double_col, avg(double_col) OVER
+(ORDER BY float_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING)
 FROM test;
 --
-+-----------+------------------------+
-|int_col    |sum(float_col)          |
-+-----------+------------------------+
-+-----------+------------------------+
++---------------+------------------------+------------------------+
+|float_col      |double_col              |avg(double_col)         |
++---------------+------------------------+------------------------+
+|     4.89897966|      117.57550765359254|     -5.2010907233390986|
+|     4.79583168|     -110.30412503619254|     -3.3458568752518572|
+|     4.69041586|      103.18914671611546|     -3.3458568752518572|
+|      4.5825758|     -96.234089594072643|     -4.2942570191393745|
+|     4.47213602|                    NULL|     -4.2942570191393745|
+|     4.35889912|      -82.81907992727281|     -3.1771278735018194|
+|      4.2426405|      76.367532368147124|     -3.1771278735018194|
+|     4.12310553|     -70.092795635500224|     -3.6217507631683268|
+|              4|                      64|     -3.0100796703699935|
+|     3.87298346|     -58.094750193111253|     -3.0100796703699935|
+|      3.7416575|       52.38320341483518|     -3.0100796703699935|
+|     3.60555124|     -46.872166581031856|     -3.1193833079868254|
+|     3.46410155|      41.569219381653056|     -3.1193833079868254|
+|     3.31662488|       -36.4828726939094|     -2.8361542397614437|
+|      3.1622777|                    NULL|     -2.8361542397614437|
+|              3|                     -27|     -2.7526926834086507|
+|     2.82842708|      22.627416997969522|     -8.4826069851706123|
+|     2.64575124|     -18.520259177452136|     -9.0010404404476727|
+|     2.44948983|      14.696938456699067|     -4.1547599319129516|
+|     2.23606801|     -11.180339887498949|     -4.2708832009567148|
+|              2|                       8|     0.11724429467951912|
+|     1.73205078|      -5.196152422706632|     -4.7108157334609286|
+|     1.41421354|      2.8284271247461903|     -5.1226841602152344|
+|              1|                      -1|      -1.638218767582549|
+|              0|                    NULL|      1.1580686755098886|
++---------------+------------------------+------------------------+
 ==
 
 SELECT sum(avg(int_col) OVER w) FROM test

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 0cdfc1a..49fa44d 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -56,15 +56,13 @@ WindowAggregationOperationState::WindowAggregationOperationState(
     const WindowAggregateFunction *window_aggregate_function,
     std::vector<std::unique_ptr<const Scalar>> &&arguments,
     const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
     const bool is_row,
     const std::int64_t num_preceding,
     const std::int64_t num_following,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       arguments_(std::move(arguments)),
-      is_row_(is_row),
-      num_preceding_(num_preceding),
-      num_following_(num_following),
       storage_manager_(storage_manager) {
   // Get the Types of this window aggregate's arguments so that we can create an
   // AggregationHandle.
@@ -76,18 +74,14 @@ WindowAggregationOperationState::WindowAggregationOperationState(
   // Check if window aggregate function could apply to the arguments.
   DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
 
-  // IDs and types of partition keys.
-  std::vector<const Type*> partition_by_types;
-  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
-    partition_by_ids_.push_back(
-        partition_by_attribute->getAttributeIdForValueAccessor());
-    partition_by_types.push_back(&partition_by_attribute->getType());
-  }
-
   // Create the handle and initial state.
   window_aggregation_handle_.reset(
-      window_aggregate_function->createHandle(std::move(argument_types),
-                                              std::move(partition_by_types)));
+      window_aggregate_function->createHandle(argument_types,
+                                              partition_by_attributes,
+                                              order_by_attributes,
+                                              is_row,
+                                              num_preceding,
+                                              num_following));
 }
 
 WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -113,6 +107,15 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
         database));
   }
 
+  std::vector<std::unique_ptr<const Scalar>> order_by_attributes;
+  for (int attribute_idx = 0;
+       attribute_idx < proto.order_by_attributes_size();
+       ++attribute_idx) {
+    order_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+        proto.order_by_attributes(attribute_idx),
+        database));
+  }
+
   const bool is_row = proto.is_row();
   const std::int64_t num_preceding = proto.num_preceding();
   const std::int64_t num_following = proto.num_following();
@@ -121,6 +124,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
                                              &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
                                              std::move(arguments),
                                              partition_by_attributes,
+                                             order_by_attributes,
                                              is_row,
                                              num_preceding,
                                              num_following,
@@ -160,6 +164,15 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
     }
   }
 
+  for (int attribute_idx = 0;
+       attribute_idx < proto.order_by_attributes_size();
+       ++attribute_idx) {
+    if (!ScalarFactory::ProtoIsValid(proto.order_by_attributes(attribute_idx),
+                                     database)) {
+      return false;
+    }
+  }
+
   if (proto.num_preceding() < -1 || proto.num_following() < -1) {
     return false;
   }
@@ -177,14 +190,6 @@ void WindowAggregationOperationState::windowAggregateBlocks(
     return;
   }
 
-  // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
-  // work. This will need Order Key to be passed so that we know where the
-  // window should start and end.
-  if (!is_row_) {
-    std::cout << "Currently we don't support RANGE frame mode :(\n";
-    return;
-  }
-
   // Get the total number of tuples.
   int num_tuples = 0;
   for (const block_id block_idx : block_ids) {
@@ -226,7 +231,11 @@ void WindowAggregationOperationState::windowAggregateBlocks(
                                      block->getIndices(),
                                      block->getIndicesConsistent());
     ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
-    ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+    ColumnVectorsValueAccessor *argument_accessor = nullptr;
+    if (!arguments_.empty()) {
+      argument_accessor = new ColumnVectorsValueAccessor();
+    }
+
     for (const std::unique_ptr<const Scalar> &argument : arguments_) {
       argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
                                                           &sub_block_ref));
@@ -235,9 +244,15 @@ void WindowAggregationOperationState::windowAggregateBlocks(
     InvokeOnAnyValueAccessor(tuple_accessor,
                              [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
       tuple_accessor->beginIteration();
-      argument_accessor->beginIteration();
+      if (argument_accessor != nullptr) {
+        argument_accessor->beginIteration();
+      }
+
+      while (tuple_accessor->next()) {
+        if (argument_accessor != nullptr) {
+          argument_accessor->next();
+        }
 
-      while (tuple_accessor->next() && argument_accessor->next()) {
         for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
           ColumnVector *attr_vec = attribute_vecs[attr_id];
           if (attr_vec->isNative()) {
@@ -275,11 +290,7 @@ void WindowAggregationOperationState::windowAggregateBlocks(
   // Do actual calculation in handle.
   ColumnVector *window_aggregates =
       window_aggregation_handle_->calculate(all_blocks_accessor,
-                                            std::move(argument_vecs),
-                                            partition_by_ids_,
-                                            is_row_,
-                                            num_preceding_,
-                                            num_following_);
+                                            argument_vecs);
 
   all_blocks_accessor->addColumn(window_aggregates);
   output_destination->bulkInsertTuples(all_blocks_accessor);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 9792a99..726b102 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -57,6 +57,7 @@ class WindowAggregationOperationState {
    *                                   computed.
    * @param arguments A list of argument expressions to that aggregate.
    * @param partition_by_attributes A list of window partition key.
+   * @param order_by_attributes A list of window order key.
    * @param is_row True if the window frame is calculated by ROW, false if it is
    *               calculated by RANGE.
    * @param num_preceding The number of rows/range for the tuples preceding the
@@ -69,6 +70,7 @@ class WindowAggregationOperationState {
                                   const WindowAggregateFunction *window_aggregate_function,
                                   std::vector<std::unique_ptr<const Scalar>> &&arguments,
                                   const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+                                  const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
                                   const bool is_row,
                                   const std::int64_t num_preceding,
                                   const std::int64_t num_following,
@@ -120,13 +122,6 @@ class WindowAggregationOperationState {
   const std::vector<block_id> block_ids_;
   std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
   std::vector<std::unique_ptr<const Scalar>> arguments_;
-  std::vector<attribute_id> partition_by_ids_;
-
-  // Frame info.
-  const bool is_row_;
-  const std::int64_t num_preceding_;
-  const std::int64_t num_following_;
-
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index d888461..f879713 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -30,4 +30,5 @@ message WindowAggregationOperationState {
   required bool is_row = 5;
   required int64 num_preceding = 6;  // -1 means UNBOUNDED PRECEDING.
   required int64 num_following = 7;  // -1 means UNBOUNDED FOLLOWING.
+  repeated Scalar order_by_attributes = 8;
 }


Mime
View raw message