quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [2/3] incubator-quickstep git commit: Updates
Date Sat, 30 Jul 2016 01:11:52 GMT
Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/91e49820
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/91e49820
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/91e49820

Branch: refs/heads/adaptive-bloom-filters
Commit: 91e498202d7b19cb5c7d4d8c61218d112c446b71
Parents: 9cc47e5
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Fri Jul 29 17:47:42 2016 -0500
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Fri Jul 29 17:47:42 2016 -0500

----------------------------------------------------------------------
 query_execution/QueryContext.cpp                | 11 ++-
 query_optimizer/ExecutionGenerator.cpp          | 17 ++++
 query_optimizer/ExecutionHeuristics.cpp         | 36 +++++--
 query_optimizer/ExecutionHeuristics.hpp         | 51 +++++++---
 query_optimizer/physical/Aggregate.hpp          | 20 +++-
 query_optimizer/physical/HashJoin.hpp           | 50 ----------
 query_optimizer/physical/Physical.hpp           | 50 ++++++++++
 query_optimizer/rules/AttachBloomFilters.cpp    | 76 ++++++++++-----
 .../StarSchemaHashJoinOrderOptimization.cpp     |  4 +-
 .../StarSchemaHashJoinOrderOptimization.hpp     | 25 +++--
 storage/AggregationOperationState.cpp           | 98 +++++++++++++++++++-
 storage/AggregationOperationState.hpp           | 10 +-
 storage/AggregationOperationState.proto         |  6 ++
 storage/HashTable.hpp                           | 27 ++----
 storage/HashTable.proto                         |  6 +-
 storage/HashTableFactory.hpp                    |  9 +-
 storage/StorageBlock.cpp                        | 28 +-----
 storage/StorageBlock.hpp                        |  7 +-
 utility/PlanVisualizer.cpp                      | 24 +++++
 19 files changed, 377 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 7019b6a..fd0ed08 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -61,15 +61,16 @@ QueryContext::QueryContext(const serialization::QueryContext &proto,
       << "Attempted to create QueryContext from an invalid proto description:\n"
       << proto.DebugString();
 
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+  }
+
   for (int i = 0; i < proto.aggregation_states_size(); ++i) {
     aggregation_states_.emplace_back(
         AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
                                                         database,
-                                                        storage_manager));
-  }
-
-  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
-    bloom_filters_.emplace_back(new BloomFilter(proto.bloom_filters(i)));
+                                                        storage_manager,
+                                                        bloom_filters_));
   }
 
   for (int i = 0; i < proto.generator_functions_size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index fe6b6e7..e10f991 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1344,6 +1344,16 @@ void ExecutionGenerator::convertAggregate(
       findRelationInfoOutputByPhysical(physical_plan->input());
   aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
 
+  const P::BloomFilterConfig &bloom_filter_config =
+      physical_plan->bloom_filter_config();
+  std::vector<attribute_id> bloom_filter_attribute_ids;
+
+  for (const auto &bf : bloom_filter_config.probe_side_bloom_filters) {
+    const CatalogAttribute *bf_catalog_attribute
+        = attribute_substitution_map_[bf.attribute->id()];
+    bloom_filter_attribute_ids.emplace_back(bf_catalog_attribute->getID());
+  }
+
   std::vector<const Type*> group_by_types;
   for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
     unique_ptr<const Scalar> execution_group_by_expression;
@@ -1458,6 +1468,13 @@ void ExecutionGenerator::convertAggregate(
       std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
   temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
                                             output_relation);
+
+  if (FLAGS_optimize_joins) {
+    execution_heuristics_->addAggregateInfo(aggregation_operator_index,
+                                            bloom_filter_config,
+                                            std::move(bloom_filter_attribute_ids),
+                                            aggr_state_index);
+  }
 }
 
 void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionHeuristics.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.cpp b/query_optimizer/ExecutionHeuristics.cpp
index b407453..26c4378 100644
--- a/query_optimizer/ExecutionHeuristics.cpp
+++ b/query_optimizer/ExecutionHeuristics.cpp
@@ -65,10 +65,7 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
                          bloom_filter_config.builder),
           std::make_pair(bloom_filter_id, info.build_operator_index_));
 
-      auto *build_side_bloom_filter = hash_table_proto->add_build_side_bloom_filters();
-      build_side_bloom_filter->set_bloom_filter_id(bloom_filter_id);
-      build_side_bloom_filter->set_attr_id(info.build_side_bloom_filter_ids_[i]);
-
+      hash_table_proto->add_build_side_bloom_filter_id(bloom_filter_id);
       std::cout << "Build " << build_side_bf.attribute->toString()
                 << " @" << bloom_filter_config.builder << "\n";
     }
@@ -83,7 +80,7 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
       auto *probe_side_bloom_filter = hash_table_proto->add_probe_side_bloom_filters();
       const auto &probe_side_bf =
           bloom_filter_config.probe_side_bloom_filters[i];
-      std::cout << "Probe " << probe_side_bf.attribute->toString()
+      std::cout << "HashJoin probe " << probe_side_bf.attribute->toString()
                 << " @" << probe_side_bf.builder << "\n";
 
       const auto &build_side_info =
