quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [40/50] [abbrv] incubator-quickstep git commit: Fuse Aggregate with LeftOuterJoin to accelerate evaluation.
Date Tue, 21 Feb 2017 03:38:23 GMT
Fuse Aggregate with LeftOuterJoin to accelerate evaluation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a28b1e4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a28b1e4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a28b1e4d

Branch: refs/heads/LIP-time-decomposition
Commit: a28b1e4d77ee12466b0801a5a7c5185f7a83e7f8
Parents: 266b9b9
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Mon Jan 30 14:46:39 2017 -0600
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Wed Feb 8 23:55:32 2017 -0600

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   6 +-
 query_optimizer/ExecutionGenerator.cpp          | 261 +++++++++++--------
 query_optimizer/ExecutionGenerator.hpp          |  20 +-
 query_optimizer/PhysicalGenerator.cpp           |   3 +
 query_optimizer/cost_model/CMakeLists.txt       |   8 +
 query_optimizer/cost_model/SimpleCostModel.cpp  |   9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |   5 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    | 148 ++++++++++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  20 ++
 query_optimizer/physical/CMakeLists.txt         |  14 +
 .../CrossReferenceCoalesceAggregate.cpp         | 105 ++++++++
 .../CrossReferenceCoalesceAggregate.hpp         | 232 +++++++++++++++++
 query_optimizer/physical/PatternMatcher.hpp     |   3 +
 query_optimizer/physical/PhysicalType.hpp       |   1 +
 query_optimizer/rules/BottomUpRule.hpp          |  39 +--
 query_optimizer/rules/CMakeLists.txt            |  23 ++
 query_optimizer/rules/FuseAggregateJoin.cpp     | 170 ++++++++++++
 query_optimizer/rules/FuseAggregateJoin.hpp     |  71 +++++
 .../BuildAggregationExistenceMapOperator.cpp    | 196 ++++++++++++++
 .../BuildAggregationExistenceMapOperator.hpp    | 177 +++++++++++++
 relational_operators/CMakeLists.txt             |  30 +++
 relational_operators/WorkOrder.proto            |  12 +-
 relational_operators/WorkOrderFactory.cpp       |  37 +++
 storage/AggregationOperationState.cpp           |   8 +-
 storage/AggregationOperationState.hpp           |   9 +
 storage/CollisionFreeVectorTable.hpp            |   9 +
 utility/lip_filter/BitVectorExactFilter.hpp     |  27 +-
 utility/lip_filter/CMakeLists.txt               |  12 +-
 utility/lip_filter/SingleIdentityHashFilter.hpp |  22 +-
 29 files changed, 1489 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index e750a1e..3ff783c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,7 +64,6 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
-                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
@@ -95,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin
@@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_UpdateTable
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildAggregationExistenceMapOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_CreateIndexOperator
@@ -147,12 +148,10 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_types_Type
-                      quickstep_types_TypeID
                       quickstep_types_Type_proto
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto
                       quickstep_types_containers_Tuple_proto
-                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros
                       quickstep_utility_SqlError)
 if (ENABLE_DISTRIBUTED)
@@ -213,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 1b50caa..70b69e0 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,7 +49,6 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
-#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
@@ -72,9 +71,11 @@
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "query_optimizer/expressions/ScalarLiteral.hpp"
 #include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -96,6 +97,7 @@
 #include "query_optimizer/physical/UpdateTable.hpp"
 #include "query_optimizer/physical/WindowAggregate.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/CreateIndexOperator.hpp"
@@ -128,11 +130,9 @@
 #include "storage/SubBlockTypeRegistry.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
-#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/TypedValue.pb.h"
 #include "types/containers/Tuple.pb.h"
-#include "utility/EqualsAnyConstant.hpp"
 #include "utility/SqlError.hpp"
 
 #include "gflags/gflags.h"
@@ -163,10 +163,6 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
-DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
-              "The maximum allowed key range (number of entries) for using a "
-              "CollisionFreeVectorTable.");
-
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -266,6 +262,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kAggregate:
       return convertAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return convertCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kCopyFrom:
       return convertCopyFrom(
           std::static_pointer_cast<const P::CopyFrom>(physical_plan));
@@ -379,105 +378,6 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
   }
 }
 
