quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [14/14] incubator-quickstep git commit: Vector implementation aggregation.
Date Sun, 06 Nov 2016 04:16:00 GMT
Vector implementation aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9e6f1933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9e6f1933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9e6f1933

Branch: refs/heads/collision-free-agg
Commit: 9e6f1933f3da229dc707553172f40371fb1e5d42
Parents: 156171c
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Tue Nov 1 11:45:52 2016 -0500
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Sat Nov 5 23:15:37 2016 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 cli/QuickstepCli.cpp                            |   9 +
 .../aggregation/AggregateFunctionCount.cpp      |   6 +-
 .../aggregation/AggregationConcreteHandle.hpp   |   4 +-
 expressions/aggregation/AggregationHandle.hpp   |  14 +-
 .../aggregation/AggregationHandleAvg.cpp        |   4 +-
 .../aggregation/AggregationHandleAvg.hpp        |   8 +
 .../aggregation/AggregationHandleCount.hpp      |  20 +-
 .../aggregation/AggregationHandleDistinct.hpp   |  13 +-
 .../aggregation/AggregationHandleMax.cpp        |   4 +-
 .../aggregation/AggregationHandleMax.hpp        |   8 +
 .../aggregation/AggregationHandleMin.cpp        |   4 +-
 .../aggregation/AggregationHandleMin.hpp        |   8 +
 .../aggregation/AggregationHandleSum.cpp        |   4 +-
 .../aggregation/AggregationHandleSum.hpp        |   8 +
 expressions/aggregation/AggregationID.hpp       |   4 +-
 expressions/aggregation/CMakeLists.txt          |  22 +-
 expressions/scalar/ScalarAttribute.cpp          |   2 +-
 query_optimizer/CMakeLists.txt                  |   2 +
 query_optimizer/ExecutionGenerator.cpp          | 124 ++++-
 query_optimizer/ExecutionGenerator.hpp          |   7 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |   1 -
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  47 ++
 query_optimizer/rules/CMakeLists.txt            |  16 +
 query_optimizer/rules/InjectJoinFilters.cpp     |  48 +-
 query_optimizer/rules/InjectJoinFilters.hpp     |   6 -
 .../PushDownLowCostDisjunctivePredicate.cpp     |  52 ++
 .../PushDownLowCostDisjunctivePredicate.hpp     |  68 +++
 relational_operators/AggregationOperator.cpp    |   4 +-
 relational_operators/CMakeLists.txt             |  15 +
 .../FinalizeAggregationOperator.cpp             |  18 +-
 .../FinalizeAggregationOperator.hpp             |  14 +-
 .../InitializeAggregationStateOperator.cpp      |  69 +++
 .../InitializeAggregationStateOperator.hpp      | 103 ++++
 storage/AggregationOperationState.cpp           | 409 +++++++++-----
 storage/AggregationOperationState.hpp           | 111 ++--
 storage/CMakeLists.txt                          |  26 +
 .../CollisionFreeAggregationStateHashTable.cpp  | 220 ++++++++
 .../CollisionFreeAggregationStateHashTable.hpp  | 546 +++++++++++++++++++
 storage/HashTable.proto                         |   7 +-
 storage/HashTableBase.hpp                       |   1 +
 storage/HashTableFactory.hpp                    |   6 +
 storage/TupleIdSequence.hpp                     |  10 +
 utility/CMakeLists.txt                          |  10 +
 utility/ConcurrentBitVector.hpp                 | 196 +++++++
 utility/EventProfiler.cpp                       |  28 +
 utility/EventProfiler.hpp                       | 187 +++++++
 utility/ExecutionDAGVisualizer.cpp              |   3 +
 utility/lip_filter/BitVectorExactFilter.hpp     |   2 +-
 49 files changed, 2215 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 487aaf9..ae68452 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -771,6 +771,7 @@ target_link_libraries(quickstep_cli_shell
                       quickstep_storage_PreloaderThread
                       quickstep_threading_ThreadIDBasedMap
                       quickstep_utility_ExecutionDAGVisualizer
+                      quickstep_utility_EventProfiler
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector
                       quickstep_utility_SqlError

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 8269197..419aefb 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -75,6 +75,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl;
 
 #include "storage/PreloaderThread.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
+#include "utility/EventProfiler.hpp"
 #include "utility/ExecutionDAGVisualizer.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
@@ -190,6 +191,8 @@ DEFINE_bool(visualize_execution_dag, false,
             "If true, visualize the execution plan DAG into a graph in DOT "
             "format (DOT is a plain text graph description language) which is "
             "then printed via stderr.");
+DEFINE_string(profile_output, "",
+              "Output file name for writing the profiled events.");
 
 }  // namespace quickstep
 