@@ -92,13 +89,40 @@ void ExecutionHeuristics::optimizeExecutionPlan(QueryPlan *query_plan,
                               probe_side_bf.builder));
       probe_side_bloom_filter->set_bloom_filter_id(build_side_info.first);
       probe_side_bloom_filter->set_attr_id(info.probe_side_bloom_filter_ids_[i]);
-      std::cout << "Probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n";
+      std::cout << "HashJoin probe attr_id = " << info.probe_side_bloom_filter_ids_[i] << "\n";
 
       query_plan->addDirectDependency(info.join_operator_index_,
                                       build_side_info.second,
                                       true /* is_pipeline_breaker */);
     }
   }
+
+  for (const auto &info : aggregates_) {
+    auto *aggregate_proto =
+        query_context_proto->mutable_aggregation_states(info.aggregate_state_id_);
+    const auto &bloom_filter_config = info.bloom_filter_config_;
+
+    for (std::size_t i = 0; i < info.bloom_filter_ids_.size(); ++i) {
+      auto *bloom_filter = aggregate_proto->add_bloom_filters();
+      const auto &bf =
+          bloom_filter_config.probe_side_bloom_filters[i];
+      std::cout << "Aggregate probe " << bf.attribute->toString()
+                << " @" << bf.builder << "\n";
+
+      const auto &build_side_info =
+           bloom_filter_map.at(
+               std::make_pair(bf.source_attribute->id(),
+                              bf.builder));
+      bloom_filter->set_bloom_filter_id(build_side_info.first);
+      bloom_filter->set_attr_id(info.bloom_filter_ids_[i]);
+      std::cout << "Aggregate probe attr_id = "
+                << info.bloom_filter_ids_[i] << "\n";
+
+      query_plan->addDirectDependency(info.aggregate_operator_index_,
+                                      build_side_info.second,
+                                      true /* is_pipeline_breaker */);
+    }
+  }
 }
 
 void ExecutionHeuristics::setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/ExecutionHeuristics.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionHeuristics.hpp b/query_optimizer/ExecutionHeuristics.hpp
index 8af1b4a..0755124 100644
--- a/query_optimizer/ExecutionHeuristics.hpp
+++ b/query_optimizer/ExecutionHeuristics.hpp
@@ -93,6 +93,23 @@ class ExecutionHeuristics {
     const std::size_t estimated_build_relation_cardinality_;
   };
 