-bool ExecutionGenerator::canUseCollisionFreeAggregation(
-    const P::AggregatePtr &aggregate,
-    const std::size_t estimated_num_groups,
-    std::size_t *max_num_groups) const {
-#ifdef QUICKSTEP_DISTRIBUTED
-  // Currently we cannot do this fast path with the distributed setting. See
-  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
-  // FinalizeAggregationOperator::getAllWorkOrderProtos().
-  return false;
-#endif
-
-  // Supports only single group-by key.
-  if (aggregate->grouping_expressions().size() != 1) {
-    return false;
-  }
-
-  // We need to know the exact min/max stats of the group-by key.
-  // So it must be a CatalogAttribute (but not an expression).
-  E::AttributeReferencePtr group_by_key_attr;
-  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
-  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
-    return false;
-  }
-
-  bool min_value_stat_is_exact;
-  bool max_value_stat_is_exact;
-  const TypedValue min_value =
-      cost_model_for_aggregation_->findMinValueStat(
-          aggregate, group_by_key_attr, &min_value_stat_is_exact);
-  const TypedValue max_value =
-      cost_model_for_aggregation_->findMaxValueStat(
-          aggregate, group_by_key_attr, &max_value_stat_is_exact);
-  if (min_value.isNull() || max_value.isNull() ||
-      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
-    return false;
-  }
-
-  std::int64_t min_cpp_value;
-  std::int64_t max_cpp_value;
-  switch (group_by_key_attr->getValueType().getTypeID()) {
-    case TypeID::kInt: {
-      min_cpp_value = min_value.getLiteral<int>();
-      max_cpp_value = max_value.getLiteral<int>();
-      break;
-    }
-    case TypeID::kLong: {
-      min_cpp_value = min_value.getLiteral<std::int64_t>();
-      max_cpp_value = max_value.getLiteral<std::int64_t>();
-      break;
-    }
-    default:
-      return false;
-  }
-
-  // TODO(jianqiao):
-  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
-  // 2. Reason about the table size bound (e.g. by checking memory size) instead
-  //    of hardcoding it as a gflag.
-  if (min_cpp_value < 0 ||
-      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
-      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
-    return false;
-  }
-
-  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
-    const E::AggregateFunctionPtr agg_func =
-        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
-
-    if (agg_func->is_distinct()) {
-      return false;
-    }
-
-    // TODO(jianqiao): Support AggregationID::AVG.
-    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
-                                       AggregationID::kCount,
-                                       AggregationID::kSum)) {
-      return false;
-    }
-
-    const auto &arguments = agg_func->getArguments();
-    if (arguments.size() > 1u) {
-      return false;
-    }
-
-    if (arguments.size() == 1u) {
-      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
-                                         TypeID::kInt,
-                                         TypeID::kLong,
-                                         TypeID::kFloat,
-                                         TypeID::kDouble)) {
-        return false;
-      }
-    }
-  }
-
-  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
-  return true;
-}
-
 void ExecutionGenerator::convertNamedExpressions(
     const std::vector<E::NamedExpressionPtr> &named_expressions,
     S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1608,9 +1508,10 @@ void ExecutionGenerator::convertAggregate(
         cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
 
     std::size_t max_num_groups;
-    if (canUseCollisionFreeAggregation(physical_plan,
-                                       estimated_num_groups,
-                                       &max_num_groups)) {
+    if (cost_model_for_aggregation_
+            ->canUseCollisionFreeAggregation(physical_plan,
+                                             estimated_num_groups,
+                                             &max_num_groups)) {
       aggr_state_proto->set_hash_table_impl_type(
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
@@ -1730,6 +1631,157 @@ void ExecutionGenerator::convertAggregate(
   }
 }
 
+// Converts a physical CrossReferenceCoalesceAggregate into the operator chain
+//   InitializeAggregation -> BuildAggregationExistenceMap -> Aggregation
+//     -> FinalizeAggregation -> DestroyAggregationState,
+// all sharing one COLLISION_FREE_VECTOR aggregation state. The state aggregates
+// the (optionally filtered) right child grouped by the right join attribute;
+// BuildAggregationExistenceMapOperator receives the left child's relation and
+// left join attribute for the same state. Only single-attribute join keys are
+// supported (enforced by DCHECK only).
+// NOTE(review): how the existence map affects the finalized groups is defined
+// by BuildAggregationExistenceMapOperator, not in this function.
+void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  DCHECK_EQ(1u, physical_plan->left_join_attributes().size());
+  DCHECK_EQ(1u, physical_plan->right_join_attributes().size());
+
+  const CatalogRelationInfo *left_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->left_child());
+  const CatalogRelationInfo *right_relation_info =
+      findRelationInfoOutputByPhysical(physical_plan->right_child());
+
+  // Create aggr state proto.
+  const QueryContext::aggregation_state_id aggr_state_index =
+      query_context_proto_->aggregation_states_size();
+  S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states();
+
+  aggr_state_proto->set_relation_id(right_relation_info->relation->getID());
+
+  // Group by the right join attribute.
+  std::unique_ptr<const Scalar> execution_group_by_expression(
+      physical_plan->right_join_attributes().front()->concretize(
+          attribute_substitution_map_));
+  aggr_state_proto->add_group_by_expressions()->CopyFrom(
+      execution_group_by_expression->getProto());
+
+  aggr_state_proto->set_hash_table_impl_type(
+      serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+  aggr_state_proto->set_estimated_num_entries(
+      physical_plan->group_by_key_value_range());
+
+  if (physical_plan->right_filter_predicate() != nullptr) {
+    std::unique_ptr<const Predicate> predicate(
+        convertPredicate(physical_plan->right_filter_predicate()));
+    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+  }
+
+  for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
+    const E::AggregateFunctionPtr unnamed_aggregate_expression =
+        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+    // Add a new entry in 'aggregates'.
+    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+    // Set the AggregateFunction.
+    aggr_proto->mutable_function()->CopyFrom(
+        unnamed_aggregate_expression->getAggregate().getProto());
+
+    // Add each of the aggregate's arguments.
+    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+      std::unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+    }
+
+    // Set whether it is a DISTINCT aggregation.
+    DCHECK(!unnamed_aggregate_expression->is_distinct());
+    aggr_proto->set_is_distinct(false);
+  }
+
+  const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InitializeAggregationOperator(
+              query_handle_->query_id(),
+              aggr_state_index));
+
+  const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
+      execution_plan_->addRelationalOperator(
+          new BuildAggregationExistenceMapOperator(
+              query_handle_->query_id(),
+              *left_relation_info->relation,
+              physical_plan->left_join_attributes().front()->id(),
+              left_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!left_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                         left_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  const QueryPlan::DAGNodeIndex aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new AggregationOperator(
+              query_handle_->query_id(),
+              *right_relation_info->relation,
+              right_relation_info->isStoredRelation(),
+              aggr_state_index));
+
+  if (!right_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(aggregation_operator_index,
+                                         right_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  // Build aggregation existence map once initialization is done.
+  execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index,
+                                       initialize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Start aggregation after building existence map.
+  execution_plan_->addDirectDependency(aggregation_operator_index,
+                                       build_aggregation_existence_map_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  // Create InsertDestination proto.
+  const CatalogRelation *output_relation = nullptr;
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  createTemporaryCatalogRelation(physical_plan,
+                                 &output_relation,
+                                 insert_destination_proto);
+
+  const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index =
+      execution_plan_->addRelationalOperator(
+          new FinalizeAggregationOperator(query_handle_->query_id(),
+                                          aggr_state_index,
+                                          *output_relation,
+                                          insert_destination_index));
+
+  insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index);
+
+  execution_plan_->addDirectDependency(finalize_aggregation_operator_index,
+                                       aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+  temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
+                                            output_relation);
+
+  const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
+      execution_plan_->addRelationalOperator(
+          new DestroyAggregationStateOperator(query_handle_->query_id(),
+                                              aggr_state_index));
+
+  execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index,
+                                       finalize_aggregation_operator_index,
+                                       true /* is_pipeline_breaker */);
+}
+
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
   // Create sort configuration for run generation.
   vector<bool> sort_ordering(physical_sort->sort_ascending());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 987f11a..f4e614a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
 #include "query_optimizer/physical/CopyFrom.hpp"
 #include "query_optimizer/physical/CreateIndex.hpp"
 #include "query_optimizer/physical/CreateTable.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
@@ -206,22 +207,6 @@ class ExecutionGenerator {
   std::string getNewRelationName();
 
   /**
-   * @brief Checks whether an aggregate node can be efficiently evaluated with
-   *        the collision-free aggregation fast path.
-   *
-   * @param aggregate The physical aggregate node to be checked.
-   * @param estimated_num_groups The estimated number of groups for the aggregate.
-   * @param exact_num_groups If collision-free aggregation is applicable, the
-   *        pointed content of this pointer will be set as the maximum possible
-   *        number of groups that the collision-free hash table need to hold.
-   * @return A bool value indicating whether collision-free aggregation can be
-   *         used to evaluate \p aggregate.
-   */
-  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
-                                      const std::size_t estimated_num_groups,
-                                      std::size_t *max_num_groups) const;
-
-  /**
    * @brief Sets up the info of the CatalogRelation represented by TableReference.
    *        TableReference is not converted to any operator.
    *
@@ -356,6 +341,17 @@ class ExecutionGenerator {
     */
   void convertAggregate(const physical::AggregatePtr &physical_plan);
 
+  /**
+   * @brief Converts a physical CrossReferenceCoalesceAggregate to a chain of
+   *        InitializeAggregation, BuildAggregationExistenceMap, Aggregation,
+   *        FinalizeAggregation and DestroyAggregationState operators that
+   *        share one collision-free vector aggregation state.
+   *
+   * @param physical_plan The CrossReferenceCoalesceAggregate to be converted.
+   */
+  void convertCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   /**
    * @brief Converts a physical Sort to SortRunGeneration and SortMergeRun.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 1b68f49..ac51c31 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
 #include "query_optimizer/rules/InjectJoinFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
@@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new ReorderColumns());
   }
 
+  rules.emplace_back(new FuseAggregateJoin());
+
   // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
   // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
   // suggested that all the new rules be placed before this point.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 5f28bb3..4042915 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -51,7 +52,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_queryoptimizer_costmodel_CostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ComparisonExpression
                       quickstep_queryoptimizer_expressions_ExprId
@@ -62,6 +66,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
@@ -76,7 +81,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_types_NullType
+                      quickstep_types_Type
+                      quickstep_types_TypeID
                       quickstep_types_TypedValue
+                      quickstep_utility_EqualsAnyConstant
                       quickstep_utility_Macros)
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index e9d2e3a..cfd8a75 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -26,6 +26,7 @@
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate(
                   estimateCardinality(physical_plan->input()) / 10);
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  return estimateCardinality(physical_plan->left_child());  // NOTE(review): assumes <= 1 group per left tuple.
+}
+
 std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate(
     const physical::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 4edc2fe..0660c37 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -25,6 +25,7 @@
 
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  // Return the estimated cardinality of the left child plan.
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   // Return the estimated cardinality of the input plan.
   std::size_t estimateCardinalityForWindowAggregate(
       const physical::WindowAggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 7afa1c3..fc775c7 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -20,13 +20,18 @@
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 
 #include <algorithm>
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationStatistics.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ComparisonExpression.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
@@ -37,6 +42,7 @@
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -49,8 +55,13 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
 #include "types/NullType.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "gflags/gflags.h"
 
 #include "glog/logging.h"
 
@@ -58,6 +69,10 @@ namespace quickstep {
 namespace optimizer {
 namespace cost {
 
+// canUseCollisionFreeAggregation() rejects the fast path whenever the maximum
+// group-by key value is greater than or equal to this bound.
+DEFINE_int64(collision_free_vector_table_max_size,
+             1000000000,
+             "The maximum allowed key range (number of entries) for using a "
+             "CollisionFreeVectorTable.");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 
@@ -88,6 +103,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
     case P::PhysicalType::kAggregate:
       return estimateCardinalityForAggregate(
           std::static_pointer_cast<const P::Aggregate>(physical_plan));
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate:
+      return estimateCardinalityForCrossReferenceCoalesceAggregate(
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan));
     case P::PhysicalType::kSharedSubplanReference: {
       const P::SharedSubplanReferencePtr shared_subplan_reference =
           std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan);
@@ -175,6 +193,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate(
       estimateNumGroupsForAggregate(physical_plan) * filter_selectivity);
 }
 
+// The fused node produces one row per left-side tuple (see the node's
+// semantics), so its cardinality is estimated as that of the left child.
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate(
+    const P::CrossReferenceCoalesceAggregatePtr &physical_plan) {
+  const P::PhysicalPtr &left_plan = physical_plan->left_child();
+  return estimateCardinality(left_plan);
+}
+
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate(
     const P::WindowAggregatePtr &physical_plan) {
   return estimateCardinality(physical_plan->input());
@@ -233,6 +256,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
       }
       break;
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::PhysicalPtr left_child = physical_plan->children()[0];
+      if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) {
+        return estimateNumDistinctValues(attribute_id, left_child);
+      }
+      break;
+    }
     case P::PhysicalType::kFilterJoin: {
       const P::FilterJoinPtr &filter_join =
           std::static_pointer_cast<const P::FilterJoin>(physical_plan);
@@ -275,6 +305,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
 double StarSchemaSimpleCostModel::estimateSelectivity(
     const physical::PhysicalPtr &physical_plan) {
   switch (physical_plan->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(physical_plan);
+      return estimateSelectivity(aggregate->input()) *
+          estimateSelectivityForFilterPredicate(aggregate);
+    }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return estimateSelectivity(aggregate_on_left_outer_join->left_child());
+    }
     case P::PhysicalType::kSelection: {
       const P::SelectionPtr &selection =
           std::static_pointer_cast<const P::Selection>(physical_plan);
@@ -331,6 +372,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
 
 double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     const physical::PhysicalPtr &physical_plan) {
+  P::PhysicalPtr target_plan = physical_plan;
   E::PredicatePtr filter_predicate = nullptr;
   switch (physical_plan->getPhysicalType()) {
     case P::PhysicalType::kSelection:
@@ -340,6 +382,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
     case P::PhysicalType::kAggregate:
       filter_predicate =
           std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate();
+      target_plan = physical_plan->children()[0];
       break;
     case P::PhysicalType::kHashJoin:
       filter_predicate =
@@ -356,7 +399,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate(
   if (filter_predicate == nullptr) {
     return 1.0;
   } else {
-    return estimateSelectivityForPredicate(filter_predicate, physical_plan);
+    return estimateSelectivityForPredicate(filter_predicate, target_plan);
   }
 }
 
@@ -443,6 +486,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
           std::static_pointer_cast<const P::Aggregate>(physical_plan);
       return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
     }
+    case P::PhysicalType::kCrossReferenceCoalesceAggregate: {
+      const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join =
+          std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan);
+      return E::SubsetOfExpressions(
+          aggregate_on_left_outer_join->left_join_attributes(), attributes);
+    }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
@@ -542,6 +591,103 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
   return kInvalidAttributeID;
 }
 
+// Decides whether |aggregate| qualifies for the collision-free aggregation
+// fast path. *max_num_groups is written only when true is returned.
+bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
+    const P::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups,
+    std::size_t *max_num_groups) {
+#ifdef QUICKSTEP_DISTRIBUTED
+  // Currently we cannot do this fast path with the distributed setting. See
+  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+  // FinalizeAggregationOperator::getAllWorkOrderProtos().
+  return false;
+#endif
+
+  // Supports only single group-by key.
+  if (aggregate->grouping_expressions().size() != 1) {
+    return false;
+  }
+
+  // We need to know the exact min/max stats of the group-by key.
+  // So it must be a CatalogAttribute (but not an expression).
+  E::AttributeReferencePtr group_by_key_attr;
+  const E::ExpressionPtr group_by_expr = aggregate->grouping_expressions().front();
+  if (!E::SomeAttributeReference::MatchesWithConditionalCast(group_by_expr, &group_by_key_attr)) {
+    return false;
+  }
+
+  // Initialized defensively so they are never read uninitialized, regardless
+  // of which paths of the stat lookups assign them.
+  bool min_value_stat_is_exact = false;
+  bool max_value_stat_is_exact = false;
+  const TypedValue min_value = findMinValueStat(
+      aggregate, group_by_key_attr, &min_value_stat_is_exact);
+  const TypedValue max_value = findMaxValueStat(
+      aggregate, group_by_key_attr, &max_value_stat_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+    return false;
+  }
+
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  switch (group_by_key_attr->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      min_cpp_value = min_value.getLiteral<int>();
+      max_cpp_value = max_value.getLiteral<int>();
+      break;
+    }
+    case TypeID::kLong: {
+      min_cpp_value = min_value.getLiteral<std::int64_t>();
+      max_cpp_value = max_value.getLiteral<std::int64_t>();
+      break;
+    }
+    default:
+      return false;
+  }
+
+  // TODO(jianqiao):
+  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+  // 2. Reason about the table size bound (e.g. by checking memory size) instead
+  //    of hardcoding it as a gflag.
+  //
+  // The last condition rejects key ranges that are too sparse relative to the
+  // estimated number of groups (more than 256 slots per group). A zero
+  // estimate is treated as one group so the division is always well-defined.
+  const double num_groups_for_density =
+      static_cast<double>(std::max<std::size_t>(estimated_num_groups, 1u));
+  if (min_cpp_value < 0 ||
+      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
+      max_cpp_value / num_groups_for_density > 256.0) {
+    return false;
+  }
+
+  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_func =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
+    // TODO(jianqiao): Support AggregationID::AVG.
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(),
+                                       AggregationID::kCount,
+                                       AggregationID::kSum)) {
+      return false;
+    }
+
+    const auto &arguments = agg_func->getArguments();
+    if (arguments.size() > 1u) {
+      return false;
+    }
+
+    if (arguments.size() == 1u) {
+      if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
+                                         TypeID::kInt,
+                                         TypeID::kLong,
+                                         TypeID::kFloat,
+                                         TypeID::kDouble)) {
+        return false;
+      }
+    }
+  }
+
+  // Keys lie in [0, max_cpp_value], so the table needs max_cpp_value + 1 slots.
+  *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+  return true;
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index cbe18f4..afb2ef9 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
@@ -166,10 +167,29 @@ class StarSchemaSimpleCostModel : public CostModel {
         physical_plan, attribute->id(), StatType::kMax, is_exact_stat);
   }
 
+  /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the collision-free aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @param max_num_groups If collision-free aggregation is applicable, the
+   *        pointed content of this pointer will be set as the maximum possible
+   *        number of groups that the collision-free hash table needs to hold.
+   *        It is left unmodified when this method returns false.
+   * @return A bool value indicating whether collision-free aggregation can be
+   *         used to evaluate \p aggregate.
+   */
+  bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+                                      const std::size_t estimated_num_groups,
+                                      std::size_t *max_num_groups);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  // Estimates the cardinality of a CrossReferenceCoalesceAggregate node as
+  // the cardinality of its left child.
+  std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate(
+      const physical::CrossReferenceCoalesceAggregatePtr &physical_plan);
+
   std::size_t estimateCardinalityForFilterJoin(
       const physical::FilterJoinPtr &physical_plan);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index f68ed39..77ae75e 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo
 add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp)
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
+add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+            CrossReferenceCoalesceAggregate.cpp
+            CrossReferenceCoalesceAggregate.hpp)
 add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
 add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
 add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