@@ -449,6 +452,7 @@ int main(int argc, char* argv[]) {
               new quickstep::ExecutionDAGVisualizer(*query_handle->getQueryPlanMutable()));
         }
 
+        quickstep::simple_profiler.clear();
         start = std::chrono::steady_clock::now();
         QueryExecutionUtil::ConstructAndSendAdmitRequestMessage(
             main_thread_client_id,
@@ -492,6 +496,11 @@ int main(int argc, char* argv[]) {
             dag_visualizer->bindProfilingStats(profiling_stats);
             std::cerr << "\n" << dag_visualizer->toDOT() << "\n";
           }
+          if (!quickstep::FLAGS_profile_output.empty()) {
+            std::ofstream ofs(quickstep::FLAGS_profile_output, std::ios::out);
+            quickstep::simple_profiler.writeToStream(ofs);
+            ofs.close();
+          }
         } catch (const std::exception &e) {
           fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
           break;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregateFunctionCount.cpp b/expressions/aggregation/AggregateFunctionCount.cpp
index 466ff2f..9795b4a 100644
--- a/expressions/aggregation/AggregateFunctionCount.cpp
+++ b/expressions/aggregation/AggregateFunctionCount.cpp
@@ -53,16 +53,16 @@ AggregationHandle* AggregateFunctionCount::createHandle(
 
   if (argument_types.empty()) {
     // COUNT(*)
-    return new AggregationHandleCount<true, false>();
+    return new AggregationHandleCount<true, false>(nullptr);
   } else if (argument_types.front()->isNullable()) {
     // COUNT(some_nullable_argument)
-    return new AggregationHandleCount<false, true>();
+    return new AggregationHandleCount<false, true>(argument_types.front());
   } else {
     // COUNT(non_nullable_argument)
     //
     // TODO(chasseur): Modify query optimizer to optimize-away COUNT with a
     // non-nullable argument and convert it to COUNT(*).
-    return new AggregationHandleCount<false, false>();
+    return new AggregationHandleCount<false, false>(argument_types.front());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index f1259c0..93e9bd0 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
@@ -101,7 +102,8 @@ class AggregationConcreteHandle : public AggregationHandle {
   }
 
  protected:
-  AggregationConcreteHandle() {}
+  AggregationConcreteHandle(const AggregationID agg_id)
+      : AggregationHandle(agg_id) {}
 
   template <typename HandleT, typename HashTableT>
   ColumnVector* finalizeHashTableHelper(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index e004511..8e2aea6 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
@@ -109,6 +110,14 @@ class AggregationHandle {
    **/
   virtual ~AggregationHandle() {}
 
+  AggregationID getAggregationID() const {
+    return agg_id_;
+  }
+
+  virtual std::vector<const Type *> getArgumentTypes() const = 0;
+
+  virtual const Type* getResultType() const = 0;
+
   /**
    * @brief Create an initial "blank" state for this aggregation.
    *
@@ -258,7 +267,10 @@ class AggregationHandle {
   virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
 
  protected:
-  AggregationHandle() {}
+  AggregationHandle(const AggregationID agg_id)
+      : agg_id_(agg_id) {}
+
+  const AggregationID agg_id_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationHandle);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index 3d2db8c..d81c179 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -41,7 +42,8 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleAvg::AggregationHandleAvg(const Type &type)
-    : argument_type_(type) {
+    : AggregationConcreteHandle(AggregationID::kAvg),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index b40305c..aa5f427 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -105,6 +105,14 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
  public:
   ~AggregationHandleAvg() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&argument_type_};
+  }
+
+  const Type* getResultType() const override {
+    return result_type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateAvg(blank_state_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 86da2a8..bf9450f 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -29,7 +29,9 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "types/LongType.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
@@ -97,6 +99,18 @@ class AggregationHandleCount : public AggregationConcreteHandle {
  public:
   ~AggregationHandleCount() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    if (argument_type_ == nullptr) {
+      return {};
+    } else {
+      return {argument_type_};
+    }
+  }
+
+  const Type* getResultType() const override {
+    return &LongType::InstanceNonNullable();
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateCount();
   }
@@ -172,7 +186,11 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   /**
    * @brief Constructor.
    **/
-  AggregationHandleCount() {}
+  AggregationHandleCount(const Type *argument_type)
+      : AggregationConcreteHandle(AggregationID::kCount),
+        argument_type_(argument_type) {}
+
+  const Type *argument_type_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
index deb928a..0d8905b 100644
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ b/expressions/aggregation/AggregationHandleDistinct.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
@@ -49,7 +50,17 @@ class AggregationHandleDistinct : public AggregationConcreteHandle {
   /**
    * @brief Constructor.
    **/
-  AggregationHandleDistinct() {}
+  AggregationHandleDistinct()
+      : AggregationConcreteHandle(AggregationID::kDistinct) {}
+
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {};
+  }
+
+  const Type* getResultType() const override {
+    LOG(FATAL)
+        << "AggregationHandleDistinct does not support getResultType().";
+  }
 
   AggregationState* createInitialState() const override {
     LOG(FATAL)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 6ffca0a..327b2b2 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -38,7 +39,8 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMax::AggregationHandleMax(const Type &type)
-    : type_(type) {
+    : AggregationConcreteHandle(AggregationID::kMax),
+      type_(type) {
   fast_comparator_.reset(
       ComparisonFactory::GetComparison(ComparisonID::kGreater)
           .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 8eddd7c..635c7d8 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -85,6 +85,14 @@ class AggregationHandleMax : public AggregationConcreteHandle {
  public:
   ~AggregationHandleMax() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&type_};
+  }
+
+  const Type* getResultType() const override {
+    return &type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateMax(type_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index f73fedb..fe4a61b 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -38,7 +39,8 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleMin::AggregationHandleMin(const Type &type)
-    : type_(type) {
+    : AggregationConcreteHandle(AggregationID::kMin),
+      type_(type) {
   fast_comparator_.reset(
       ComparisonFactory::GetComparison(ComparisonID::kLess)
           .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 7e129e6..3571f02 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -87,6 +87,14 @@ class AggregationHandleMin : public AggregationConcreteHandle {
  public:
   ~AggregationHandleMin() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&type_};
+  }
+
+  const Type* getResultType() const override {
+    return &type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateMin(type_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 9ddeb99..00b229e 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
@@ -42,7 +43,8 @@ namespace quickstep {
 class StorageManager;
 
 AggregationHandleSum::AggregationHandleSum(const Type &type)
-    : argument_type_(type) {
+    : AggregationConcreteHandle(AggregationID::kSum),
+      argument_type_(type) {
   // We sum Int as Long and Float as Double so that we have more headroom when
   // adding many values.
   TypeID type_precision_id;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 60d8416..9fb7706 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -101,6 +101,14 @@ class AggregationHandleSum : public AggregationConcreteHandle {
  public:
   ~AggregationHandleSum() override {}
 
+  std::vector<const Type *> getArgumentTypes() const override {
+    return {&argument_type_};
+  }
+
+  const Type* getResultType() const override {
+    return result_type_;
+  }
+
   AggregationState* createInitialState() const override {
     return new AggregationStateSum(blank_state_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/AggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationID.hpp b/expressions/aggregation/AggregationID.hpp
index 1efb35c..cd18d47 100644
--- a/expressions/aggregation/AggregationID.hpp
+++ b/expressions/aggregation/AggregationID.hpp
@@ -32,9 +32,11 @@ namespace quickstep {
 enum class AggregationID {
   kAvg = 0,
   kCount,
+  kDistinct,
   kMax,
   kMin,
-  kSum
+  kSum,
+  kUnknown
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index a31ee36..bd239d4 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,8 +146,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
                       quickstep_threading_SpinMutex
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
@@ -155,6 +155,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
                       glog
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
                       quickstep_types_TypedValue
                       quickstep_utility_Macros)
@@ -163,8 +164,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -179,10 +181,12 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
+                      quickstep_types_LongType
                       quickstep_types_TypeFactory
                       quickstep_types_TypeID
                       quickstep_types_TypedValue
@@ -193,8 +197,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinc
                       glog
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
-                      quickstep_storage_HashTable
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_types_TypedValue
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
@@ -202,8 +207,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -217,8 +223,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -232,8 +239,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationConcreteHandle
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/expressions/scalar/ScalarAttribute.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index cc42084..4eb0e53 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -168,7 +168,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin(
   ValueAccessor *accessor = using_left_relation ? left_accessor
                                                 : right_accessor;
 
-  return InvokeOnAnyValueAccessor(
+  return InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
       accessor,
       [&joined_tuple_ids,
        &attr_id,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 0699bbd..225955d 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -127,6 +127,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InitializeAggregationStateOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_RelationalOperator
@@ -147,6 +148,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_types_Type
+                      quickstep_types_TypeID
                       quickstep_types_Type_proto
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 82b733e..a86b1f3 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -47,6 +47,7 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
@@ -103,6 +104,7 @@
 #include "relational_operators/DropTableOperator.hpp"
 #include "relational_operators/FinalizeAggregationOperator.hpp"
 #include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationStateOperator.hpp"
 #include "relational_operators/InsertOperator.hpp"
 #include "relational_operators/NestedLoopsJoinOperator.hpp"
 #include "relational_operators/RelationalOperator.hpp"
@@ -124,6 +126,7 @@
 #include "storage/SubBlockTypeRegistry.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/TypedValue.pb.h"
 #include "types/containers/Tuple.pb.h"
@@ -1386,6 +1389,8 @@ void ExecutionGenerator::convertAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
 
+  bool use_parallel_initialization = false;
+
   std::vector<const Type*> group_by_types;
   for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
     unique_ptr<const Scalar> execution_group_by_expression;
@@ -1406,9 +1411,33 @@ void ExecutionGenerator::convertAggregate(
   }
 
   if (!group_by_types.empty()) {
-    // Right now, only SeparateChaining is supported.
-    aggr_state_proto->set_hash_table_impl_type(
-        serialization::HashTableImplType::SEPARATE_CHAINING);
+    const std::size_t estimated_num_groups =
+        cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
+    std::size_t exact_num_groups;
+    const bool can_use_collision_free_aggregation =
+        canUseCollisionFreeAggregation(physical_plan,
+                                       estimated_num_groups,
+                                       &exact_num_groups);
+
+    if (can_use_collision_free_aggregation) {
+      aggr_state_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::COLLISION_FREE_COLUMNWISE);
+      std::cout << "Use collision free aggregation!\n"
+                << "Size = " << exact_num_groups << "\n";
+
+      aggr_state_proto->set_estimated_num_entries(exact_num_groups);
+      use_parallel_initialization = true;
+    } else {
+      // Otherwise, use SeparateChaining.
+      aggr_state_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
+      std::cout << "Use normal aggregation\n"
+                << "Size = " << estimated_num_groups << "\n";
+
+      aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
+    }
+  } else {
+    aggr_state_proto->set_estimated_num_entries(1uL);
   }
 
   for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1446,10 +1475,6 @@ void ExecutionGenerator::convertAggregate(
     aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
   }
 
-  const std::size_t estimated_num_groups =
-      cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
-  aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
-
   const QueryPlan::DAGNodeIndex aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new AggregationOperator(
@@ -1464,6 +1489,19 @@ void ExecutionGenerator::convertAggregate(
                                          false /* is_pipeline_breaker */);
   }
 
+  if (use_parallel_initialization) {
+    const QueryPlan::DAGNodeIndex initialize_aggregation_state_operator_index =
+        execution_plan_->addRelationalOperator(
+            new InitializeAggregationStateOperator(
+                query_handle_->query_id(),
+                aggr_state_index));
+
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         initialize_aggregation_state_operator_index,
+                                         true);
+  }
+
+
   // Create InsertDestination proto.
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =
@@ -1508,6 +1546,78 @@ void ExecutionGenerator::convertAggregate(
   }
 }
 
+bool ExecutionGenerator::canUseCollisionFreeAggregation(
+    const physical::AggregatePtr &physical_plan,
+    const std::size_t estimated_num_groups,
+    std::size_t *exact_num_groups) {
+  if (physical_plan->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  E::AttributeReferencePtr group_by_key_attr;
+  E::ExpressionPtr agg_expr = physical_plan->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  const TypeID key_type_id = group_by_key_attr->getValueType().getTypeID();
+  if (key_type_id != TypeID::kInt && key_type_id != TypeID::kLong) {
+    return false;
+  }
+
+  for (const auto &agg_expr : physical_plan->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+    switch (agg_func->getAggregate().getAggregationID()) {
+//      case AggregationID::kAvg:  // Fall through
+      case AggregationID::kCount:
+      case AggregationID::kSum:
+        break;
+      default:
+        return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1) {
+      return false;
+    }
+
+    if (arguments.size() == 1) {
+      switch (arguments.front()->getValueType().getTypeID()) {
+        case TypeID::kInt:  // Fall through
+        case TypeID::kLong:
+        case TypeID::kFloat:
+        case TypeID::kDouble:
+          break;
+        default:
+          return false;
+      }
+    }
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  const bool has_min_max_stats =
+      cost_model_for_aggregation_->findMinMaxStatsCppValue(
+          physical_plan->input(),
+          group_by_key_attr,
+          &min_cpp_value,
+          &max_cpp_value);
+
+  if (!has_min_max_stats) {
+    return false;
+  }
+
+  if (min_cpp_value < 0 ||
+      max_cpp_value > 1000000000 ||
+      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+    return false;
+  }
+
+  *exact_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   // Create sort configuration for run generation.
   vector<bool> sort_ordering(physical_sort->sort_ascending());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index a874dbb..36f3bd7 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -37,6 +37,7 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "query_optimizer/QueryPlan.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
@@ -338,6 +339,10 @@ class ExecutionGenerator {
    */
   void convertAggregate(const physical::AggregatePtr &physical_plan);
 
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &physical_plan,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *exact_num_groups);
+
   /**
    * @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
    *
@@ -427,7 +432,7 @@ class ExecutionGenerator {
   /**
    * @brief The cost model to use for estimating aggregation hash table size.
    */
-  std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
 
   /**
    * @brief The cost model to use for estimating join hash table size.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index c81fa96..8d38dc9 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -529,7 +529,6 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
   return kInvalidAttributeID;
 }
 
-
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index 85914cd..b5def39 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -126,6 +126,13 @@ class StarSchemaSimpleCostModel : public CostModel {
         physical_plan, attribute->id(), StatType::kMax);
   }
 
+  template <typename CppType>
+  bool findMinMaxStatsCppValue(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      CppType *min_cpp_value,
+      CppType *max_cpp_value);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
@@ -185,6 +192,46 @@ class StarSchemaSimpleCostModel : public CostModel {
   DISALLOW_COPY_AND_ASSIGN(StarSchemaSimpleCostModel);
 };
 
+template <typename CppType>
+bool StarSchemaSimpleCostModel::findMinMaxStatsCppValue(
+    const physical::PhysicalPtr &physical_plan,
+    const expressions::AttributeReferencePtr &attribute,
+    CppType *min_cpp_value,
+    CppType *max_cpp_value) {
+  const TypedValue min_value =
+      findMinValueStat(physical_plan, attribute);
+  const TypedValue max_value =
+      findMaxValueStat(physical_plan, attribute);
+  if (min_value.isNull() || max_value.isNull()) {
+    return false;
+  }
+
+  switch (attribute->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      *min_cpp_value = min_value.getLiteral<int>();
+      *max_cpp_value = max_value.getLiteral<int>();
+      return true;
+    }
+    case TypeID::kLong: {
+      *min_cpp_value = min_value.getLiteral<std::int64_t>();
+      *max_cpp_value = max_value.getLiteral<std::int64_t>();
+      return true;
+    }
+    case TypeID::kFloat: {
+      *min_cpp_value = min_value.getLiteral<float>();
+      *max_cpp_value = max_value.getLiteral<float>();
+      return true;
+    }
+    case TypeID::kDouble: {
+      *min_cpp_value = min_value.getLiteral<double>();
+      *max_cpp_value = max_value.getLiteral<double>();
+      return true;
+    }
+    default:
+      return false;
+  }
+}
+
 /** @} */
 
 }  // namespace cost

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 1eeb3a5..c44e576 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -25,6 +25,9 @@ add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp Gener
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
+add_library(quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
+            PushDownLowCostDisjunctivePredicate.cpp
+            PushDownLowCostDisjunctivePredicate.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownSemiAntiJoin PushDownSemiAntiJoin.cpp PushDownSemiAntiJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_Rule ../../empty_src.cpp Rule.hpp)
 add_library(quickstep_queryoptimizer_rules_RuleHelper RuleHelper.cpp RuleHelper.hpp)
@@ -111,6 +114,19 @@ target_link_libraries(quickstep_queryoptimizer_rules_PushDownFilter
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_PushDownSemiAntiJoin
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExpressionUtil

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/rules/InjectJoinFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.cpp b/query_optimizer/rules/InjectJoinFilters.cpp
index 2fcff3e..0b003ae 100644
--- a/query_optimizer/rules/InjectJoinFilters.cpp
+++ b/query_optimizer/rules/InjectJoinFilters.cpp
@@ -105,10 +105,10 @@ bool InjectJoinFilters::isTransformable(
   std::int64_t min_cpp_value;
   std::int64_t max_cpp_value;
   const bool has_min_max_stats =
-      findMinMaxValuesForAttributeHelper(hash_join->right(),
-                                         hash_join->right_join_attributes().front(),
-                                         &min_cpp_value,
-                                         &max_cpp_value);
+      cost_model_->findMinMaxStatsCppValue(hash_join->right(),
+                                           hash_join->right_join_attributes().front(),
+                                           &min_cpp_value,
+                                           &max_cpp_value);
   if (!has_min_max_stats) {
     return false;
   }
@@ -331,10 +331,10 @@ void InjectJoinFilters::concretizeAsLIPFilters(
       std::int64_t min_cpp_value;
       std::int64_t max_cpp_value;
       const bool has_min_max_stats =
-          findMinMaxValuesForAttributeHelper(filter_injection,
-                                             build_attr,
-                                             &min_cpp_value,
-                                             &max_cpp_value);
+          cost_model_->findMinMaxStatsCppValue(filter_injection,
+                                               build_attr,
+                                               &min_cpp_value,
+                                               &max_cpp_value);
       DCHECK(has_min_max_stats);
       DCHECK_GE(min_cpp_value, 0);
       DCHECK_GE(max_cpp_value, 0);
@@ -344,7 +344,7 @@ void InjectJoinFilters::concretizeAsLIPFilters(
       lip_filter_configuration_->addBuildInfo(
           build_attr,
           filter_injection,
-          static_cast<std::size_t>(max_cpp_value),
+          static_cast<std::size_t>(max_cpp_value) + 1,
           LIPFilterType::kBitVectorExactFilter,
           filter_injection->is_anti_filter());
       lip_filter_configuration_->addProbeInfo(
@@ -365,35 +365,5 @@ void InjectJoinFilters::concretizeAsLIPFilters(
   }
 }
 
-bool InjectJoinFilters::findMinMaxValuesForAttributeHelper(
-    const physical::PhysicalPtr &physical_plan,
-    const expressions::AttributeReferencePtr &attribute,
-    std::int64_t *min_cpp_value,
-    std::int64_t *max_cpp_value) const {
-  const TypedValue min_value =
-      cost_model_->findMinValueStat(physical_plan, attribute);
-  const TypedValue max_value =
-      cost_model_->findMaxValueStat(physical_plan, attribute);
-  if (min_value.isNull() || max_value.isNull()) {
-    return false;
-  }
-
-  switch (attribute->getValueType().getTypeID()) {
-    case TypeID::kInt: {
-      *min_cpp_value = min_value.getLiteral<int>();
-      *max_cpp_value = max_value.getLiteral<int>();
-      return true;
-    }
-    case TypeID::kLong: {
-      *min_cpp_value = min_value.getLiteral<std::int64_t>();
-      *max_cpp_value = max_value.getLiteral<std::int64_t>();
-      return true;
-    }
-    default:
-      return false;
-  }
-}
-
-
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/rules/InjectJoinFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.hpp b/query_optimizer/rules/InjectJoinFilters.hpp
index 6c413d9..6c9c770 100644
--- a/query_optimizer/rules/InjectJoinFilters.hpp
+++ b/query_optimizer/rules/InjectJoinFilters.hpp
@@ -76,12 +76,6 @@ class InjectJoinFilters : public Rule<physical::Physical> {
       const physical::PhysicalPtr &build_child,
       const physical::FilterInjectionPtr &filter_injection) const;
 
-  bool findMinMaxValuesForAttributeHelper(
-      const physical::PhysicalPtr &physical_plan,
-      const expressions::AttributeReferencePtr &attribute,
-      std::int64_t *min_cpp_value,
-      std::int64_t *max_cpp_value) const;
-
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
   std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.cpp b/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.cpp
new file mode 100644
index 0000000..dfe5d53
--- /dev/null
+++ b/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.cpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/AttachLIPFilters.hpp"
+
+#include <map>
+#include <set>
+#include <unordered_set>
+#include <unordered_map>
+#include <vector>
+#include <utility>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp b/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp
new file mode 100644
index 0000000..9017437
--- /dev/null
+++ b/query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_PUSH_DOWN_LOW_COST_DISJUNCTIVE_PREDICATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_PUSH_DOWN_LOW_COST_DISJUNCTIVE_PREDICATE_HPP_
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+class PushDownLowCostDisjunctivePredicate : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  PushDownLowCostDisjunctivePredicate() {}
+
+  ~PushDownLowCostDisjunctivePredicate() override {}
+
+  std::string getName() const override {
+    return "PushDownLowCostDisjunctivePredicate";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(PushDownLowCostDisjunctivePredicate);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_PUSH_DOWN_LOW_COST_DISJUNCTIVE_PREDICATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index e111f5b..4de282e 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -102,7 +102,9 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
 }
 
 void AggregationWorkOrder::execute() {
-  state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get());
+//  if (input_block_id_ == static_cast<block_id>(-1)) {
+    state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get());
+//  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 225f093..82460dc 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -51,6 +51,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator
             FinalizeAggregationOperator.cpp
             FinalizeAggregationOperator.hpp)
 add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp)
+add_library(quickstep_relationaloperators_InitializeAggregationStateOperator
+            InitializeAggregationStateOperator.cpp
+            InitializeAggregationStateOperator.hpp)
 add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp)
 add_library(quickstep_relationaloperators_NestedLoopsJoinOperator
             NestedLoopsJoinOperator.cpp
@@ -252,6 +255,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_InitializeAggregationStateOperator
+                      glog
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_AggregationOperationState
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_InsertOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -544,6 +558,7 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InitializeAggregationStateOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_RebuildWorkOrder

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 0cbf635..b704224 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
     AggregationOperationState *agg_state =
         query_context->getAggregationState(aggr_state_index_);
     DCHECK(agg_state != nullptr);
-    for (int part_id = 0;
-         part_id < static_cast<int>(agg_state->getNumPartitions());
-         ++part_id) {
+    for (std::size_t partition_id = 0;
+         partition_id < agg_state->getNumPartitions();
+         ++partition_id) {
       container->addNormalWorkOrder(
           new FinalizeAggregationWorkOrder(
               query_id_,
+              partition_id,
               agg_state,
-              query_context->getInsertDestination(output_destination_index_),
-              part_id),
+              query_context->getInsertDestination(output_destination_index_)),
           op_index_);
     }
   }
@@ -80,11 +80,9 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 }
 
 void FinalizeAggregationWorkOrder::execute() {
-  if (state_->isAggregatePartitioned()) {
-    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
-  } else {
-    state_->finalizeAggregate(output_destination_);
-  }
+//  if (output_destination_ == nullptr) {
+    state_->finalizeAggregate(partition_id_, output_destination_);
+//  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index ae7127a..3c209b1 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @note InsertWorkOrder takes ownership of \c state.
    *
    * @param query_id The ID of the query to which this operator belongs.
+   * @param partition_id The partition ID for which the Finalize aggregation
+   *        work order is issued.
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
-   * @param part_id The partition ID for which the Finalize aggregation work
-   *        order is issued. Ignore if aggregation is not partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
+                               const std::size_t partition_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination,
-                               const int part_id = -1)
+                               InsertDestination *output_destination)
       : WorkOrder(query_id),
+        partition_id_(partition_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)),
-        part_id_(part_id) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
   void execute() override;
 
  private:
+  const std::size_t partition_id_;
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
-  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/relational_operators/InitializeAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp
new file mode 100644
index 0000000..43a5fc8
--- /dev/null
+++ b/relational_operators/InitializeAggregationStateOperator.cpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/InitializeAggregationStateOperator.hpp"
+
+#include <vector>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool InitializeAggregationStateOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  if (!started_) {
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_);
+    DCHECK(agg_state != nullptr);
+
+    for (std::size_t partition_id = 0;
+         partition_id < agg_state->getNumInitializePartitions();
+         ++partition_id) {
+      container->addNormalWorkOrder(
+          new InitializeAggregationStateWorkOrder(
+              query_id_,
+              partition_id,
+              agg_state),
+          op_index_);
+    }
+    started_ = true;
+  }
+  return started_;
+}
+
+bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  // TODO
+  LOG(FATAL) << "Not implemented";
+}
+
+void InitializeAggregationStateWorkOrder::execute() {
+  state_->initializeState(partition_id_);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9e6f1933/relational_operators/InitializeAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.hpp b/relational_operators/InitializeAggregationStateOperator.hpp
new file mode 100644
index 0000000..10403b3
--- /dev/null
+++ b/relational_operators/InitializeAggregationStateOperator.hpp
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
+
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+class InitializeAggregationStateOperator : public RelationalOperator {
+ public:
+  InitializeAggregationStateOperator(const std::size_t query_id,
+                                     const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        aggr_state_index_(aggr_state_index),
+        started_(false) {}
+
+  ~InitializeAggregationStateOperator() override {}
+
+  std::string getName() const override {
+    return "InitializeAggregationStateOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+  const QueryContext::aggregation_state_id aggr_state_index_;
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateOperator);
+};
+
+class InitializeAggregationStateWorkOrder : public WorkOrder {
+ public:
+  InitializeAggregationStateWorkOrder(const std::size_t query_id,
+                                      const std::size_t partition_id,
+                                      AggregationOperationState *state)
+      : WorkOrder(query_id),
+        partition_id_(partition_id),
+        state_(DCHECK_NOTNULL(state)) {}
+
+  ~InitializeAggregationStateWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const std::size_t partition_id_;
+
+  AggregationOperationState *state_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_


Mime
View raw message