+  struct AggregateInfo {  // Per-aggregation record that optimizeExecutionPlan() iterates over.
+    AggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index,
+                  const physical::BloomFilterConfig &bloom_filter_config,
+                  std::vector<attribute_id> &&bloom_filter_ids,
+                  const QueryContext::aggregation_state_id aggregate_state_id)
+        : aggregate_operator_index_(aggregate_operator_index),
+          bloom_filter_config_(bloom_filter_config),
+          bloom_filter_ids_(std::move(bloom_filter_ids)),  // A named rvalue reference is an lvalue: move explicitly to avoid a copy.
+          aggregate_state_id_(aggregate_state_id) {
+    }
+
+    const QueryPlan::DAGNodeIndex aggregate_operator_index_;  // Plan node that gets a dependency on each filter's build operator.
+    const physical::BloomFilterConfig &bloom_filter_config_;  // Held by reference: the referenced config must outlive this record.
+    const std::vector<attribute_id> bloom_filter_ids_;  // Index-aligned with bloom_filter_config_.probe_side_bloom_filters.
+    const QueryContext::aggregation_state_id aggregate_state_id_;  // Index of the aggregation state proto in the query context proto.
+  };
+
 
   /**
    * @brief Constructor.
@@ -121,15 +138,25 @@ class ExecutionHeuristics {
                               std::vector<attribute_id> &&probe_side_bloom_filter_ids,
                               const QueryContext::join_hash_table_id join_hash_table_id,
                               const std::size_t estimated_build_relation_cardinality) {
-    hash_joins_.push_back(HashJoinInfo(build_operator_index,
-                                       join_operator_index,
-                                       referenced_stored_build_relation,
-                                       referenced_stored_probe_relation,
-                                       bloom_filter_config,
-                                       std::move(build_side_bloom_filter_ids),
-                                       std::move(probe_side_bloom_filter_ids),
-                                       join_hash_table_id,
-                                       estimated_build_relation_cardinality));
+    hash_joins_.emplace_back(build_operator_index,
+                             join_operator_index,
+                             referenced_stored_build_relation,
+                             referenced_stored_probe_relation,
+                             bloom_filter_config,
+                             std::move(build_side_bloom_filter_ids),
+                             std::move(probe_side_bloom_filter_ids),
+                             join_hash_table_id,
+                             estimated_build_relation_cardinality);
+  }
+
+  inline void addAggregateInfo(const QueryPlan::DAGNodeIndex aggregate_operator_index,  // Registers one aggregation in aggregates_.
+                               const physical::BloomFilterConfig &bloom_filter_config,  // Passed through by reference, not copied here.
+                               std::vector<attribute_id> &&bloom_filter_ids,  // Forwarded as an rvalue to the AggregateInfo constructor.
+                               const QueryContext::aggregation_state_id aggregate_state_id) {
+    aggregates_.emplace_back(aggregate_operator_index,  // Constructs the AggregateInfo in place at the back of the vector.
+                             bloom_filter_config,
+                             std::move(bloom_filter_ids),
+                             aggregate_state_id);
   }
 
   /**
@@ -152,13 +179,9 @@ class ExecutionHeuristics {
   void setBloomFilterProperties(serialization::BloomFilter *bloom_filter_proto,
                                 const std::size_t cardinality);
 
-  std::size_t estimated_build_relation_cardinality() const {
-    return estimated_build_relation_cardinality_;
-  }
-
  private:
   std::vector<HashJoinInfo> hash_joins_;
-  std::size_t estimated_build_relation_cardinality_;
+  std::vector<AggregateInfo> aggregates_;
 
   DISALLOW_COPY_AND_ASSIGN(ExecutionHeuristics);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/Aggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Aggregate.hpp b/query_optimizer/physical/Aggregate.hpp
index e40d894..b40997c 100644
--- a/query_optimizer/physical/Aggregate.hpp
+++ b/query_optimizer/physical/Aggregate.hpp
@@ -101,6 +101,10 @@ class Aggregate : public Physical {
   bool impliesUniqueAttributes(
       const std::vector<expressions::AttributeReferencePtr> &attributes) const override;
 
+  const BloomFilterConfig &bloom_filter_config() const {  // Read-only access to the config stored on this Aggregate node.
+    return bloom_filter_config_;
+  }
+
   /**
    * @brief Creates an Aggregate physical node.
    *
@@ -114,9 +118,14 @@ class Aggregate : public Physical {
       PhysicalPtr input,
       const std::vector<expressions::NamedExpressionPtr> &grouping_expressions,
       const std::vector<expressions::AliasPtr> &aggregate_expressions,
-      const expressions::PredicatePtr &filter_predicate) {
+      const expressions::PredicatePtr &filter_predicate,
+      const BloomFilterConfig bloom_filter_config = BloomFilterConfig()) {
     return AggregatePtr(
-        new Aggregate(input, grouping_expressions, aggregate_expressions, filter_predicate));
+        new Aggregate(input,
+                      grouping_expressions,
+                      aggregate_expressions,
+                      filter_predicate,
+                      bloom_filter_config));
   }
 
  protected:
@@ -133,11 +142,13 @@ class Aggregate : public Physical {
       PhysicalPtr input,
       const std::vector<expressions::NamedExpressionPtr> &grouping_expressions,
       const std::vector<expressions::AliasPtr> &aggregate_expressions,
-      const expressions::PredicatePtr &filter_predicate)
+      const expressions::PredicatePtr &filter_predicate,
+      const BloomFilterConfig &bloom_filter_config)
       : input_(input),
         grouping_expressions_(grouping_expressions),
         aggregate_expressions_(aggregate_expressions),
-        filter_predicate_(filter_predicate) {
+        filter_predicate_(filter_predicate),
+        bloom_filter_config_(bloom_filter_config) {
     addChild(input_);
   }
 
@@ -145,6 +156,7 @@ class Aggregate : public Physical {
   std::vector<expressions::NamedExpressionPtr> grouping_expressions_;
   std::vector<expressions::AliasPtr> aggregate_expressions_;
   expressions::PredicatePtr filter_predicate_;
+  BloomFilterConfig bloom_filter_config_;
 
   DISALLOW_COPY_AND_ASSIGN(Aggregate);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index cacb08b..104cb52 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -48,56 +48,6 @@ namespace physical {
 class HashJoin;
 typedef std::shared_ptr<const HashJoin> HashJoinPtr;
 
-struct BloomFilterConfig {
-  struct BuildSide {
-    BuildSide(const expressions::AttributeReferencePtr &attribute_in)
-        : attribute(attribute_in) {
-    }
-    expressions::AttributeReferencePtr attribute;
-  };
-  struct ProbeSide {
-    ProbeSide(const expressions::AttributeReferencePtr &attribute_in,
-              const expressions::AttributeReferencePtr &source_attribute_in,
-              const physical::PhysicalPtr &builder_in)
-        : attribute(attribute_in),
-          source_attribute(source_attribute_in),
-          builder(builder_in) {
-    }
-    expressions::AttributeReferencePtr attribute;
-    expressions::AttributeReferencePtr source_attribute;
-    PhysicalPtr builder;
-  };
-  BloomFilterConfig() {}
-  BloomFilterConfig(const PhysicalPtr &builder_in)
-      : builder(builder_in) {
-  }
-  BloomFilterConfig(const PhysicalPtr &builder_in,
-                    const std::vector<BuildSide> &build_side_bloom_filters_in,
-                    const std::vector<ProbeSide> &probe_side_bloom_filters_in)
-      : builder(builder_in),
-        build_side_bloom_filters(build_side_bloom_filters_in),
-        probe_side_bloom_filters(probe_side_bloom_filters_in) {
-  }
-  void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) {
-    for (const auto &build_bf : build_side_bloom_filters) {
-      if (attribute_in == build_bf.attribute) {
-        return;
-      }
-    }
-    build_side_bloom_filters.emplace_back(attribute_in);
-  }
-  void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in,
-                               const expressions::AttributeReferencePtr &source_attribute_in,
-                               const physical::PhysicalPtr &builder_in) {
-    probe_side_bloom_filters.emplace_back(attribute_in,
-                                          source_attribute_in,
-                                          builder_in);
-  }
-  PhysicalPtr builder;
-  std::vector<BuildSide> build_side_bloom_filters;
-  std::vector<ProbeSide> probe_side_bloom_filters;
-};
-
 /**
  * @brief Physical hash join node.
  */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/physical/Physical.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp
