quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [6/6] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.
Date Wed, 08 Feb 2017 18:08:47 GMT
Fuse Aggregate with LeftOuterJoin to accelerate evaluation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1db1e088
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1db1e088
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1db1e088

Branch: refs/heads/aggregate-on-left-outer-join
Commit: 1db1e08878bccaa87934741569aba5c6fff137e8
Parents: a9fe07d
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Wed Feb 8 12:09:06 2017 -0600

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   3 +
 query_optimizer/ExecutionGenerator.cpp          | 148 +++++++++++
 query_optimizer/ExecutionGenerator.hpp          |   4 +
 query_optimizer/PhysicalGenerator.cpp           |   3 +
 query_optimizer/cost_model/CMakeLists.txt       |   2 +
 query_optimizer/cost_model/SimpleCostModel.cpp  |   9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |   5 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  37 ++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |   4 +
 query_optimizer/physical/CMakeLists.txt         |  14 ++
 .../CrossReferenceCoalesceAggregate.cpp         | 105 ++++++++
 .../CrossReferenceCoalesceAggregate.hpp         | 176 +++++++++++++
 query_optimizer/physical/PatternMatcher.hpp     |   3 +
 query_optimizer/physical/PhysicalType.hpp       |   1 +
 query_optimizer/rules/BottomUpRule.hpp          |  39 +--
 query_optimizer/rules/CMakeLists.txt            |  23 ++
 query_optimizer/rules/FuseAggregateJoin.cpp     | 249 +++++++++++++++++++
 query_optimizer/rules/FuseAggregateJoin.hpp     |  73 ++++++
 .../BuildAggregationExistenceMapOperator.cpp    | 148 +++++++++++
 .../BuildAggregationExistenceMapOperator.hpp    | 146 +++++++++++
 relational_operators/CMakeLists.txt             |  25 ++
 storage/AggregationOperationState.cpp           |  13 +-
 storage/AggregationOperationState.hpp           |   3 +
 storage/CollisionFreeVectorTable.hpp            |   4 +
 utility/lip_filter/BitVectorExactFilter.hpp     |  26 +-
 utility/lip_filter/CMakeLists.txt               |  10 +-
 utility/lip_filter/SingleIdentityHashFilter.hpp |  20 +-
 27 files changed, 1239 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index e750a1e..90bc19f 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -95,6 +95,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin
@@ -116,6 +117,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_UpdateTable
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_CreateIndexOperator
@@ -213,6 +215,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 1b50caa..5b00eef 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -72,9 +72,11 @@
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "query_optimizer/expressions/ScalarLiteral.hpp"
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -96,6 +98,7 @@
 #include "query_optimizer/physical/UpdateTable.hpp"
 #include "query_optimizer/physical/WindowAggregate.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/CreateIndexOperator.hpp"
@@ -266,6 +269,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kAggregate:
       return convertAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return convertCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kCopyFrom:
       return convertCopyFrom(
           std::static_pointer_cast<const P::CopyFrom>(physical_plan));
@@ -1730,6 +1736,148 @@ void ExecutionGenerator::convertAggregate(
   }
 }
 