@@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      glog
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples
                       glog
                       quickstep_catalog_CatalogRelation
@@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_CopyFrom
                       quickstep_queryoptimizer_physical_CreateIndex
                       quickstep_queryoptimizer_physical_CreateTable
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_FilterJoin

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
new file mode 100644
index 0000000..6bed215
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getOutputAttributes() const {
+  // Output columns: the left join attributes first, followed by one reference
+  // per aggregate expression, in declaration order.
+  std::vector<E::AttributeReferencePtr> attributes;
+  attributes.reserve(left_join_attributes_.size() + aggregate_expressions_.size());
+  attributes.insert(attributes.end(),
+                    left_join_attributes_.begin(),
+                    left_join_attributes_.end());
+  for (const auto &alias : aggregate_expressions_) {
+    attributes.emplace_back(E::ToRef(alias));
+  }
+  return attributes;
+}
+
+std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate
+    ::getReferencedAttributes() const {
+  // Collects every attribute this node reads, de-duplicated through an
+  // unordered_set keyed on the attribute-reference pointers.
+  std::unordered_set<E::AttributeReferencePtr> referenced;
+  const auto add_all = [&referenced](const std::vector<E::AttributeReferencePtr> &attrs) {
+    referenced.insert(attrs.begin(), attrs.end());
+  };
+
+  add_all(left_join_attributes_);
+  add_all(right_join_attributes_);
+
+  // The right-side filter is optional.
+  if (right_filter_predicate_ != nullptr) {
+    add_all(right_filter_predicate_->getReferencedAttributes());
+  }
+
+  for (const auto &aggregate_expr : aggregate_expressions_) {
+    add_all(aggregate_expr->getReferencedAttributes());
+  }
+
+  return std::vector<E::AttributeReferencePtr>(referenced.begin(), referenced.end());
+}
+
+void CrossReferenceCoalesceAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  // Scalar (inline) field.
+  inline_field_names->emplace_back("group_by_key_value_range");
+  inline_field_values->emplace_back(std::to_string(group_by_key_value_range_));
+
+  // Single-node children: both inputs, then the optional right-side filter.
+  non_container_child_field_names->emplace_back("left_child");
+  non_container_child_fields->push_back(left_child_);
+  non_container_child_field_names->emplace_back("right_child");
+  non_container_child_fields->push_back(right_child_);
+  if (right_filter_predicate_ != nullptr) {
+    non_container_child_field_names->emplace_back("right_filter_predicate");
+    non_container_child_fields->push_back(right_filter_predicate_);
+  }
+
+  // List-valued children: the join keys of both sides and the aggregates.
+  container_child_field_names->emplace_back("left_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
+  container_child_field_names->emplace_back("right_join_attributes");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_));
+  container_child_field_names->emplace_back("aggregate_expressions");
+  container_child_fields->push_back(
+      CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_));
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
new file mode 100644
index 0000000..44f8a33
--- /dev/null
+++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerLogical
+ *  @{
+ */
+
+class CrossReferenceCoalesceAggregate;
+typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr;
+
+/**
+ * @brief A physical node that fuses a left outer HashJoin with an Aggregate to
+ *        enable fast-path execution.
+ *
+ * Below we briefly describe the semantics of this physical node.
+ *
+ * Let L be a table with PRIMARY KEY u. Let R be a table with FOREIGN KEY x
+ * referring to L(u). Then CrossReferenceCoalesceAggregate represents a common
+ * class of analytical queries that
+ * - For each u in L, COUNT/SUM the records in R that correspond to u (i.e.
+ *   those records satisfying R.x = L.u).
+ *   In the case that there is no record for u in R, use 0 as the result value.
+ *
+ * And we have the mapping:
+ *   L -> left_child_
+ *   R -> right_child_
+ *   u -> left_join_attributes_
+ *   x -> right_join_attributes_
+ *   optional filter on R -> right_filter_predicate_
+ *   COUNT/SUM -> aggregate_expressions_
+ */
+class CrossReferenceCoalesceAggregate : public Physical {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kCrossReferenceCoalesceAggregate;
+  }
+
+  std::string getName() const override {
+    return "CrossReferenceCoalesceAggregate";
+  }
+
+  /**
+   * @return The left physical child.
+   */
+  const PhysicalPtr& left_child() const {
+    return left_child_;
+  }
+
+  /**
+   * @return The right physical child.
+   */
+  const PhysicalPtr& right_child() const {
+    return right_child_;
+  }
+
+  /**
+   * @return The left join attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const {
+    return left_join_attributes_;
+  }
+
+  /**
+   * @return The right join attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const {
+    return right_join_attributes_;
+  }
+
+  /**
+   * @return The predicate to be applied to the right child before aggregation,
+   *         or nullptr if there is none.
+   */
+  const expressions::PredicatePtr& right_filter_predicate() const {
+    return right_filter_predicate_;
+  }
+
+  /**
+   * @return Aggregate expressions.
+   */
+  const std::vector<expressions::AliasPtr>& aggregate_expressions() const {
+    return aggregate_expressions_;
+  }
+
+  /**
+   * @return The maximum possible value of the group-by keys when mapped to
+   *         integer.
+   *
+   * NOTE(review): if the creator passes the max_num_groups computed by
+   * StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(), this value is
+   * the maximum key plus one (a slot count) -- confirm against the caller.
+   */
+  std::size_t group_by_key_value_range() const {
+    return group_by_key_value_range_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(getNumChildren(), new_children.size());
+    return Create(new_children[0],
+                  new_children[1],
+                  left_join_attributes_,
+                  right_join_attributes_,
+                  right_filter_predicate_,
+                  aggregate_expressions_,
+                  group_by_key_value_range_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  // Expression pruning is not supported for this node: it never produces a
+  // pruned copy.
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override {
+    return false;
+  }
+
+  /**
+   * @brief Creates a physical CrossReferenceCoalesceAggregate.
+   *
+   * @param left_child The left child.
+   * @param right_child The right child.
+   * @param left_join_attributes The join attributes of the left child.
+   * @param right_join_attributes The join attributes of the right child.
+   * @param right_filter_predicate Optional filtering predicate (may be null)
+   *        evaluated on the right child before aggregation.
+   * @param aggregate_expressions The aggregate expressions.
+   * @param group_by_key_value_range The maximum possible value of the group-by
+   *        keys when mapped to integer.
+   * @return An immutable physical CrossReferenceCoalesceAggregate.
+   */
+  static CrossReferenceCoalesceAggregatePtr Create(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr &right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t group_by_key_value_range) {
+    return CrossReferenceCoalesceAggregatePtr(
+        new CrossReferenceCoalesceAggregate(left_child,
+                                            right_child,
+                                            left_join_attributes,
+                                            right_join_attributes,
+                                            right_filter_predicate,
+                                            aggregate_expressions,
+                                            group_by_key_value_range));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CrossReferenceCoalesceAggregate(
+      const PhysicalPtr &left_child,
+      const PhysicalPtr &right_child,
+      const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+      const expressions::PredicatePtr &right_filter_predicate,
+      const std::vector<expressions::AliasPtr> &aggregate_expressions,
+      const std::size_t group_by_key_value_range)
+      : left_child_(left_child),
+        right_child_(right_child),
+        left_join_attributes_(left_join_attributes),
+        right_join_attributes_(right_join_attributes),
+        right_filter_predicate_(right_filter_predicate),
+        aggregate_expressions_(aggregate_expressions),
+        group_by_key_value_range_(group_by_key_value_range) {
+    // Child order matters: children()[0] is the left child and children()[1]
+    // is the right child (copyWithNewChildren() relies on this).
+    addChild(left_child_);
+    addChild(right_child_);
+  }
+
+  // TODO(jianqiao): For the left child, support filter predicate fusing and
+  // attachment of LIPFilters.
+  PhysicalPtr left_child_;
+  PhysicalPtr right_child_;
+  std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr right_filter_predicate_;
+  std::vector<expressions::AliasPtr> aggregate_expressions_;
+  std::size_t group_by_key_value_range_;
+
+  DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 4336767..0204504 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -33,6 +33,7 @@ class Aggregate;
 class BinaryJoin;
 class CopyFrom;
 class CreateTable;
+class CrossReferenceCoalesceAggregate;
 class DeleteTuples;
 class DropTable;
 class FilterJoin;
@@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>;
 using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;
 using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
 using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
+using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate,
+                                                             PhysicalType::kCrossReferenceCoalesceAggregate>;
 using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
 using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
 using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index 1da5929..077bd54 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -36,6 +36,7 @@ enum class PhysicalType {
   kCopyFrom,
   kCreateIndex,
   kCreateTable,
+  kCrossReferenceCoalesceAggregate,
   kDeleteTuples,
   kDropTable,
   kFilterJoin,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/BottomUpRule.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp
index 53dff0d..6c14e64 100644
--- a/query_optimizer/rules/BottomUpRule.hpp
+++ b/query_optimizer/rules/BottomUpRule.hpp
@@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> {
     DCHECK(tree != nullptr);
 
     init(tree);
-    std::vector<std::shared_ptr<const TreeType>> new_children;
-    bool has_changed_children = false;
-    for (const std::shared_ptr<const TreeType> &child : tree->children()) {
-      std::shared_ptr<const TreeType> new_child = apply(child);
-      if (child != new_child && !has_changed_children) {
-        has_changed_children = true;
-      }
-      new_children.push_back(new_child);
-    }
-
-    if (has_changed_children) {
-      return applyToNode(tree->copyWithNewChildren(new_children));
-    } else {
-      return applyToNode(tree);
-    }
+    return applyInternal(tree);
   }
 
  protected:
@@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> {
    *
    * @param input The input tree.
    */
-  virtual void init(const TreeNodePtr &input) {
-  }
+  virtual void init(const TreeNodePtr &input) {}
 
  private:
+  /**
+   * @brief Transforms \p tree bottom-up: every child is rewritten first, then
+   *        the node itself (rebuilt only if a child changed) is handed to
+   *        applyToNode().
+   *
+   * Unlike apply(), this helper does not call init(), so init() runs once for
+   * the root of the whole tree rather than once per subtree.
+   *
+   * @param tree The subtree to transform. Must not be null.
+   * @return The transformed subtree.
+   */
+  TreeNodePtr applyInternal(const TreeNodePtr &tree) {
+    DCHECK(tree != nullptr);
+
+    bool any_child_rewritten = false;
+    std::vector<std::shared_ptr<const TreeType>> rewritten_children;
+    for (const auto &original_child : tree->children()) {
+      const std::shared_ptr<const TreeType> rewritten = applyInternal(original_child);
+      any_child_rewritten = any_child_rewritten || (rewritten != original_child);
+      rewritten_children.push_back(rewritten);
+    }
+
+    // Only copy the node when some child was actually replaced; otherwise the
+    // original node object is passed through to applyToNode().
+    const TreeNodePtr node_to_transform =
+        any_child_rewritten ? tree->copyWithNewChildren(rewritten_children) : tree;
+    return applyToNode(node_to_transform);
+  }
+
   DISALLOW_COPY_AND_ASSIGN(BottomUpRule);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 029d816..427500d 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
 add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
@@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
+                      glog
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_expressions_Scalar
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       glog
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
new file mode 100644
index 0000000..6efc7e8
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseAggregateJoin.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr FuseAggregateJoin::applyToNode(
+    const P::PhysicalPtr &node) {
+  // Tries to rewrite
+  //   Aggregate(group by L.k; aggr(R.x), ...) over HashLeftOuterJoin(L.k = R.k)
+  // into a single CrossReferenceCoalesceAggregate. Every precondition below
+  // that does not hold makes this function return |node| unchanged.
+  //
+  // Currently we consider only Aggregate on HashLeftOuterJoin.
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) ||
+      aggregate->filter_predicate() != nullptr) {
+    return node;
+  }
+
+  P::HashJoinPtr hash_join;
+  if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) ||
+      hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin ||
+      hash_join->residual_predicate() != nullptr) {
+    return node;
+  }
+
+  // Single left join attribute with unique values.
+  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+      hash_join->left_join_attributes();
+  if (left_join_attributes.size() != 1u ||
+      (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) {
+    return node;
+  }
+
+  // Single group-by attribute that is the same as the left join attribute.
+  const std::vector<E::NamedExpressionPtr> &grouping_expressions =
+      aggregate->grouping_expressions();
+  if (grouping_expressions.size() != 1u ||
+      grouping_expressions.front()->id() != left_join_attributes.front()->id()) {
+    return node;
+  }
+
+  std::unordered_set<E::ExprId> right_side_attr_ids;
+  for (const auto &attr : hash_join->right()->getOutputAttributes()) {
+    right_side_attr_ids.insert(attr->id());
+  }
+
+  // Aggregate expressions only depend on attributes from the right child.
+  // The referenced attributes are remembered because they are needed again
+  // when deciding whether a Selection on the right side can be fused away.
+  const std::vector<E::AliasPtr> &aggregate_expressions =
+      aggregate->aggregate_expressions();
+  std::vector<E::AttributeReferencePtr> aggregate_argument_attributes;
+  aggregate_argument_attributes.reserve(aggregate_expressions.size());
+  for (const auto &expr : aggregate_expressions) {
+    const E::AggregateFunctionPtr aggr_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+
+    const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments();
+    if (arguments.size() != 1u) {
+      return node;
+    }
+
+    E::AttributeReferencePtr arg_attr;
+    if (!E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) ||
+        right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) {
+      return node;
+    }
+    aggregate_argument_attributes.push_back(arg_attr);
+  }
+
+  // Collision-free vector aggregation is applicable, and both the left and right
+  // join attributes are range-bounded integer values.
+  const std::size_t estimated_num_groups =
+      cost_model_->estimateNumGroupsForAggregate(aggregate);
+
+  std::size_t max_num_groups_left;
+  if (!cost_model_->canUseCollisionFreeAggregation(aggregate,
+                                                   estimated_num_groups,
+                                                   &max_num_groups_left)) {
+    return node;
+  }
+
+  std::size_t max_num_groups_right;
+  if (!cost_model_->canUseCollisionFreeAggregation(
+           P::Aggregate::Create(hash_join->right(),
+                                E::ToNamedExpressions(hash_join->right_join_attributes()),
+                                aggregate->aggregate_expressions(),
+                                nullptr),
+           estimated_num_groups,
+           &max_num_groups_right)) {
+    return node;
+  }
+
+  // Fuse right child's filter predicate. The Selection may only be dropped if
+  // its input still produces every attribute the fused operator reads: the
+  // right join attribute AND every aggregate argument. Otherwise an argument
+  // computed by the Selection's project expressions would no longer resolve.
+  P::PhysicalPtr right_child = hash_join->right();
+  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+      hash_join->right_join_attributes();
+  E::PredicatePtr right_filter_predicate = nullptr;
+
+  P::SelectionPtr selection;
+  if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) {
+    const std::vector<E::AttributeReferencePtr> selection_input_attributes =
+        selection->input()->getOutputAttributes();
+    if (E::SubsetOfExpressions(right_join_attributes, selection_input_attributes) &&
+        E::SubsetOfExpressions(aggregate_argument_attributes, selection_input_attributes)) {
+      right_child = selection->input();
+      right_filter_predicate = selection->filter_predicate();
+    }
+  }
+
+  const std::size_t max_num_groups =
+      std::max(max_num_groups_left, max_num_groups_right);
+
+  return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(),
+                                                    right_child,
+                                                    left_join_attributes,
+                                                    right_join_attributes,
+                                                    right_filter_predicate,
+                                                    aggregate_expressions,
+                                                    max_num_groups);
+}
+
+void FuseAggregateJoin::init(const P::PhysicalPtr &input) {
+  // The rule is expected to run on a complete plan. The cost model consulted
+  // by applyToNode() is (re)created here from that plan's shared subplans.
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  const P::TopLevelPlanPtr plan =
+      std::static_pointer_cast<const P::TopLevelPlan>(input);
+  const auto &shared_subplans = plan->shared_subplans();
+  cost_model_.reset(new cost::StarSchemaSimpleCostModel(shared_subplans));
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/query_optimizer/rules/FuseAggregateJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp
new file mode 100644
index 0000000..f2d4c47
--- /dev/null
+++ b/query_optimizer/rules/FuseAggregateJoin.hpp
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_
+
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Physical-plan rule that fuses an Aggregate with the HashJoin below
+ *        it. The fused result is a single CrossReferenceCoalesceAggregate
+ *        node; only hash left outer joins are currently considered.
+ */
+class FuseAggregateJoin : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  FuseAggregateJoin() = default;
+
+  ~FuseAggregateJoin() override = default;
+
+  std::string getName() const override { return "FuseAggregateJoin"; }
+
+ protected:
+  /**
+   * @brief Attempts to replace an Aggregate over a hash left outer join with
+   *        a CrossReferenceCoalesceAggregate.
+   *
+   * @param node The physical node to examine.
+   * @return The fused node, or \p node itself when the pattern does not apply.
+   */
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &node) override;
+
+  /**
+   * @brief Builds the cost model used by applyToNode().
+   *
+   * @param input The whole plan; expected to be a TopLevelPlan.
+   */
+  void init(const physical::PhysicalPtr &input) override;
+
+ private:
+  // Created by init(); answers uniqueness and collision-free-aggregation
+  // questions for applyToNode().
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/BuildAggregationExistenceMapOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp
new file mode 100644
index 0000000..648e291
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/CollisionFreeVectorTable.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+namespace {
+
+// Sets, for every tuple visible through |accessor|, the existence-map bit whose
+// index is the tuple's value of |attr_id|. NULL values are skipped (the NULL
+// check is compiled in only when |is_attr_nullable| is true).
+// NOTE(review): values are used directly as bit indices, so this presumably
+// relies on the optimizer guaranteeing a non-negative, bounded range — confirm.
+template <typename CppType, bool is_attr_nullable>
+void ExecuteBuild(const attribute_id attr_id,
+                  ValueAccessor *accessor,
+                  BarrieredReadWriteConcurrentBitVector *existence_map) {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+    typed_accessor->beginIteration();
+    while (typed_accessor->next()) {
+      const void *raw_value =
+          typed_accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (is_attr_nullable && raw_value == nullptr) {
+        continue;
+      }
+      existence_map->setBit(*reinterpret_cast<const CppType *>(raw_value));
+    }
+  });
+}
+
+// Dispatch helper: converts the runtime nullability flag into the compile-time
+// template argument that ExecuteBuild expects.
+template <typename CppType>
+void ExecuteHelper(const attribute_id attr_id,
+                   const bool is_attr_nullable,
+                   ValueAccessor *accessor,
+                   BarrieredReadWriteConcurrentBitVector *existence_map) {
+  if (!is_attr_nullable) {
+    ExecuteBuild<CppType, false>(attr_id, accessor, existence_map);
+    return;
+  }
+  ExecuteBuild<CppType, true>(attr_id, accessor, existence_map);
+}
+
+}  // namespace
+
+bool BuildAggregationExistenceMapOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // Queues one work order that scans |block| into the aggregation state at
+  // aggr_state_index_.
+  const auto add_work_order = [&](const block_id block) -> void {
+    container->addNormalWorkOrder(
+        new BuildAggregationExistenceMapWorkOrder(
+            query_id_,
+            input_relation_,
+            block,
+            build_attribute_,
+            query_context->getAggregationState(aggr_state_index_),
+            storage_manager),
+        op_index_);
+  };
+
+  if (!input_relation_is_stored_) {
+    // Streamed input: hand out work orders for every block not yet covered
+    // (tracked by num_workorders_generated_); the operator is done only once
+    // done_feeding_input_relation_ is set.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      add_work_order(input_relation_block_ids_[num_workorders_generated_]);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: generate one work order per block, only on the first call.
+  if (!started_) {
+    for (const block_id input_block_id : input_relation_block_ids_) {
+      add_work_order(input_block_id);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+bool BuildAggregationExistenceMapOperator
+    ::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (!input_relation_is_stored_) {
+    // Streamed input: serialize work orders for blocks not yet handed out.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      const block_id pending_block =
+          input_relation_block_ids_[num_workorders_generated_];
+      container->addWorkOrderProto(createWorkOrderProto(pending_block), op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: serialize one work order per block, only on the first call.
+  if (!started_) {
+    for (const block_id stored_block : input_relation_block_ids_) {
+      container->addWorkOrderProto(createWorkOrderProto(stored_block), op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+serialization::WorkOrder* BuildAggregationExistenceMapOperator
+    ::createWorkOrderProto(const block_id block) {
+  // Serializes a BuildAggregationExistenceMapWorkOrder for |block| into a
+  // newly allocated WorkOrder proto.
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  // The type tag must name this work-order kind: the extensions set below
+  // belong to BuildAggregationExistenceMapWorkOrder, so tagging the proto as
+  // BUILD_LIP_FILTER would make the receiver decode it as the wrong work order.
+  proto->set_work_order_type(serialization::BUILD_AGGREGATION_EXISTENCE_MAP);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id,
+                      input_relation_.getID());
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id,
+                      block);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute,
+                      build_attribute_);
+  proto->SetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index,
+                      aggr_state_index_);
+  return proto;
+}
+
+void BuildAggregationExistenceMapWorkOrder::execute() {
+  // Scans one block of the input relation and, in the existence map of the
+  // aggregation state's CollisionFreeVectorTable, sets the bit for every
+  // non-NULL value of build_attribute_. Only INT and LONG attributes are
+  // supported; anything else is fatal.
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor());
+
+  CollisionFreeVectorTable *aggregate_table = state_->getCollisionFreeVectorTable();
+  DCHECK(aggregate_table != nullptr);
+  BarrieredReadWriteConcurrentBitVector *existence_map = aggregate_table->getExistenceMap();
+
+  const Type &attr_type =
+      input_relation_.getAttributeById(build_attribute_)->getType();
+  const TypeID type_id = attr_type.getTypeID();
+  const bool nullable = attr_type.isNullable();
+
+  if (type_id == TypeID::kInt) {
+    ExecuteHelper<int>(build_attribute_, nullable, accessor.get(), existence_map);
+  } else if (type_id == TypeID::kLong) {
+    ExecuteHelper<std::int64_t>(build_attribute_, nullable, accessor.get(), existence_map);
+  } else {
+    LOG(FATAL) << "Build attribute type not supported by "
+               << "BuildAggregationExistenceMapOperator: "
+               << attr_type.getName();
+  }
+}
+
+}  // namespace quickstep


Mime
View raw message