index 721b987..389cd05 100644
--- a/query_optimizer/physical/Physical.hpp
+++ b/query_optimizer/physical/Physical.hpp
@@ -39,6 +39,56 @@ namespace physical {
 class Physical;
 typedef std::shared_ptr<const Physical> PhysicalPtr;
 
+struct BloomFilterConfig {  // Build-side and probe-side bloom filter descriptors carried by a physical plan node.
+  struct BuildSide {  // One build-side entry; holds only the attribute.
+    BuildSide(const expressions::AttributeReferencePtr &attribute_in)  // NOTE(review): non-explicit single-argument constructor — confirm intended.
+        : attribute(attribute_in) {
+    }
+    expressions::AttributeReferencePtr attribute;
+  };
+  struct ProbeSide {  // One probe-side entry: the local attribute plus where the filter comes from.
+    ProbeSide(const expressions::AttributeReferencePtr &attribute_in,
+              const expressions::AttributeReferencePtr &source_attribute_in,
+              const physical::PhysicalPtr &builder_in)
+        : attribute(attribute_in),
+          source_attribute(source_attribute_in),
+          builder(builder_in) {
+    }
+    expressions::AttributeReferencePtr attribute;  // Its id is resolved to a catalog attribute id in ExecutionGenerator.
+    expressions::AttributeReferencePtr source_attribute;  // Its id, paired with builder, keys the filter lookup in ExecutionHeuristics.
+    PhysicalPtr builder;  // Physical node recorded as the source of this filter.
+  };
+  BloomFilterConfig() {}  // Leaves builder default-constructed and both lists empty.
+  BloomFilterConfig(const PhysicalPtr &builder_in)  // NOTE(review): non-explicit, so a PhysicalPtr converts implicitly — confirm intended.
+      : builder(builder_in) {
+  }
+  BloomFilterConfig(const PhysicalPtr &builder_in,
+                    const std::vector<BuildSide> &build_side_bloom_filters_in,
+                    const std::vector<ProbeSide> &probe_side_bloom_filters_in)
+      : builder(builder_in),
+        build_side_bloom_filters(build_side_bloom_filters_in),  // Both lists are copied from the arguments.
+        probe_side_bloom_filters(probe_side_bloom_filters_in) {
+  }
+  void addBuildSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in) {  // Appends unless an equal attribute is already listed.
+    for (const auto &build_bf : build_side_bloom_filters) {
+      if (attribute_in == build_bf.attribute) {  // Compares the Ptr values with ==; presumably pointer identity — TODO confirm.
+        return;
+      }
+    }
+    build_side_bloom_filters.emplace_back(attribute_in);
+  }
+  void addProbeSideBloomFilter(const expressions::AttributeReferencePtr &attribute_in,  // Always appends; no duplicate check.
+                               const expressions::AttributeReferencePtr &source_attribute_in,
+                               const physical::PhysicalPtr &builder_in) {
+    probe_side_bloom_filters.emplace_back(attribute_in,
+                                          source_attribute_in,
+                                          builder_in);
+  }
+  PhysicalPtr builder;  // Appears to be paired with each build-side attribute when ExecutionHeuristics registers filters — confirm.
+  std::vector<BuildSide> build_side_bloom_filters;
+  std::vector<ProbeSide> probe_side_bloom_filters;
+};
+
 /**
  * @brief Base class for physical plan nodes.
  */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
index e3bdc36..f6602b8 100644
--- a/query_optimizer/rules/AttachBloomFilters.cpp
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -51,24 +51,24 @@ P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
   visitProducer(input, 0);
   visitConsumer(input);
 
-  for (const auto &info_vec_pair : consumers_) {
-    std::cerr << "--------\n"
-              << "Node " << info_vec_pair.first->getName()
-              << " " << info_vec_pair.first << "\n";
-
-    for (const auto &info : info_vec_pair.second) {
-      std::cerr << info.attribute->attribute_alias();
-      if (info.attribute->id() != info.source_attribute->id()) {
-        std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}";
-      }
-      if (info.from_sibling) {
-        std::cerr << " sibling";
-      }
-      std::cerr << " @" << info.source << "[" << info.depth << "]"
-                << ": " << info.selectivity << "\n";
-    }
-    std::cerr << "********\n";
-  }
+//  for (const auto &info_vec_pair : consumers_) {
+//    std::cerr << "--------\n"
+//              << "Node " << info_vec_pair.first->getName()
+//              << " " << info_vec_pair.first << "\n";
+//
+//    for (const auto &info : info_vec_pair.second) {
+//      std::cerr << info.attribute->attribute_alias();
+//      if (info.attribute->id() != info.source_attribute->id()) {
+//        std::cerr << "{FROM " << info.source_attribute->attribute_alias() << "}";
+//      }
+//      if (info.from_sibling) {
+//        std::cerr << " sibling";
+//      }
+//      std::cerr << " @" << info.source << "[" << info.depth << "]"
+//                << ": " << info.selectivity << "\n";
+//    }
+//    std::cerr << "********\n";
+//  }
 
   return visitAndAttach(input);
 }
@@ -192,9 +192,20 @@ void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
                                             info.attribute);
       }
     }