+void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
+  DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
+
+  const CatalogRelationInfo *left_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->left_child());
+  const CatalogRelationInfo *right_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->right_child());
+
+  // Create aggr state proto.
+  const QueryContext::aggregation_state_id aggr_state_index =
+      query_context_proto_->aggregation_states_size();
+  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+
+  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+
+  // Group by the right join attribute.
+  std::unique_ptr<const Scalar> execution_group_by_expression(
+      physical_plan->right_join_attributes().front()->concretize(
+          attribute_substitution_map_));
+  aggr_state_proto->add_group_by_expressions()->CopyFrom(
+      execution_group_by_expression->getProto());
+
+  aggr_state_proto->set_hash_table_impl_type(
+      serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+  aggr_state_proto->set_estimated_num_entries(
+      physical_plan->hash_table_num_entries());
+
+  if (physical_plan->right_filter_predicate() != nullptr) {
+    std::unique_ptr<const Predicate> predicate(
+        convertPredicate(physical_plan->right_filter_predicate()));
+    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+  }
+
+  for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
+    const E::AggregateFunctionPtr unnamed_aggregate_expression =
+        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+    // Add a new entry in 'aggregates'.
+    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+    // Set the AggregateFunction.
+    aggr_proto->mutable_function()->CopyFrom(
+        unnamed_aggregate_expression->getAggregate().getProto());
+
+    // Add each of the aggregate's arguments.
+    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+      unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+    }
+
+    // Set whether it is a DISTINCT aggregation.
+    DCHECK(!unnamed_aggregate_expression->is_distinct());
+    aggr_proto->set_is_distinct(false);
+  }
+
+  const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InitializeAggregationOperator(
+              query_handle_->query_id(),
+              aggr_state_index));
+
+  const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
+      execution_plan_->addRelationalOperator(
+          new BuildAggregationExistenceMapOperator(
+              query_handle_->query_id(),
+              *left_relation_info->relation,
+              physical_plan->left_join_attributes().front()->id(),
+              left_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!left_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                         left_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  const QueryPlan::DAGNodeIndex aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new AggregationOperator(
+              query_handle_->query_id(),
+              *right_relation_info->relation,
+              right_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!right_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         right_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  // Build aggregation existence map once initialization is done.
+  execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                       initialize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Start aggregation after building existence map.
+  execution_plan_->addDirectDependency(aggregation_operator_index,
+                                       build_aggregation_existence_map_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+
+  // Create InsertDestination proto.
+  const CatalogRelation *output_relation = nullptr;
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  createTemporaryCatalogRelation(physical_plan,
+                                 &output_relation,
+                                 insert_destination_proto);
+
+  const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new FinalizeAggregationOperator(query_handle_->query_id(),
+                                          aggr_state_index,
+                                          *output_relation,
+                                          insert_destination_index));
+
+  insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
+
+  execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
+                                       aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
+                                            output_relation);
+
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true);
+}
+
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   // Create sort configuration for run generation.
   vector<bool> sort_ordering(physical_sort->sort_ascending());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 987f11a..db9664e 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -356,6 +357,9 @@ class ExecutionGenerator {
    */
   void convertAggregate(const physical::AggregatePtr &physical_plan);
 
+  void convertCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   /**
    * @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 1b68f49..ac51c31 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
 #include "query_optimizer/rules/InjectJoinFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
@@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new ReorderColumns());
   }
 
+  rules.emplace_back(new FuseAggregateJoin());
+
   // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
   // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
   // suggested that all the new rules be placed before this point.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 5f28bb3..035a1ac 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -62,6 +63,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index e9d2e3a..cfd8a75 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -26,6 +26,7 @@
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
                   estimateCardinality(physical_plan->input()) / 10);
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());
+}
+
 std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
     const physical::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 4edc2fe..0660c37 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -25,6 +25,7 @@
 
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  // Returns the cardinality of the left child plan.
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   // Return the estimated cardinality of the input plan.
   std::size_t estimateCardinalityForWindowAggregate(
       const physical::WindowAggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 7afa1c3..82e01c2 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -37,6 +37,7 @@
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -88,6 +89,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -175,6 +179,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
       estimateNumGroupsForAggregate(physical_plan) * filter_selectivity);
 }
 
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());
+}
+
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
     const P::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());
@@ -233,6 +242,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
       }
       break;
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::PhysicalPtr left_child = physical_plan->children()[0];
+      if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) {
+        return estimateNumDistinctValues(attribute_id, left_child);
+      }
+      break;
+    }
     case P::PhysicalType::kFilterJoin: {
       const P::FilterJoinPtr &filter_join =
           std::static_pointer_cast<const P::FilterJoin>(physical_plan);
@@ -275,6 +291,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
 double StarSchemaSimpleCostModel::estimateSelectivity(
     const physical::PhysicalPtr &physical_plan) {
   switch (physical_plan->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(physical_plan);
+      return estimateSelectivity(aggregate->input()) *
+          estimateSelectivityForFilterPredicate(aggregate);
+    }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return estimateSelectivity(aggregate_on_left_outer_join->left_child());
+    }
     case P::PhysicalType::kSelection: {
       const P::SelectionPtr &selection =
           std::static_pointer_cast<const P::Selection>(physical_plan);
@@ -331,6 +358,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
 
 double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     const physical::PhysicalPtr &physical_plan) {
+  P::PhysicalPtr target_plan = physical_plan;
   E::PredicatePtr filter_predicate = nullptr;
   switch (physical_plan->getPhysicalType()) {
     case P::PhysicalType::kSelection:
@@ -340,6 +368,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     case P::PhysicalType::kAggregate:
       filter_predicate =
           std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate();
+      target_plan = physical_plan->children()[0];
       break;
     case P::PhysicalType::kHashJoin:
       filter_predicate =
@@ -356,7 +385,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
   if (filter_predicate == nullptr) {
     return 1.0;
   } else {
-    return estimateSelectivityForPredicate(filter_predicate, physical_plan);
+    return estimateSelectivityForPredicate(filter_predicate, target_plan);
   }
 }
 
@@ -443,6 +472,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
           std::static_pointer_cast<const P::Aggregate>(physical_plan);
       return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return E::SubsetOfExpressions(
+          aggregate_on_left_outer_join->left_join_attributes(), attributes);
+    }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index cbe18f4..99795a3 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -170,6 +171,9 @@ class StarSchemaSimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   std::size_t estimateCardinalityForFilterJoin(
       const physical::FilterJoinPtr &physical_plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index f68ed39..77ae75e 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo
 add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
+add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+            CrossReferenceCoalesceAggregate.cpp
+            CrossReferenceCoalesceAggregate.hpp)
 add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
 add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
 add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
@@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples
                       glog
                       quickstep_catalog_CatalogRelation
@@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
new file mode 100644
index 0000000..16700c0
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getOutputAttributes() const {
+  std::vector<E::AttributeReferencePtr> output_attributes(left_join_attributes_);  // Left join attributes first.
+  for (const auto &aggregate_expr : aggregate_expressions_) {
+    output_attributes.emplace_back(E::ToRef(aggregate_expr));  // Then one reference per aggregate alias.
+  }
+  return output_attributes;
+}
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getReferencedAttributes() const {
+  std::unordered_set<E::AttributeReferencePtr> referenced_attributes;  // Unordered: result order is unspecified.
+
+  referenced_attributes.insert(left_join_attributes_.begin(),
+                               left_join_attributes_.end());
+  referenced_attributes.insert(right_join_attributes_.begin(),
+                               right_join_attributes_.end());
+
+  if (right_filter_predicate_ != nullptr) {  // The right-side filter predicate may be absent.
+    const std::vector<E::AttributeReferencePtr> attrs_in_predicate =
+        right_filter_predicate_->getReferencedAttributes();
+    referenced_attributes.insert(attrs_in_predicate.begin(),
+                                 attrs_in_predicate.end());
+  }
+
+  for (const auto &aggregate_expr : aggregate_expressions_) {  // Attributes used inside each aggregate.
+    const std::vector<E::AttributeReferencePtr> attrs_in_expr =
+        aggregate_expr->getReferencedAttributes();
+    referenced_attributes.insert(attrs_in_expr.begin(), attrs_in_expr.end());
+  }
+
+  return std::vector<E::AttributeReferencePtr>(
+      referenced_attributes.begin(), referenced_attributes.end());
+}
+
+void CrossReferenceCoalesceAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("hash_table_num_entries");  // Inline field: name here, value on the next line.
+  inline_field_values->push_back(std::to_string(hash_table_num_entries_));
+
+  non_container_child_field_names->push_back("left_child");  // Each name is paired with the node pushed after it.
+  non_container_child_fields->push_back(left_child_);
+  non_container_child_field_names->push_back("right_child");
+  non_container_child_fields->push_back(right_child_);
+
+  container_child_field_names->push_back("left_join_attributes");  // Vector-valued fields go in the container lists.
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
+  container_child_field_names->push_back("right_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+
+  if (right_filter_predicate_ != nullptr) {  // Listed only when a predicate is set.
+    non_container_child_field_names->push_back("right_filter_predicate");
+    non_container_child_fields->push_back(right_filter_predicate_);
+  }
+  container_child_field_names->push_back("aggregate_expressions");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_));
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
new file mode 100644
index 0000000..9fb02e2
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class CrossReferenceCoalesceAggregate;
+typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
+
+/**
+ * @brief Physical node produced by fusing an Aggregate with the LeftOuterJoin below it (see FuseAggregateJoin).
+ */
+class CrossReferenceCoalesceAggregate : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCrossReferenceCoalesceAggregate;
+  }
+
+  std::string getName() const override {
+    return "CrossReferenceCoalesceAggregate";
+  }
+
+  const PhysicalPtr& left_child() const {
+    return left_child_;
+  }
+
+  const PhysicalPtr& right_child() const {
+    return right_child_;
+  }
+
+  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
+    return left_join_attributes_;
+  }
+
+  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
+    return right_join_attributes_;
+  }
+
+  const expressions::PredicatePtr& right_filter_predicate() const {
+    return right_filter_predicate_;  // May be nullptr when there is no filter.
+  }
+
+  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
+    return aggregate_expressions_;
+  }
+
+  inline std::size_t hash_table_num_entries() const {
+    return hash_table_num_entries_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(getNumChildren(), new_children.size());  // new_children[0]/[1] replace the left/right child.
+    return Create(new_children[0],
+                  new_children[1],
+                  left_join_attributes_,
+                  right_join_attributes_,
+                  right_filter_predicate_,
+                  aggregate_expressions_,
+                  hash_table_num_entries_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;  // Always declines; *output is left untouched.
+  }
+
+  static CrossReferenceCoalesceAggregatePtr Create(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr right_filter_predicate,  // May be nullptr.
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t hash_table_num_entries) {
+    return CrossReferenceCoalesceAggregatePtr(
+        new CrossReferenceCoalesceAggregate(left_child,
+                                            right_child,
+                                            left_join_attributes,
+                                            right_join_attributes,
+                                            right_filter_predicate,
+                                            aggregate_expressions,
+                                            hash_table_num_entries));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CrossReferenceCoalesceAggregate(  // Private: instances are built through Create().
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t hash_table_num_entries)
+      : left_child_(left_child),
+        right_child_(right_child),
+        left_join_attributes_(left_join_attributes),
+        right_join_attributes_(right_join_attributes),
+        right_filter_predicate_(right_filter_predicate),
+        aggregate_expressions_(aggregate_expressions),
+        hash_table_num_entries_(hash_table_num_entries) {
+    addChild(left_child_);  // Registered before the right child.
+    addChild(right_child_);
+  }
+
+  PhysicalPtr left_child_;
+  PhysicalPtr right_child_;
+  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr right_filter_predicate_;  // nullptr when no filter applies.
+  std::vector<expressions::AliasPtr> aggregate_expressions_;
+  std::size_t hash_table_num_entries_;
+
+  DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 4336767..0204504 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -33,6 +33,7 @@ class Aggregate;
 class BinaryJoin;
 class CopyFrom;
 class CreateTable;
+class CrossReferenceCoalesceAggregate;
 class DeleteTuples;
 class DropTable;
 class FilterJoin;
@@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>;
 using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;
 using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
 using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
+using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate,
+                                                             PhysicalType::kCrossReferenceCoalesceAggregate>;
 using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
 using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
 using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 1da5929..077bd54 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -36,6 +36,7 @@ enum class PhysicalType {
   kCopyFrom,
   kCreateIndex,
   kCreateTable,
+  kCrossReferenceCoalesceAggregate,
   kDeleteTuples,
   kDropTable,
   kFilterJoin,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 53dff0d..6c14e64 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> {
     DCHECK(tree != nullptr);
 
     init(tree);
-    std::vector<std::shared_ptr<const TreeType>> new_children;
-    bool has_changed_children = false;
-    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
-      std::shared_ptr<const TreeType> new_child = apply(child);
-      if (child != new_child && !has_changed_children) {
-        has_changed_children = true;
-      }
-      new_children.push_back(new_child);
-    }
-
-    if (has_changed_children) {
-      return applyToNode(tree->copyWithNewChildren(new_children));
-    } else {
-      return applyToNode(tree);
-    }
+    return applyInternal(tree);
   }
 
  protected:
@@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> {
    *
    * @param input The input tree.
    */
-  virtual void init(const TreeNodePtr &input) {
-  }
+  virtual void init(const TreeNodePtr &input) {}
 
  private:
+  TreeNodePtr applyInternal(const TreeNodePtr &tree) {  // Recursive worker; does not call init().
+    DCHECK(tree != nullptr);
+
+    std::vector<std::shared_ptr<const TreeType>> new_children;
+    bool has_changed_children = false;
+    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
+      std::shared_ptr<const TreeType> new_child = applyInternal(child);  // Children are rewritten before the parent.
+      if (child != new_child && !has_changed_children) {
+        has_changed_children = true;
+      }
+      new_children.push_back(new_child);
+    }
+
+    if (has_changed_children) {
+      return applyToNode(tree->copyWithNewChildren(new_children));  // Copy the node only if some child changed.
+    } else {
+      return applyToNode(tree);
+    }
+  }
+
   DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 029d816..a614bde 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
 add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
@@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       glog
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
new file mode 100644
index 0000000..d65ee27
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
+
+#include <unordered_set>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr FuseAggregateJoin::applyToNode(
+    const P::PhysicalPtr &node) {
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) ||
+      aggregate->filter_predicate() != nullptr) {
+    return node;  // Not an Aggregate, or the Aggregate carries a filter predicate.
+  }
+
+  P::HashJoinPtr hash_join;
+  if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) ||
+      hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin ||
+      hash_join->residual_predicate() != nullptr) {
+    return node;  // Input must be a left outer HashJoin without a residual predicate.
+  }
+
+  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+      hash_join->left_join_attributes();
+  if (left_join_attributes.size() != 1u ||
+      (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) {
+    return node;  // Needs exactly one left join attribute for which impliesUniqueAttributes() holds.
+  }
+
+  const std::vector<E::NamedExpressionPtr> &grouping_expressions =
+      aggregate->grouping_expressions();
+  if (grouping_expressions.size() != 1u ||
+      grouping_expressions.front()->id() != left_join_attributes.front()->id()) {
+    return node;  // The single group-by expression must be the left join attribute.
+  }
+
+  std::unordered_set<E::ExprId> right_side_attr_ids;
+  for (const auto &attr : hash_join->right()->getOutputAttributes()) {
+    right_side_attr_ids.insert(attr->id());
+  }
+
+  const std::vector<E::AliasPtr> &aggregate_expressions =
+      aggregate->aggregate_expressions();
+  for (const auto &expr : aggregate_expressions) {
+    const E::AggregateFunctionPtr aggr_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+
+    const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments();
+    if (arguments.size() != 1u) {
+      return node;  // Only single-argument aggregate functions are fused.
+    }
+
+    E::AttributeReferencePtr arg_attr;
+    if (!E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) ||
+        right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) {
+      return node;  // The argument must be a plain attribute output by the join's right side.
+    }
+  }
+
+  std::size_t max_num_groups;
+  // TODO(jianqiao): Revisit; "aggregate" is presumably not the node that is actually aggregated after fusing.
+  if (!canUseCollisionFreeAggregation(aggregate,
+                                      cost_model_->estimateNumGroupsForAggregate(aggregate),
+                                      &max_num_groups)) {
+    return node;
+  }
+
+  // Fuse filter predicate.
+  P::PhysicalPtr right_child = hash_join->right();
+  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+      hash_join->right_join_attributes();
+  E::PredicatePtr right_filter_predicate = nullptr;
+
+  P::SelectionPtr selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) {
+    if (E::SubsetOfExpressions(right_join_attributes,
+                               selection->input()->getOutputAttributes())) {
+      right_child = selection->input();  // Bypass the Selection and carry its predicate on the fused node.
+      right_filter_predicate = selection->filter_predicate();
+    }
+  }
+
+  return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(),
+                                                    right_child,
+                                                    left_join_attributes,
+                                                    right_join_attributes,
+                                                    right_filter_predicate,
+                                                    aggregate_expressions,
+                                                    max_num_groups);
+}
+
+void FuseAggregateJoin::init(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);  // The cast below relies on this.
+
+  const P::TopLevelPlanPtr top_level_plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(  // A fresh cost model is built on every init() call.
+      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));
+}
+
+bool FuseAggregateJoin::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) const {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot do this fast path with the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Supports only single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  bool min_value_stat_is_exact;
+  bool max_value_stat_is_exact;
+  const TypedValue min_value =
+      cost_model_->findMinValueStat(
+          aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value =
+      cost_model_->findMaxValueStat(
+          aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;  // Only kInt and kLong group-by keys are handled.
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the table size bound (e.g. by checking memory size) instead
+  //    of hardcoding the limits below as literal constants.
+  if (min_cpp_value < 0 ||
+      max_cpp_value >= 1000000000 ||
+      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {  // Max key over 256x the group estimate.
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::AVG.
+    switch (agg_func->getAggregate().getAggregationID()) {
+      case AggregationID::kCount:  // Fall through
+      case AggregationID::kSum:
+        break;
+      default:
+        return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1) {
+      return false;
+    }
+
+    if (arguments.size() == 1) {
+      switch (arguments.front()->getValueType().getTypeID()) {
+        case TypeID::kInt:  // Fall through
+        case TypeID::kLong:
+        case TypeID::kFloat:
+        case TypeID::kDouble:
+          break;
+        default:
+          return false;
+      }
+    }
+  }
+
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;  // Key statistics lie within [0, max_cpp_value].
+  return true;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/query_optimizer/rules/FuseAggregateJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp
new file mode 100644
index 0000000..9e2ec04
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.hpp
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+class FuseAggregateJoin : public BottomUpRule<physical::Physical> {  // Fuses an Aggregate over a left outer HashJoin.
+ public:
+  /**
+   * @brief Constructor.
+   */
+  FuseAggregateJoin() {}
+
+  ~FuseAggregateJoin() override {}
+
+  std::string getName() const override {
+    return "FuseAggregateOnLeftOuterJoin";  // NOTE(review): differs from the class name; confirm this is intended.
+  }
+
+ protected:
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &node) override;
+
+  void init(const physical::PhysicalPtr &input) override;
+
+ private:
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *max_num_groups) const;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;  // Created in init(); null before that.
+
+  DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
new file mode 100644
index 0000000..6a77696
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
+
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+namespace {
+
+// Walks every tuple of the accessor and, for each value of attribute attr_id,
+// sets the bit in existence_map whose position is that value read as a
+// CppType. When is_attr_nullable is true, NULL values are skipped.
+template <typename CppType, bool is_attr_nullable>
+void ExecuteBuild(const attribute_id attr_id,
+                  ValueAccessor *accessor,
+                  BarrieredReadWriteConcurrentBitVector *existence_map) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+    for (typed_accessor->beginIteration(); typed_accessor->next();) {
+      const void *raw_value =
+          typed_accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (is_attr_nullable && raw_value == nullptr) {
+        // A NULL key has no position in the map.
+        continue;
+      }
+      existence_map->setBit(*reinterpret_cast<const CppType *>(raw_value));
+    }
+  });
+}
+
+// Turns the runtime nullability flag into the compile-time template argument
+// of ExecuteBuild and forwards all other arguments unchanged.
+template <typename CppType>
+void ExecuteHelper(const attribute_id attr_id,
+                   const bool is_attr_nullable,
+                   ValueAccessor *accessor,
+                   BarrieredReadWriteConcurrentBitVector *existence_map)  {
+  if (!is_attr_nullable) {
+    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
+    return;
+  }
+  ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
+}
+
+}
+
+// Adds one BuildAggregationExistenceMapWorkOrder per input block to the
+// container.
+//  - Stored input: all blocks from the constructor-time snapshot are emitted
+//    on the first call only; every call returns true.
+//  - Non-stored input: blocks received so far through feedInputBlock() that
+//    have not been emitted yet are emitted; returns
+//    done_feeding_input_relation_.
+// scheduler_client_id and bus are not used.
+bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // Creates the work order for a single input block. The aggregation state is
+  // looked up for every work order, exactly when the work order is created.
+  const auto make_work_order =
+      [&](const block_id input_block_id) -> BuildAggregationExistenceMapWorkOrder* {
+    return new BuildAggregationExistenceMapWorkOrder(
+        query_id_,
+        input_relation_,
+        input_block_id,
+        build_attribute_,
+        query_context->getAggregationState(aggr_state_index_),
+        storage_manager);
+  };
+
+  if (!input_relation_is_stored_) {
+    // Resume from the first block that has not been turned into a work order.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      container->addNormalWorkOrder(
+          make_work_order(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  if (!started_) {
+    for (const block_id input_block_id : input_relation_block_ids_) {
+      container->addNormalWorkOrder(make_work_order(input_block_id), op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+// Proto-based work order generation is not implemented for this operator:
+// every call logs "Not implemented" at FATAL severity. The container argument
+// is not used.
+bool BuildAggregationExistenceMapOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  LOG(FATAL) << "Not implemented";
+}
+
+// Loads the block build_block_id_, creates a value accessor over its tuple
+// storage sub-block, and sets bits in the aggregation state's existence map
+// from the values of build_attribute_. Only kInt and kLong attribute types
+// are handled; any other type is a fatal error.
+void BuildAggregationExistenceMapWorkOrder::execute() {
+  BlockReference input_block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+
+  std::unique_ptr<ValueAccessor> value_accessor(
+      input_block->getTupleStorageSubBlock().createValueAccessor());
+
+  const Type &build_attr_type =
+      input_relation_.getAttributeById(build_attribute_)->getType();
+  const auto build_attr_type_id = build_attr_type.getTypeID();
+
+  if (build_attr_type_id == TypeID::kInt) {
+    ExecuteHelper<int>(build_attribute_,
+                       build_attr_type.isNullable(),
+                       value_accessor.get(),
+                       state_->getExistenceMap());
+  } else if (build_attr_type_id == TypeID::kLong) {
+    ExecuteHelper<std::int64_t>(build_attribute_,
+                                build_attr_type.isNullable(),
+                                value_accessor.get(),
+                                state_->getExistenceMap());
+  } else {
+    LOG(FATAL) << "Build attribute type not supported by "
+               << "BuildAggregationExistenceMapOperator: "
+               << build_attr_type.getName();
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
new file mode 100644
index 0000000..2d13bda
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class CatalogRelationSchema;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator that produces one BuildAggregationExistenceMapWorkOrder
+ *        per block of its input relation. Each work order sets bits in the
+ *        existence map of an AggregationOperationState according to the
+ *        values of a single attribute.
+ **/
+class BuildAggregationExistenceMapOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The query ID forwarded to RelationalOperator.
+   * @param input_relation The relation whose blocks are scanned.
+   * @param build_attribute The ID of the attribute in input_relation whose
+   *        values determine which bits are set.
+   * @param input_relation_is_stored If true, the list of input blocks is a
+   *        snapshot of input_relation taken here at construction. If false,
+   *        the list starts empty and is filled through feedInputBlock().
+   * @param aggr_state_index The index used to look up the
+   *        AggregationOperationState in the QueryContext.
+   **/
+  BuildAggregationExistenceMapOperator(const std::size_t query_id,
+                                       const CatalogRelation &input_relation,
+                                       const attribute_id build_attribute,
+                                       const bool input_relation_is_stored,
+                                       const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_attribute_(build_attribute),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                           : std::vector<block_id>()),
+        aggr_state_index_(aggr_state_index),
+        num_workorders_generated_(0),
+        started_(false) {}
+
+  ~BuildAggregationExistenceMapOperator() override {}
+
+  /**
+   * @return The name of this operator.
+   **/
+  std::string getName() const override {
+    return "BuildAggregationExistenceMapOperator";
+  }
+
+  /**
+   * @return The input relation that this operator scans.
+   **/
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  /**
+   * @brief Append a block to the list of input blocks.
+   *
+   * @param input_block_id The ID of the block to append.
+   * @param input_relation_id Not used by this operator.
+   * @param part_id Not used by this operator.
+   **/
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  // NOTE(review): declared here, but no definition is visible in
+  // BuildAggregationExistenceMapOperator.cpp -- confirm whether it is needed.
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const attribute_id build_attribute_;
+  const bool input_relation_is_stored_;
+  // Blocks to generate work orders for: the construction-time snapshot when
+  // the input is stored, otherwise the blocks received by feedInputBlock().
+  std::vector<block_id> input_relation_block_ids_;
+  const QueryContext::aggregation_state_id aggr_state_index_;
+
+  // Index into input_relation_block_ids_ of the next block to emit a work
+  // order for; advanced only when the input relation is not stored.
+  std::vector<block_id>::size_type num_workorders_generated_;
+  // Set once all work orders have been emitted for a stored input relation.
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator.
+ *        It processes a single block of the input relation.
+ **/
+class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The query ID forwarded to WorkOrder.
+   * @param input_relation The schema of the relation that the block belongs
+   *        to. Held by reference.
+   * @param build_block_id The ID of the block to scan.
+   * @param build_attribute The ID of the attribute whose values determine
+   *        which bits are set in the existence map.
+   * @param state The AggregationOperationState whose existence map is
+   *        updated. Checked with DCHECK_NOTNULL.
+   * @param storage_manager The StorageManager used to load the block.
+   *        Checked with DCHECK_NOTNULL.
+   **/
+  BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
+                                        const CatalogRelationSchema &input_relation,
+                                        const block_id build_block_id,
+                                        const attribute_id build_attribute,
+                                        AggregationOperationState *state,
+                                        StorageManager *storage_manager)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_attribute_(build_attribute),
+        state_(DCHECK_NOTNULL(state)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~BuildAggregationExistenceMapWorkOrder() override {}
+
+  // Defined out of line in BuildAggregationExistenceMapOperator.cpp.
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const attribute_id build_attribute_;
+  AggregationOperationState *state_;
+
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..33321d3 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()
 
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+            BuildAggregationExistenceMapOperator.cpp
+            BuildAggregationExistenceMapOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
 add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
@@ -95,6 +98,27 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_AggregationOperationState
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -552,6 +576,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_CreateIndexOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..2d85312 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,23 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
   }
 }
 
+// Returns the existence map of the collision-free vector table backing this
+// aggregation. Aggregations that are not collision-free do not support this
+// call and fail fatally.
+BarrieredReadWriteConcurrentBitVector* AggregationOperationState
+    ::getExistenceMap() const {
+  if (!is_aggregate_collision_free_) {
+    LOG(FATAL) << "AggregationOperationState::getExistenceMap() "
+               << "is not supported by this aggregation";
+  }
+
+  CollisionFreeVectorTable *vector_table =
+      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
+  return vector_table->getExistenceMap();
+}
+
 void AggregationOperationState::initialize(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
     static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    LOG(FATAL) << "AggregationOperationState::initializeState() "
+    LOG(FATAL) << "AggregationOperationState::initialize() "
                << "is not supported by this aggregation";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 13ee377..23f23b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -39,6 +39,7 @@ namespace quickstep {
 namespace serialization { class AggregationOperationState; }
 
 class AggregateFunction;
+class BarrieredReadWriteConcurrentBitVector;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
 class InsertDestination;
@@ -167,6 +168,8 @@ class AggregationOperationState {
    **/
   std::size_t getNumFinalizationPartitions() const;
 
+  /**
+   * @brief Get the existence map of the collision-free vector table used by
+   *        this aggregation.
+   *
+   * @note Only supported when this aggregation is collision-free; otherwise
+   *       the call is a fatal error.
+   *
+   * @return A pointer to the existence map.
+   **/
+  BarrieredReadWriteConcurrentBitVector* getExistenceMap() const;
+
   /**
    * @brief Initialize the specified partition of this aggregation.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 4f3e238..cd76854 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -104,6 +104,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
     return existence_map_->onesCountInRange(start_position, end_position);
   }
 
+  /**
+   * @brief Get the existence map of this table.
+   *
+   * @return The pointer obtained from existence_map_.get(). Presumably the
+   *         map remains owned by this table -- confirm against the member's
+   *         declaration.
+   **/
+  inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const {
+    return existence_map_.get();
+  }
+
   /**
    * @brief Initialize the specified partition of this aggregation table.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 6ad0567..7c4c238 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -20,9 +20,8 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
 
-#include <atomic>
+#include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -31,6 +30,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -64,14 +64,10 @@ class BitVectorExactFilter : public LIPFilter {
       : LIPFilter(LIPFilterType::kBitVectorExactFilter),
         min_value_(static_cast<CppType>(min_value)),
         max_value_(static_cast<CppType>(max_value)),
-        bit_array_(GetByteSize(max_value - min_value + 1)) {
+        bit_vector_(max_value - min_value + 1) {
     DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
     DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
     DCHECK_GE(max_value_, min_value_);
-
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -109,13 +105,6 @@ class BitVectorExactFilter : public LIPFilter {
 
  private:
   /**
-   * @brief Round up bit_size to multiples of 8.
-   */
-  inline static std::size_t GetByteSize(const std::size_t bit_size) {
-    return (bit_size + 7u) / 8u;
-  }
-
-  /**
    * @brief Iterate through the accessor and hash values into the internal bit
    *        array.
    */
@@ -164,8 +153,7 @@ class BitVectorExactFilter : public LIPFilter {
     DCHECK_GE(value, min_value_);
     DCHECK_LE(value, max_value_);
 
-    const CppType loc = value - min_value_;
-    bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+    bit_vector_.setBit(value - min_value_);
   }
 
   /**
@@ -177,9 +165,7 @@ class BitVectorExactFilter : public LIPFilter {
       return is_anti_filter;
     }
 
-    const CppType loc = value - min_value_;
-    const bool is_bit_set =
-        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+    const bool is_bit_set = bit_vector_.getBit(value - min_value_);
 
     if (is_anti_filter) {
       return !is_bit_set;
@@ -190,7 +176,7 @@ class BitVectorExactFilter : public LIPFilter {
 
   const CppType min_value_;
   const CppType max_value_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index edd0d24..0a820cf 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -39,8 +39,9 @@ target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
@@ -83,5 +84,6 @@ target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
-                      quickstep_utility_lipfilter_LIPFilter
-                      quickstep_utility_Macros)
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1db1e088/utility/lip_filter/SingleIdentityHashFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp
index 5c0e8a2..6eaa93e 100644
--- a/utility/lip_filter/SingleIdentityHashFilter.hpp
+++ b/utility/lip_filter/SingleIdentityHashFilter.hpp
@@ -20,10 +20,8 @@
 #ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 #define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_
 
-#include <atomic>
 #include <cstddef>
 #include <cstdint>
-#include <cstring>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -32,6 +30,7 @@
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/Type.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 
@@ -65,11 +64,8 @@ class SingleIdentityHashFilter : public LIPFilter {
   explicit SingleIdentityHashFilter(const std::size_t filter_cardinality)
       : LIPFilter(LIPFilterType::kSingleIdentityHashFilter),
         filter_cardinality_(filter_cardinality),
-        bit_array_(GetByteSize(filter_cardinality)) {
+        bit_vector_(filter_cardinality) {
     DCHECK_GE(filter_cardinality, 1u);
-    std::memset(bit_array_.data(),
-                0x0,
-                sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality));
   }
 
   void insertValueAccessor(ValueAccessor *accessor,
@@ -158,8 +154,9 @@ class SingleIdentityHashFilter : public LIPFilter {
    * @brief Inserts a given value into the hash filter.
    */
   inline void insert(const void *key_begin) {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed);
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    bit_vector_.setBit(hash);
   }
 
   /**
@@ -168,12 +165,13 @@ class SingleIdentityHashFilter : public LIPFilter {
    *        If false is returned, a value is certainly not present in the hash filter.
    */
   inline bool contains(const void *key_begin) const {
-    const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
-    return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u)));
+    const CppType hash =
+        *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_;
+    return bit_vector_.getBit(hash);
   }
 
   std::size_t filter_cardinality_;
-  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+  BarrieredReadWriteConcurrentBitVector bit_vector_;
 
   DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter);
 };



Mime
View raw message