+  }
 
+  P::PhysicalPtr consumer_child = nullptr;
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+  }
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+  }
+
+  if (consumer_child != nullptr) {
     // Decide attaches
-    if (cost_model_->estimateCardinality(consumer_child) > 200000000 &&
+    auto &consumer_bloom_filters = consumers_[consumer_child];
+    if (cost_model_->estimateCardinality(consumer_child) > 10000000 &&
         !consumer_bloom_filters.empty()) {
       std::map<E::AttributeReferencePtr, const BloomFilterInfo*> filters;
       for (const auto &info : consumer_bloom_filters) {
@@ -240,10 +251,10 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n
   if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
     const auto attach_it = attaches_.find(node);
     if (attach_it != attaches_.end()) {
-      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
-        std::cout << "Attach probe from " << item.builder
-                  << " to " << node << "\n";
-      }
+//      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+//        std::cout << "Attach probe from " << item.builder
+//                  << " to " << node << "\n";
+//      }
 
       const P::HashJoinPtr hash_join =
           std::static_pointer_cast<const P::HashJoin>(node);
@@ -259,6 +270,25 @@ P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &n
     }
   }
 
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+//      for (const auto& item : attach_it->second.probe_side_bloom_filters) {
+//        std::cout << "Attach probe from " << item.builder
+//                  << " to " << node << "\n";
+//      }
+
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+      return P::Aggregate::Create(
+          aggregate->input(),
+          aggregate->grouping_expressions(),
+          aggregate->aggregate_expressions(),
+          aggregate->filter_predicate(),
+          attach_it->second);
+    }
+  }
+
   if (has_changed) {
     return node->copyWithNewChildren(new_children);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 42a7402..22485b2 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -150,7 +150,8 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         cost_model_->estimateCardinality(tables[i]),
         cost_model_->estimateSelectivity(tables[i]),
         CountSharedAttributes(join_group.referenced_attributes,
-                              tables[i]->getOutputAttributes()));
+                              tables[i]->getOutputAttributes()),
+        tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
   }
 
   // Auxiliary mapping info.
@@ -316,6 +317,7 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
       selected_probe_table_info->estimated_num_output_attributes =
           CountSharedAttributes(join_group.referenced_attributes,
                                 output->getOutputAttributes());
+      selected_probe_table_info->is_aggregation = false;
 
       remaining_tables.emplace(selected_probe_table_info);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index a0e34ce..7a6fa81 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -75,12 +75,14 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
               const physical::PhysicalPtr &table_in,
               const std::size_t estimated_cardinality_in,
               const double estimated_selectivity_in,
-              const std::size_t estimated_num_output_attributes_in)
+              const std::size_t estimated_num_output_attributes_in,
+              const bool is_aggregation_in)
         : table_info_id(table_info_id_in),
           table(table_in),
           estimated_cardinality(estimated_cardinality_in),
           estimated_selectivity(estimated_selectivity_in),
-          estimated_num_output_attributes(estimated_num_output_attributes_in) {
+          estimated_num_output_attributes(estimated_num_output_attributes_in),
+          is_aggregation(is_aggregation_in) {
     }
 
     const std::size_t table_info_id;
@@ -88,6 +90,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
     std::size_t estimated_cardinality;
     double estimated_selectivity;
     std::size_t estimated_num_output_attributes;
+    bool is_aggregation;
   };
 
   struct JoinPair {
@@ -107,13 +110,17 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
         return rhs_has_large_output;
       }
 
-//      const bool lhs_has_small_build =
-//          !lhs_has_large_output && lhs.build->estimated_cardinality < 0x1000;
-//      const bool rhs_has_small_build =
-//          !rhs_has_large_output && rhs.build->estimated_cardinality < 0x1000;
-//      if (lhs_has_small_build != rhs_has_small_build) {
-//        return lhs_has_small_build;
-//      }
+      const bool lhs_has_small_build =
+          !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100;
+      const bool rhs_has_small_build =
+          !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100;
+      if (lhs_has_small_build != rhs_has_small_build) {
+        return lhs_has_small_build;
+      }
+
+      if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) {
+        return lhs.probe->is_aggregation;
+      }
 
       if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) {
         return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 4878cf1..8c9e8b6 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BloomFilterAdapter.hpp"
 
 #include "glog/logging.h"
 
@@ -57,6 +60,8 @@ using std::unique_ptr;
 
 namespace quickstep {
 
+DECLARE_int64(bloom_adapter_batch_size);
+
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction*> &aggregate_functions,
@@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState(
     std::vector<bool> &&is_distinct,
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
     const Predicate *predicate,
+    std::vector<const BloomFilter *> &&bloom_filters,
+    std::vector<attribute_id> &&bloom_filter_attribute_ids,
     const std::size_t estimated_num_entries,
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       predicate_(predicate),
+      bloom_filters_(std::move(bloom_filters)),
+      bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
@@ -183,7 +192,8 @@ AggregationOperationState::AggregationOperationState(
 AggregationOperationState* AggregationOperationState::ReconstructFromProto(
     const serialization::AggregationOperationState &proto,
     const CatalogDatabaseLite &database,
-    StorageManager *storage_manager) {
+    StorageManager *storage_manager,
+    const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
   DCHECK(ProtoIsValid(proto, database));
 
   // Rebuild contructor arguments from their representation in 'proto'.
@@ -232,12 +242,25 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
                                                database));
   }
 
+  std::vector<const BloomFilter*> bloom_filter_vector;
+  std::vector<attribute_id> bloom_filter_attribute_ids;
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    std::cerr << "Add bloom filter " << i << "\n";
+    // Add the pointer to the probe bloom filter within the list of probe bloom filters to use.
+    const auto bloom_filter_proto = proto.bloom_filters(i);
+    bloom_filter_vector.emplace_back(
+        bloom_filters[bloom_filter_proto.bloom_filter_id()].get());
+    bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id());
+  }
+
   return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
                                        aggregate_functions,
                                        std::move(arguments),
                                        std::move(is_distinct),
                                        std::move(group_by_expressions),
                                        predicate.release(),
+                                       std::move(bloom_filter_vector),
+                                       std::move(bloom_filter_attribute_ids),
                                        proto.estimated_num_entries(),
                                        HashTableImplTypeFromProto(proto.hash_table_impl_type()),
                                        distinctify_hash_table_impl_types,
@@ -340,6 +363,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
   // tuples so that it can be reused across multiple aggregates (i.e. we only
   // pay the cost of evaluating the predicate once).
   std::unique_ptr<TupleIdSequence> reuse_matches;
+  if (predicate_) {
+    reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
@@ -358,7 +385,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
                                arguments_[agg_idx],
                                local_arguments_as_attributes,
                                {}, /* group_by */
-                               predicate_.get(),
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                nullptr /* reuse_group_by_vectors */);
@@ -369,7 +395,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
           block->aggregate(*handles_[agg_idx],
                            arguments_[agg_idx],
                            local_arguments_as_attributes,
-                           predicate_.get(),
                            &reuse_matches));
     }
   }
@@ -391,6 +416,71 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   // GROUP BY expressions once).
   std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
 
+  if (predicate_) {
+    reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
+  if (bloom_filters_.size() > 0) {
+    const std::size_t num_tuples = block->getNumTuples();
+//    std::cerr << "Before: "
+//              << (reuse_matches ? reuse_matches->numTuples() : num_tuples)
+//              << "\n";
+    std::unique_ptr<ValueAccessor> accessor;
+    if (reuse_matches) {
+      accessor.reset(
+          block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get()));
+    } else {
+      accessor.reset(
+          block->getTupleStorageSubBlock().createValueAccessor());
+    }
+    InvokeOnAnyValueAccessor(
+        accessor.get(),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples));
+
+      std::vector<std::size_t> attr_size_vector;
+      attr_size_vector.reserve(bloom_filter_attribute_ids_.size());
+      for (const auto &attr : bloom_filter_attribute_ids_) {
+        auto val_and_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+        attr_size_vector.emplace_back(val_and_size.second);
+      }
+
+      std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter;
+      bloom_filter_adapter.reset(new BloomFilterAdapter(
+          bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector));
+
+      std::uint32_t batch_size_try = FLAGS_bloom_adapter_batch_size;
+      std::uint32_t num_tuples_left = accessor->getNumTuples();
+      std::vector<tuple_id> batch(num_tuples_left);
+
+      do {
+        std::uint32_t batch_size =
+            batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+        for (std::size_t i = 0; i < batch_size; ++i) {
+          accessor->next();
+          batch.push_back(accessor->getCurrentPosition());
+        }
+
+        std::size_t num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
+        for (std::size_t t = 0; t < num_hits; ++t){
+          filtered->set(batch[t], true);
+        }
+
+        batch.clear();
+        num_tuples_left -= batch_size;
+        batch_size_try = batch_size * 2;
+      } while (num_tuples_left > 0);
+
+      if (reuse_matches) {
+        reuse_matches->intersectWith(*filtered);
+      } else {
+        reuse_matches.reset(filtered.release());
+      }
+    });
+//    std::cerr << "After: " << reuse_matches->numTuples() << "\n";
+  }
+
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
@@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
                                arguments_[agg_idx],
                                nullptr, /* arguments_as_attributes */
                                group_by_list_,
-                               predicate_.get(),
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                &reuse_group_by_vectors);
@@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
       block->aggregateGroupBy(*handles_[agg_idx],
                               arguments_[agg_idx],
                               group_by_list_,
-                              predicate_.get(),
                               agg_hash_table,
                               &reuse_matches,
                               &reuse_group_by_vectors);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0199749..5db7325 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -108,6 +109,8 @@ class AggregationOperationState {
                             std::vector<bool> &&is_distinct,
                             std::vector<std::unique_ptr<const Scalar>> &&group_by,
                             const Predicate *predicate,
+                            std::vector<const BloomFilter *> &&bloom_filters,
+                            std::vector<attribute_id> &&bloom_filter_attribute_ids,
                             const std::size_t estimated_num_entries,
                             const HashTableImplType hash_table_impl_type,
                             const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
@@ -131,7 +134,8 @@ class AggregationOperationState {
   static AggregationOperationState* ReconstructFromProto(
       const serialization::AggregationOperationState &proto,
       const CatalogDatabaseLite &database,
-      StorageManager *storage_manager);
+      StorageManager *storage_manager,
+      const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters);
 
   /**
    * @brief Check whether a serialization::AggregationOperationState is
@@ -181,6 +185,10 @@ class AggregationOperationState {
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
   std::unique_ptr<const Predicate> predicate_;
+
+  std::vector<const BloomFilter*> bloom_filters_;
+  std::vector<attribute_id> bloom_filter_attribute_ids_;
+
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/AggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index bf78e3a..165148e 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,10 @@ message AggregationOperationState {
 
   // Each DISTINCT aggregation has its distinctify hash table impl type.
   repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+  message BloomFilter {
+    required uint32 bloom_filter_id = 1;
+    required uint32 attr_id = 2;
+  }
+  repeated BloomFilter bloom_filters = 8;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index 2c526c2..04c2ca8 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -41,14 +41,12 @@
 #include "types/TypedValue.hpp"
 #include "utility/BloomFilter.hpp"
 #include "utility/BloomFilterAdapter.hpp"
-#include "utility/EventProfiler.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
 DECLARE_int64(bloom_adapter_batch_size);
-DECLARE_bool(adapt_bloom_filters);
 
 /** \addtogroup Storage
  *  @{
@@ -1048,7 +1046,7 @@ class HashTable : public HashTableBase<resizable,
    * @param probe_attribute_ids The vector of attribute ids to use for probing
    *        the bloom filter.
    **/
-  inline void addProbeSideAttributeIds(const attribute_id &probe_attribute_id) {
+  inline void addProbeSideAttributeId(const attribute_id probe_attribute_id) {
     probe_attribute_ids_.push_back(probe_attribute_id);
   }
 
@@ -2263,7 +2261,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       for (const auto &probe_attr : probe_attribute_ids_) {
         auto val_and_size =
             accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, probe_attr);
-        attr_size_vector.push_back(val_and_size.second);
+        attr_size_vector.emplace_back(val_and_size.second);
       }
 
       bloom_filter_adapter.reset(new BloomFilterAdapter(
@@ -2280,30 +2278,18 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
       std::uint32_t num_tuples_left = accessor->getNumTuples();
       std::vector<tuple_id> batch(num_tuples_left);
 
-      auto *container = simple_profiler.getContainer();
-      auto *line = container->getEventLine(0);
-
       do {
-        const std::uint32_t batch_size =
+        std::uint32_t batch_size =
             batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
         for (std::size_t i = 0; i < batch_size; ++i) {
           accessor->next();
           batch.push_back(accessor->getCurrentPosition());
         }
 
-        line->emplace_back();
-        std::size_t num_hits;
-        if (FLAGS_adapt_bloom_filters) {
-          num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
-        } else {
-          num_hits = bloom_filter_adapter->bulkProbe<false>(accessor, batch);
-        }
-        line->back().setPayload(num_hits+0);
-        line->back().endEvent();
-//        std::size_t num_hits = batch_size;
+        std::size_t num_hits = bloom_filter_adapter->bulkProbe<true>(accessor, batch);
 
-        for (std::size_t i = 0; i < num_hits; ++i){
-          const tuple_id probe_tid = batch[i];
+        for (std::size_t t = 0; t < num_hits; ++t){
+          tuple_id probe_tid = batch[t];
           TypedValue key = accessor->getTypedValueAtAbsolutePosition(key_attr_id, probe_tid);
           if (check_for_null_keys && key.isNull()) {
             continue;
@@ -2320,6 +2306,7 @@ void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_
               break;
           }
         }
+        batch.clear();
         num_tuples_left -= batch_size;
         batch_size_try = batch_size * 2;
       } while (!accessor->iterationFinished());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 0cf9f5e..90bc9f7 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -34,10 +34,10 @@ message HashTable {
   required HashTableImplType hash_table_impl_type = 1;
   repeated Type key_types = 2;
   required uint64 estimated_num_entries = 3;
-  message BloomFilterReference {
+  repeated uint32 build_side_bloom_filter_id = 4;
+  message ProbeSideBloomFilter {
     required uint32 bloom_filter_id = 1;
     required uint32 attr_id = 2;
   }
-  repeated BloomFilterReference build_side_bloom_filters = 4;
-  repeated BloomFilterReference probe_side_bloom_filters = 5;
+  repeated ProbeSideBloomFilter probe_side_bloom_filters = 6;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 00a09c1..df2962a 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -318,11 +318,9 @@ class HashTableFactory {
     //                 individual implementations of the hash table constructors.
 
     // Check if there are any build side bloom filter defined on the hash table.
-    if (proto.build_side_bloom_filters_size() > 0) {
-      CHECK_EQ(1u, proto.build_side_bloom_filters_size());
+    if (proto.build_side_bloom_filter_id_size() > 0) {
       hash_table->enableBuildSideBloomFilter();
-      hash_table->setBuildSideBloomFilter(
-          bloom_filters[proto.build_side_bloom_filters(0).bloom_filter_id()].get());
+      hash_table->setBuildSideBloomFilter(bloom_filters[proto.build_side_bloom_filter_id(0)].get());
     }
 
     // Check if there are any probe side bloom filters defined on the hash table.
@@ -335,8 +333,7 @@ class HashTableFactory {
         hash_table->addProbeSideBloomFilter(
             bloom_filters[probe_side_bloom_filter.bloom_filter_id()].get());
 
-        // Add the attribute ids corresponding to this probe bloom filter.
-        hash_table->addProbeSideAttributeIds(probe_side_bloom_filter.attr_id());
+        hash_table->addProbeSideAttributeId(probe_side_bloom_filter.attr_id());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index fdd438d..78aba7c 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -389,15 +389,7 @@ AggregationState* StorageBlock::aggregate(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
-    const Predicate *predicate,
     std::unique_ptr<TupleIdSequence> *reuse_matches) const {
-  // If there is a filter predicate that hasn't already been evaluated,
-  // evaluate it now and save the results for other aggregates on this same
-  // block.
-  if (predicate && !*reuse_matches) {
-    reuse_matches->reset(getMatchesForPredicate(predicate));
-  }
-
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
   // If all the arguments to this aggregate are plain relation attributes,
   // aggregate directly on a ValueAccessor from this block to avoid a copy.
@@ -418,7 +410,6 @@ void StorageBlock::aggregateGroupBy(
     const AggregationHandle &handle,
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
     AggregationStateHashTableBase *hash_table,
     std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
@@ -440,14 +431,7 @@ void StorageBlock::aggregateGroupBy(
   ColumnVectorsValueAccessor temp_result;
   {
     std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
+    if (reuse_matches) {
       // Create a filtered ValueAccessor that only iterates over predicate
       // matches.
       accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));
@@ -499,7 +483,6 @@ void StorageBlock::aggregateDistinct(
     const std::vector<std::unique_ptr<const Scalar>> &arguments,
     const std::vector<attribute_id> *arguments_as_attributes,
     const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const Predicate *predicate,
     AggregationStateHashTableBase *distinctify_hash_table,
     std::unique_ptr<TupleIdSequence> *reuse_matches,
     std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
@@ -514,14 +497,7 @@ void StorageBlock::aggregateDistinct(
   ColumnVectorsValueAccessor temp_result;
   {
     std::unique_ptr<ValueAccessor> accessor;
-    if (predicate) {
-      if (!*reuse_matches) {
-        // If there is a filter predicate that hasn't already been evaluated,
-        // evaluate it now and save the results for other aggregates on this
-        // same block.
-        reuse_matches->reset(getMatchesForPredicate(predicate));
-      }
-
+    if (reuse_matches) {
       // Create a filtered ValueAccessor that only iterates over predicate
       // matches.
       accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get()));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 3ae3812..3217fa2 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -410,7 +410,6 @@ class StorageBlock : public StorageBlockBase {
       const AggregationHandle &handle,
       const std::vector<std::unique_ptr<const Scalar>> &arguments,
       const std::vector<attribute_id> *arguments_as_attributes,
-      const Predicate *predicate,
       std::unique_ptr<TupleIdSequence> *reuse_matches) const;
 
   /**
@@ -460,7 +459,6 @@ class StorageBlock : public StorageBlockBase {
   void aggregateGroupBy(const AggregationHandle &handle,
                         const std::vector<std::unique_ptr<const Scalar>> &arguments,
                         const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                        const Predicate *predicate,
                         AggregationStateHashTableBase *hash_table,
                         std::unique_ptr<TupleIdSequence> *reuse_matches,
                         std::vector<std::unique_ptr<ColumnVector>>
@@ -505,7 +503,6 @@ class StorageBlock : public StorageBlockBase {
                          const std::vector<std::unique_ptr<const Scalar>> &arguments,
                          const std::vector<attribute_id> *arguments_as_attributes,
                          const std::vector<std::unique_ptr<const Scalar>> &group_by,
-                         const Predicate *predicate,
                          AggregationStateHashTableBase *distinctify_hash_table,
                          std::unique_ptr<TupleIdSequence> *reuse_matches,
                          std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
@@ -588,6 +585,8 @@ class StorageBlock : public StorageBlockBase {
    **/
   const std::size_t getNumTuples() const;
 
+  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
+
  private:
   static TupleStorageSubBlock* CreateTupleStorageSubBlock(
       const CatalogRelationSchema &relation,
@@ -627,8 +626,6 @@ class StorageBlock : public StorageBlockBase {
   // StorageBlock's header.
   bool rebuildIndexes(bool short_circuit);
 
-  TupleIdSequence* getMatchesForPredicate(const Predicate *predicate) const;
-
   std::unordered_map<attribute_id, TypedValue>* generateUpdatedValues(
       const ValueAccessor &accessor,
       const tuple_id tuple,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/91e49820/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index 37fa790..4cc1b0f 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -29,6 +29,7 @@
 
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -155,6 +156,29 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
         node_info.labels.emplace_back("RIGHT join attrs unique");
       }
 
+      const auto &bf_config = hash_join->bloom_filter_config();
+      for (const auto &bf : bf_config.build_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF build] ") + bf.attribute->attribute_alias());
+      }
+      for (const auto &bf : bf_config.probe_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF probe] ") + bf.attribute->attribute_alias());
+      }
+
+      break;
+    }
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+        std::static_pointer_cast<const P::Aggregate>(input);
+      node_info.labels.emplace_back(input->getName());
+
+      const auto &bf_config = aggregate->bloom_filter_config();
+      for (const auto &bf : bf_config.probe_side_bloom_filters) {
+        node_info.labels.emplace_back(
+            std::string("[BF probe] ") + bf.attribute->attribute_alias());
+      }
+
       break;
     }
     default: {


Mime
View raw message