quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [3/4] incubator-quickstep git commit: Initial commit
Date Sat, 30 Jul 2016 06:15:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/query_optimizer/rules/AttachBloomFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.cpp b/query_optimizer/rules/AttachBloomFilters.cpp
new file mode 100644
index 0000000..03a42a0
--- /dev/null
+++ b/query_optimizer/rules/AttachBloomFilters.cpp
@@ -0,0 +1,308 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/rules/AttachBloomFilters.hpp"
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+/**
+ * @brief Attaches bloom filters to the HashJoin / Aggregate nodes of a plan.
+ *
+ * Runs three passes over the plan:
+ *   1. visitProducer()  (bottom-up) records the bloom filters each node can
+ *      provide in producers_;
+ *   2. visitConsumer()  (top-down) propagates filters to the nodes that can
+ *      use them and records the chosen configurations in attaches_;
+ *   3. visitAndAttach() rebuilds the plan with those configurations.
+ *
+ * @param input The plan to transform; must be a TopLevelPlan (DCHECKed).
+ * @return The rewritten plan.
+ **/
+P::PhysicalPtr AttachBloomFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          std::static_pointer_cast<const P::TopLevelPlan>(input)->shared_subplans()));
+
+  // The per-node tables are keyed by plan-node pointers and are filled with
+  // emplace(), which never overwrites. Start from a clean slate so entries
+  // left over from a previous apply() on this rule object cannot leak into
+  // (or shadow data for) the current plan.
+  producers_.clear();
+  consumers_.clear();
+  attaches_.clear();
+
+  visitProducer(input, 0);
+  visitConsumer(input);
+
+  return visitAndAttach(input);
+}
+
+/**
+ * @brief Bottom-up pass: fills producers_[node] with the bloom filters that
+ *        are available on the output of `node`.
+ *
+ * A HashJoin whose build side (right child) has an estimated selectivity
+ * below 1.0 contributes one filter per build-side join attribute to
+ * producers_[right child]. Aggregate, Selection and HashJoin nodes then
+ * inherit their children's filters for every attribute they still output.
+ *
+ * @param node The plan node to visit.
+ * @param depth Depth of `node` in the plan (apply() starts the root at 0).
+ **/
+void AttachBloomFilters::visitProducer(const P::PhysicalPtr &node, const int depth) {
+  // Post-order: the children's producers_ entries must exist before this node
+  // inherits from them.
+  for (const P::PhysicalPtr &child : node->children()) {
+    visitProducer(child, depth + 1);
+  }
+
+  // A selective build side lets this HashJoin produce one bloom filter per
+  // build-side join attribute; they are recorded on the build child.
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    const P::PhysicalPtr &build_node = hash_join->right();
+    const double selectivity = cost_model_->estimateSelectivity(build_node);
+    if (selectivity < 1.0) {
+      auto &build_node_info = producers_[build_node];
+      for (const auto &attr : hash_join->right_join_attributes()) {
+        build_node_info.emplace_back(node, attr, depth, selectivity, false);
+      }
+    }
+  }
+
+  std::unordered_set<E::ExprId> output_attribute_ids;
+  for (const auto &attr : node->getOutputAttributes()) {
+    output_attribute_ids.emplace(attr->id());
+  }
+
+  // Inherit the children's bloom filters for attributes this node still
+  // outputs. Only node types that pass their input tuples through are
+  // eligible.
+  std::vector<BloomFilterInfo> bloom_filters;
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kHashJoin:
+      for (const P::PhysicalPtr &child : node->children()) {
+        for (const BloomFilterInfo &info : producers_[child]) {
+          if (output_attribute_ids.find(info.attribute->id())
+                  != output_attribute_ids.end()) {
+            bloom_filters.emplace_back(
+                info.source, info.attribute, info.depth, info.selectivity, false);
+          }
+        }
+      }
+      break;
+    default:
+      break;
+  }
+
+  producers_.emplace(node, std::move(bloom_filters));
+}
+
+/**
+ * @brief Top-down pass: decides which bloom filters are applied where and
+ *        records the resulting configurations in attaches_.
+ *
+ * consumers_[node] holds the filters available to `node` (filled in while
+ * visiting its parent). This function:
+ *   1. forwards those filters to every child that still outputs the filtered
+ *      attribute;
+ *   2. for an inner / left-semi HashJoin, moves the filters produced on the
+ *      build side (producers_) to the probe side, renaming each build-side
+ *      join attribute to the matching probe-side join attribute;
+ *   3. for a HashJoin (probe = left child) or Aggregate (probe = input) whose
+ *      probe input is estimated to be large, keeps the best filter per
+ *      attribute and registers it on both the probe node and the source node;
+ *   4. recurses into the children.
+ *
+ * @param node The plan node to visit.
+ **/
+void AttachBloomFilters::visitConsumer(const P::PhysicalPtr &node) {
+  // Estimated probe-input cardinality above which bloom filters are attached.
+  constexpr std::size_t kMinProbeCardinalityForBloomFilters = 10000000;
+
+  // (1) Bloom filters from the parent.
+  const auto &parent_bloom_filters = consumers_[node];
+  if (!parent_bloom_filters.empty()) {
+    for (const auto &child : node->children()) {
+      std::unordered_set<E::ExprId> child_output_attribute_ids;
+      for (const auto &attr : child->getOutputAttributes()) {
+        child_output_attribute_ids.emplace(attr->id());
+      }
+
+      std::vector<BloomFilterInfo> child_bloom_filters;
+      for (const auto &info : parent_bloom_filters) {
+        if (child_output_attribute_ids.find(info.attribute->id())
+                != child_output_attribute_ids.end()) {
+          child_bloom_filters.emplace_back(info.source,
+                                           info.attribute,
+                                           info.depth,
+                                           info.selectivity,
+                                           false,
+                                           info.source_attribute);
+        }
+      }
+      consumers_.emplace(child, std::move(child_bloom_filters));
+    }
+  }
+
+  // (2) Bloom filters from the build side to the probe side via HashJoin.
+  // Restricted to inner and left-semi joins (presumably because other join
+  // types must keep probe tuples that have no build-side match).
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const P::HashJoinPtr hash_join =
+        std::static_pointer_cast<const P::HashJoin>(node);
+    if (hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
+        hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin) {
+      const P::PhysicalPtr &producer_child = hash_join->right();
+      const P::PhysicalPtr &consumer_child = hash_join->left();
+
+      // Build-side join attribute id -> matching probe-side join attribute.
+      std::unordered_map<E::ExprId, E::AttributeReferencePtr> join_attribute_pairs;
+      for (std::size_t i = 0; i < hash_join->left_join_attributes().size(); ++i) {
+        const E::AttributeReferencePtr probe_join_attribute =
+            hash_join->left_join_attributes()[i];
+        const E::AttributeReferencePtr build_join_attribute =
+            hash_join->right_join_attributes()[i];
+        join_attribute_pairs.emplace(build_join_attribute->id(),
+                                     probe_join_attribute);
+      }
+
+      auto &consumer_bloom_filters = consumers_[consumer_child];
+      for (const auto &info : producers_[producer_child]) {
+        const auto pair_it = join_attribute_pairs.find(info.attribute->id());
+        if (pair_it != join_attribute_pairs.end()) {
+          consumer_bloom_filters.emplace_back(info.source,
+                                              pair_it->second,
+                                              info.depth,
+                                              info.selectivity,
+                                              true,
+                                              info.attribute);
+        }
+      }
+    }
+  }
+
+  // (3) Decide attachments for nodes that have a probe input.
+  P::PhysicalPtr consumer_child = nullptr;
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    consumer_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
+  } else if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    consumer_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
+  }
+
+  if (consumer_child != nullptr) {
+    auto &consumer_bloom_filters = consumers_[consumer_child];
+    if (cost_model_->estimateCardinality(consumer_child) > kMinProbeCardinalityForBloomFilters &&
+        !consumer_bloom_filters.empty()) {
+      // Keep only the best filter (per BloomFilterInfo::isBetterThan) for each
+      // probe attribute. Keyed by ExprId rather than by AttributeReference
+      // pointer, so that distinct references to the same attribute are
+      // deduplicated and the iteration order below does not depend on heap
+      // addresses.
+      std::map<E::ExprId, const BloomFilterInfo*> best_filters;
+      for (const auto &info : consumer_bloom_filters) {
+        auto it = best_filters.find(info.attribute->id());
+        if (it == best_filters.end()) {
+          best_filters.emplace(info.attribute->id(), &info);
+        } else if (BloomFilterInfo::isBetterThan(&info, it->second)) {
+          it->second = &info;
+        }
+      }
+
+      auto &probe_attaches = getBloomFilterConfig(node);
+      for (const auto &id_filter_pair : best_filters) {
+        const BloomFilterInfo &best = *id_filter_pair.second;
+        auto &build_attaches = getBloomFilterConfig(best.source);
+        build_attaches.addBuildSideBloomFilter(best.source_attribute);
+        probe_attaches.addProbeSideBloomFilter(best.attribute,
+                                               best.source_attribute,
+                                               best.source);
+      }
+    }
+  }
+
+  // (4) Recurse.
+  for (const auto &child : node->children()) {
+    visitConsumer(child);
+  }
+}
+
+/**
+ * @brief Rebuild pass: returns `node` with every HashJoin / Aggregate that has
+ *        an entry in attaches_ recreated with that BloomFilterConfig.
+ *
+ * Children are rebuilt first. A node is copied only when it gets a config or
+ * when one of its children changed; otherwise `node` itself is returned.
+ *
+ * @param node The plan node to rebuild.
+ * @return The rebuilt node, or `node` if nothing in its subtree changed.
+ **/
+P::PhysicalPtr AttachBloomFilters::visitAndAttach(const physical::PhysicalPtr &node) {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed = false;
+  for (const auto &child : node->children()) {
+    P::PhysicalPtr new_child = visitAndAttach(child);
+    if (new_child != child) {
+      has_changed = true;
+    }
+    new_children.emplace_back(new_child);
+  }
+
+  if (node->getPhysicalType() == P::PhysicalType::kHashJoin) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+      const P::HashJoinPtr hash_join =
+          std::static_pointer_cast<const P::HashJoin>(node);
+      // new_children[0] / [1] are the rebuilt left / right inputs.
+      return P::HashJoin::Create(
+          new_children[0],
+          new_children[1],
+          hash_join->left_join_attributes(),
+          hash_join->right_join_attributes(),
+          hash_join->residual_predicate(),
+          hash_join->project_expressions(),
+          hash_join->join_type(),
+          attach_it->second);
+    }
+  }
+
+  if (node->getPhysicalType() == P::PhysicalType::kAggregate) {
+    const auto attach_it = attaches_.find(node);
+    if (attach_it != attaches_.end()) {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+      // NOTE: assumes an Aggregate's only child is its input(). Use the
+      // rebuilt input: passing aggregate->input() would silently drop every
+      // bloom filter attached in the subtree below this Aggregate.
+      DCHECK_EQ(static_cast<std::size_t>(1), new_children.size());
+      return P::Aggregate::Create(
+          new_children[0],
+          aggregate->grouping_expressions(),
+          aggregate->aggregate_expressions(),
+          aggregate->filter_predicate(),
+          attach_it->second);
+    }
+  }
+
+  if (has_changed) {
+    return node->copyWithNewChildren(new_children);
+  }
+
+  return node;
+}
+
+/**
+ * @brief Returns the BloomFilterConfig recorded for `node`, creating one
+ *        (constructed from `node`) the first time the node is requested.
+ *
+ * The returned reference stays valid while the entry remains in attaches_
+ * (std::map insertions do not invalidate references to other elements).
+ **/
+P::BloomFilterConfig& AttachBloomFilters::getBloomFilterConfig(const physical::PhysicalPtr &node) {
+  // One search on the hit path; on a miss, emplace() hands back the iterator,
+  // so the map is never searched a second time.
+  auto it = attaches_.find(node);
+  if (it == attaches_.end()) {
+    it = attaches_.emplace(node, node).first;
+  }
+  return it->second;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/query_optimizer/rules/AttachBloomFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachBloomFilters.hpp b/query_optimizer/rules/AttachBloomFilters.hpp
new file mode 100644
index 0000000..e4437f7
--- /dev/null
+++ b/query_optimizer/rules/AttachBloomFilters.hpp
@@ -0,0 +1,118 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Physical-plan rule that attaches bloom filters to HashJoin and
+ *        Aggregate nodes.
+ *
+ * Filters originate at HashJoins whose build side is estimated (by
+ * cost::StarSchemaSimpleCostModel) to be selective. They are propagated
+ * across inner / left-semi joins from the build side to the probe side and
+ * down to descendants, and are finally attached to the probe input of a
+ * HashJoin or Aggregate whose estimated input cardinality is large.
+ */
+class AttachBloomFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  AttachBloomFilters() {}
+
+  /**
+   * @brief Destructor.
+   */
+  ~AttachBloomFilters() override {}
+
+  std::string getName() const override {
+    return "AttachBloomFilters";
+  }
+
+  /**
+   * @brief Applies the rule.
+   *
+   * @param input The plan to transform; expected to be a TopLevelPlan.
+   * @return The plan with BloomFilterConfig attached where beneficial.
+   */
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  /**
+   * @brief One candidate bloom filter and where it came from.
+   */
+  struct BloomFilterInfo {
+    BloomFilterInfo(const physical::PhysicalPtr &filter_source,
+                    const expressions::AttributeReferencePtr &filter_attribute,
+                    const int filter_depth,
+                    const double filter_selectivity,
+                    const bool filter_from_sibling,
+                    const expressions::AttributeReferencePtr &filter_source_attribute = nullptr)
+        : source(filter_source),
+          attribute(filter_attribute),
+          depth(filter_depth),
+          selectivity(filter_selectivity),
+          from_sibling(filter_from_sibling),
+          source_attribute(filter_source_attribute ? filter_source_attribute
+                                                   : filter_attribute) {}
+
+    /**
+     * @brief Whether `lhs` is preferable to `rhs`: the lower selectivity
+     *        wins; on a tie, the filter with the greater depth wins.
+     */
+    static bool isBetterThan(const BloomFilterInfo *lhs,
+                             const BloomFilterInfo *rhs) {
+      if (lhs->selectivity != rhs->selectivity) {
+        return lhs->selectivity < rhs->selectivity;
+      }
+      return lhs->depth > rhs->depth;
+    }
+
+    // The HashJoin whose build side produces this filter.
+    physical::PhysicalPtr source;
+    // The attribute the filter applies to at the current position in the plan.
+    expressions::AttributeReferencePtr attribute;
+    // Depth of `source` in the plan (the root is at depth 0).
+    int depth;
+    // Estimated selectivity of the build side of `source`.
+    double selectivity;
+    // True once the filter has crossed a HashJoin from build to probe side.
+    bool from_sibling;
+    // Build-side attribute the filter is built on; equals `attribute` unless
+    // the filter was renamed while crossing a join.
+    expressions::AttributeReferencePtr source_attribute;
+  };
+
+  // Bottom-up pass that fills producers_.
+  void visitProducer(const physical::PhysicalPtr &node, const int depth);
+
+  // Top-down pass that fills consumers_ and attaches_.
+  void visitConsumer(const physical::PhysicalPtr &node);
+
+  // Rebuilds the plan using the configurations collected in attaches_.
+  physical::PhysicalPtr visitAndAttach(const physical::PhysicalPtr &node);
+
+  // Returns the config for `node`, creating it on first use.
+  physical::BloomFilterConfig &getBloomFilterConfig(const physical::PhysicalPtr &node);
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  // Bloom filters available on the output of each node.
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> producers_;
+  // Bloom filters each node may apply, received from its parent or sibling.
+  std::map<physical::PhysicalPtr, std::vector<BloomFilterInfo>> consumers_;
+  // Bloom filter configuration to attach to each node.
+  std::map<physical::PhysicalPtr, physical::BloomFilterConfig> attaches_;
+
+  DISALLOW_COPY_AND_ASSIGN(AttachBloomFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_ATTACH_BLOOM_FILTERS_HPP_ */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 1990174..a943ef7 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_subdirectory(tests)
 
 # Declare micro-libs:
+add_library(quickstep_queryoptimizer_rules_AttachBloomFilters AttachBloomFilters.cpp AttachBloomFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
@@ -35,6 +36,20 @@ add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp
 
 
 # Link dependencies:
+target_link_libraries(quickstep_queryoptimizer_rules_AttachBloomFilters
+                      glog
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule
                       glog
                       quickstep_queryoptimizer_rules_Rule
@@ -126,6 +141,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOpti
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_DisjointTreeForest
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_queryoptimizer_rules_Rule
@@ -176,6 +192,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression
 # Module all-in-one library:
 add_library(quickstep_queryoptimizer_rules ../../empty_src.cpp OptimizerRulesModule.hpp)
 target_link_libraries(quickstep_queryoptimizer_rules
+                      quickstep_queryoptimizer_rules_AttachBloomFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
index 9770606..cfbb5d1 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.cpp
@@ -31,6 +31,7 @@
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/DisjointTreeForest.hpp"
 
 #include "glog/logging.h"
 
@@ -72,6 +73,9 @@ P::PhysicalPtr StarSchemaHashJoinOrderOptimization::applyInternal(const P::Physi
     JoinGroupInfo *join_group = nullptr;
     if (parent_join_group == nullptr || !is_valid_cascading_hash_join) {
       new_join_group.reset(new JoinGroupInfo());
+      for (const auto &attr : input->getReferencedAttributes()) {
+        new_join_group->referenced_attributes.emplace(attr->id());
+      }
       join_group = new_join_group.get();
     } else {
       join_group = parent_join_group;
@@ -144,7 +148,10 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         i,
         tables[i],
         cost_model_->estimateCardinality(tables[i]),
-        cost_model_->estimateSelectivity(tables[i]));
+        cost_model_->estimateSelectivity(tables[i]),
+        CountSharedAttributes(join_group.referenced_attributes,
+                              tables[i]->getOutputAttributes()),
+        tables[i]->getPhysicalType() == physical::PhysicalType::kAggregate);
   }
 
   // Auxiliary mapping info.
@@ -161,9 +168,19 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
     }
   }
 
-  // Create a join graph where tables are vertices, and add an edge between vertices
-  // t1 and t2 for each join predicate t1.x = t2.y
-  std::vector<std::unordered_set<std::size_t>> join_graph(table_info_storage.size());
+  std::set<TableInfo*> remaining_tables;
+  for (auto &table_info : table_info_storage) {
+    remaining_tables.emplace(&table_info);
+  }
+
+  DisjointTreeForest<E::ExprId> join_attribute_forest;
+  for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
+    join_attribute_forest.makeSet(attr_id_pair.first);
+    join_attribute_forest.makeSet(attr_id_pair.second);
+    join_attribute_forest.merge(attr_id_pair.first, attr_id_pair.second);
+  }
+
+  std::map<std::size_t, std::map<std::size_t, E::ExprId>> join_attribute_groups;
   for (const auto &attr_id_pair : join_group.join_attribute_pairs) {
     DCHECK(attribute_id_to_table_info_index_map.find(attr_id_pair.first)
                != attribute_id_to_table_info_index_map.end());
@@ -176,128 +193,169 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
         attribute_id_to_table_info_index_map[attr_id_pair.second];
     DCHECK_NE(first_table_idx, second_table_idx);
 
-    table_info_storage[first_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.first, attr_id_pair.second);
-    table_info_storage[second_table_idx].join_attribute_pairs.emplace(
-        attr_id_pair.second, attr_id_pair.first);
-
-    join_graph[first_table_idx].emplace(second_table_idx);
-    join_graph[second_table_idx].emplace(first_table_idx);
-  }
-
-  std::set<TableInfo*, TableInfoPtrLessComparator> table_info_ordered_by_priority;
-  for (std::size_t i = 0; i < table_info_storage.size(); ++i) {
-    table_info_ordered_by_priority.emplace(&table_info_storage[i]);
+    DCHECK_EQ(join_attribute_forest.find(attr_id_pair.first),
+              join_attribute_forest.find(attr_id_pair.second));
+    const std::size_t attr_group_id = join_attribute_forest.find(attr_id_pair.first);
+    auto &attr_group = join_attribute_groups[attr_group_id];
+    attr_group.emplace(first_table_idx, attr_id_pair.first);
+    attr_group.emplace(second_table_idx, attr_id_pair.second);
   }
 
-  // Contruct hash join tree.
   while (true) {
-    TableInfo *first_table_info = *table_info_ordered_by_priority.begin();
-    table_info_ordered_by_priority.erase(
-        table_info_ordered_by_priority.begin());
-    const std::size_t first_table_info_id = first_table_info->table_info_id;
-
-    TableInfo *second_table_info = nullptr;
-    std::set<TableInfo*, TableInfoPtrLessComparator>::iterator second_table_info_it;
-    for (auto candidate_table_info_it = table_info_ordered_by_priority.begin();
-         candidate_table_info_it != table_info_ordered_by_priority.end();
-         ++candidate_table_info_it) {
-      TableInfo *candidate_table_info = *candidate_table_info_it;
-      const std::size_t candidate_table_info_id = candidate_table_info->table_info_id;
-
-      if (join_graph[first_table_info_id].find(candidate_table_info_id)
-              == join_graph[first_table_info_id].end() &&
-          join_graph[candidate_table_info_id].find(first_table_info_id)
-              == join_graph[candidate_table_info_id].end()) {
-        continue;
-      } else if (second_table_info == nullptr) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-      }
-
-      bool is_likely_many_to_many_join = false;
-      for (const auto join_attr_pair : first_table_info->join_attribute_pairs) {
-        if (candidate_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != candidate_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
-        }
-      }
-      for (const auto join_attr_pair : candidate_table_info->join_attribute_pairs) {
-        if (first_table_info->joined_attribute_set.find(join_attr_pair.second)
-                != first_table_info->joined_attribute_set.end()) {
-          is_likely_many_to_many_join = true;
-          break;
+    // TODO(jianqiao): design better data structure to improve efficiency here.
+    std::unique_ptr<JoinPair> best_join = nullptr;
+    for (TableInfo *probe_table_info : remaining_tables) {
+      for (TableInfo *build_table_info : remaining_tables) {
+        if (probe_table_info != build_table_info) {
+          std::vector<E::AttributeReferencePtr> build_attrs;
+          const std::size_t probe_table_id = probe_table_info->table_info_id;
+          const std::size_t build_table_id = build_table_info->table_info_id;
+          for (const auto &attr_group_pair : join_attribute_groups) {
+            const auto &attr_group = attr_group_pair.second;
+            auto probe_it = attr_group.find(probe_table_id);
+            auto build_it = attr_group.find(build_table_id);
+            if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+              build_attrs.emplace_back(
+                  attribute_id_to_reference_map.at(build_it->second));
+            }
+          }
+          if (!build_attrs.empty()
+              && build_table_info->table->impliesUniqueAttributes(build_attrs)) {
+            std::unique_ptr<JoinPair> new_join(
+                new JoinPair(probe_table_info, build_table_info));
+            if (best_join == nullptr || new_join->isBetterThan(*best_join)) {
+//              if (best_join != nullptr) {
+//                std::cerr << "(" << best_join->probe->estimated_selectivity
+//                          << ", " << best_join->probe->estimated_cardinality << ")"
+//                          << " -- "
+//                          << "(" << best_join->build->estimated_selectivity
+//                          << ", " << best_join->build->estimated_cardinality << ")"
+//                          << "\n";
+//                std::cerr << "REPLACED WITH\n";
+//              }
+//              std::cerr << "(" << new_join->probe->estimated_selectivity
+//                        << ", " << new_join->probe->estimated_cardinality << ")"
+//                        << " -- "
+//                        << "(" << new_join->build->estimated_selectivity
+//                        << ", " << new_join->build->estimated_cardinality << ")"
+//                        << "\n****\n";
+              best_join.reset(new_join.release());
+            }
+          }
         }
       }
-      if (!is_likely_many_to_many_join) {
-        second_table_info = candidate_table_info;
-        second_table_info_it = candidate_table_info_it;
-        break;
-      }
     }
-    DCHECK(second_table_info != nullptr);
-    table_info_ordered_by_priority.erase(second_table_info_it);
 
-    const P::PhysicalPtr &left_child = first_table_info->table;
-    const P::PhysicalPtr &right_child = second_table_info->table;
+    TableInfo *selected_probe_table_info = nullptr;
+    TableInfo *selected_build_table_info = nullptr;
+
+    if (best_join != nullptr) {
+      selected_probe_table_info = best_join->probe;
+      selected_build_table_info = best_join->build;
+    }
+
+    // TODO(jianqiao): Handle the case when there is no primary key-foreign key information available.
+    CHECK(selected_probe_table_info != nullptr);
+    CHECK(selected_build_table_info != nullptr);
+
+//    std::cerr << selected_probe_table_info->estimated_selectivity
+//              << " -- "
+//              << selected_build_table_info->estimated_selectivity
+//              << "\n";
+
+//    std::cerr << selected_probe_table_info->estimated_num_output_attributes
+//              << " -- "
+//              << selected_build_table_info->estimated_num_output_attributes
+//              << "\n";
+
+    remaining_tables.erase(selected_probe_table_info);
+    remaining_tables.erase(selected_build_table_info);
+
+    const P::PhysicalPtr &probe_child = selected_probe_table_info->table;
+    const P::PhysicalPtr &build_child = selected_build_table_info->table;
     std::vector<E::NamedExpressionPtr> output_attributes;
-    for (const E::AttributeReferencePtr &left_attr : left_child->getOutputAttributes()) {
-      output_attributes.emplace_back(left_attr);
+    for (const E::AttributeReferencePtr &probe_attr : probe_child->getOutputAttributes()) {
+      output_attributes.emplace_back(probe_attr);
     }
-    for (const E::AttributeReferencePtr &right_attr : right_child->getOutputAttributes()) {
-      output_attributes.emplace_back(right_attr);
+    for (const E::AttributeReferencePtr &build_attr : build_child->getOutputAttributes()) {
+      output_attributes.emplace_back(build_attr);
     }
 
-    std::vector<E::AttributeReferencePtr> left_join_attributes;
-    std::vector<E::AttributeReferencePtr> right_join_attributes;
-    std::unordered_set<expressions::ExprId> new_joined_attribute_set;
-    for (const auto &join_attr_pair : first_table_info->join_attribute_pairs) {
-      if (second_table_info->join_attribute_pairs.find(join_attr_pair.second)
-              != second_table_info->join_attribute_pairs.end()) {
-        left_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.first]);
-        right_join_attributes.emplace_back(
-            attribute_id_to_reference_map[join_attr_pair.second]);
-
-        new_joined_attribute_set.emplace(join_attr_pair.first);
-        new_joined_attribute_set.emplace(join_attr_pair.second);
+    std::vector<E::AttributeReferencePtr> probe_attributes;
+    std::vector<E::AttributeReferencePtr> build_attributes;
+    const std::size_t probe_table_id = selected_probe_table_info->table_info_id;
+    const std::size_t build_table_id = selected_build_table_info->table_info_id;
+    for (const auto &attr_group_pair : join_attribute_groups) {
+      const auto &attr_group = attr_group_pair.second;
+      auto probe_it = attr_group.find(probe_table_id);
+      auto build_it = attr_group.find(build_table_id);
+      if (probe_it != attr_group.end() && build_it != attr_group.end()) {
+        probe_attributes.emplace_back(
+            attribute_id_to_reference_map.at(probe_it->second));
+        build_attributes.emplace_back(
+            attribute_id_to_reference_map.at(build_it->second));
       }
     }
-    DCHECK_GE(left_join_attributes.size(), static_cast<std::size_t>(1));
 
-    if (table_info_ordered_by_priority.size() > 0) {
+    if (remaining_tables.size() > 0) {
       P::PhysicalPtr output =
-          P::HashJoin::Create(left_child,
-                              right_child,
-                              left_join_attributes,
-                              right_join_attributes,
+          P::HashJoin::Create(probe_child,
+                              build_child,
+                              probe_attributes,
+                              build_attributes,
                               nullptr,
                               output_attributes,
                               P::HashJoin::JoinType::kInnerJoin);
 
-      second_table_info->table = output;
+//      P::PhysicalPtr output;
+//      if (selected_build_table_info->estimated_num_output_attributes >= 4 &&
+//          selected_probe_table_info->estimated_num_output_attributes < 4) {
+//        output = P::HashJoin::Create(build_child,
+//                                     probe_child,
+//                                     build_attributes,
+//                                     probe_attributes,
+//                                     nullptr,
+//                                     output_attributes,
+//                                     P::HashJoin::JoinType::kInnerJoin);
+//      } else {
+//        output = P::HashJoin::Create(probe_child,
+//                                     build_child,
+//                                     probe_attributes,
+//                                     build_attributes,
+//                                     nullptr,
+//                                     output_attributes,
+//                                     P::HashJoin::JoinType::kInnerJoin);
+//      }
+
+      selected_probe_table_info->table = output;
 
       // TODO(jianqiao): Cache the estimated cardinality for each plan in cost
       // model to avoid duplicated estimation.
-      second_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
-
-      second_table_info->join_attribute_pairs.insert(first_table_info->join_attribute_pairs.begin(),
-                                                     first_table_info->join_attribute_pairs.end());
-      second_table_info->joined_attribute_set.insert(first_table_info->joined_attribute_set.begin(),
-                                                     first_table_info->joined_attribute_set.end());
-      second_table_info->joined_attribute_set.insert(new_joined_attribute_set.begin(),
-                                                     new_joined_attribute_set.end());
-      table_info_ordered_by_priority.emplace(second_table_info);
-
-      join_graph[second_table_info->table_info_id].insert(join_graph[first_table_info_id].begin(),
-                                                          join_graph[first_table_info_id].end());
-
+      selected_probe_table_info->estimated_cardinality = cost_model_->estimateCardinality(output);
+      selected_probe_table_info->estimated_selectivity = cost_model_->estimateSelectivity(output);
+
+      selected_probe_table_info->estimated_num_output_attributes =
+          CountSharedAttributes(join_group.referenced_attributes,
+                                output->getOutputAttributes());
+      selected_probe_table_info->is_aggregation = false;
+
+      remaining_tables.emplace(selected_probe_table_info);
+
+      // Update join attribute groups.
+      for (auto &attr_group_pair : join_attribute_groups) {
+        auto &attr_group = attr_group_pair.second;
+        auto build_it = attr_group.find(build_table_id);
+        if (build_it != attr_group.end()) {
+          const E::ExprId attr_id = build_it->second;
+          attr_group.erase(build_it);
+          attr_group.emplace(probe_table_id, attr_id);
+        }
+      }
     } else {
-      return P::HashJoin::Create(left_child,
-                                 right_child,
-                                 left_join_attributes,
-                                 right_join_attributes,
+      return P::HashJoin::Create(probe_child,
+                                 build_child,
+                                 probe_attributes,
+                                 build_attributes,
                                  residual_predicate,
                                  project_expressions,
                                  P::HashJoin::JoinType::kInnerJoin);
@@ -305,5 +363,18 @@ physical::PhysicalPtr StarSchemaHashJoinOrderOptimization::generatePlan(
   }
 }
 
+std::size_t StarSchemaHashJoinOrderOptimization::CountSharedAttributes(
+    const std::unordered_set<expressions::ExprId> &attr_set1,
+    const std::vector<expressions::AttributeReferencePtr> &attr_set2) {
+  // Tally the entries of attr_set2 whose id is a member of attr_set1.
+  std::size_t num_shared = 0;
+  for (std::size_t i = 0; i < attr_set2.size(); ++i) {
+    const bool is_shared = attr_set1.count(attr_set2[i]->id()) != 0;
+    num_shared += is_shared ? 1u : 0u;
+  }
+  return num_shared;
+}
+
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
index deddffd..33d95a5 100644
--- a/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
+++ b/query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp
@@ -62,6 +62,7 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
    * @brief A group of tables to form a hash join tree.
    */
   struct JoinGroupInfo {
+    std::unordered_set<expressions::ExprId> referenced_attributes;  // See CountSharedAttributes().
     std::vector<physical::PhysicalPtr> tables;
     std::vector<std::pair<expressions::ExprId, expressions::ExprId>> join_attribute_pairs;
   };
@@ -70,49 +71,84 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
    * @brief Auxiliary information of a table for the optimizer.
    */
   struct TableInfo {
-    TableInfo(const std::size_t in_table_info_id,
-              const physical::PhysicalPtr &in_table,
-              const std::size_t in_estimated_cardinality,
-              const double in_estimated_selectivity)
-        : table_info_id(in_table_info_id),
-          table(in_table),
-          estimated_cardinality(in_estimated_cardinality),
-          estimated_selectivity(in_estimated_selectivity) {
+    TableInfo(const std::size_t table_info_id_in,
+              const physical::PhysicalPtr &table_in,
+              const std::size_t estimated_cardinality_in,
+              const double estimated_selectivity_in,
+              const std::size_t estimated_num_output_attributes_in,
+              const bool is_aggregation_in)
+        : table_info_id(table_info_id_in),
+          table(table_in),
+          estimated_cardinality(estimated_cardinality_in),
+          estimated_selectivity(estimated_selectivity_in),
+          estimated_num_output_attributes(estimated_num_output_attributes_in),
+          is_aggregation(is_aggregation_in) {
     }
 
     const std::size_t table_info_id;
     physical::PhysicalPtr table;
     std::size_t estimated_cardinality;
     double estimated_selectivity;
-    std::unordered_multimap<expressions::ExprId, expressions::ExprId> join_attribute_pairs;
-    std::unordered_set<expressions::ExprId> joined_attribute_set;
+    std::size_t estimated_num_output_attributes;  // Output attrs also in the group's referenced set.
+    bool is_aggregation;  // Presumably "table is an aggregation"; cleared once joined. TODO confirm.
   };
 
-  /**
-   * @brief Comparator that compares the join priorities between two tables.
-   */
-  struct TableInfoPtrLessComparator {
-    inline bool operator() (const TableInfo *lhs, const TableInfo *rhs) {
-      bool swapped = false;
-      if (lhs->estimated_cardinality > rhs->estimated_cardinality) {
-        std::swap(lhs, rhs);
-        swapped = true;
+  struct JoinPair {  // A (probe, build) pairing of two tables, ranked by isBetterThan().
+    JoinPair(TableInfo *probe_in, TableInfo *build_in)
+        : probe(probe_in), build(build_in) {
+    }
+
+    inline bool isBetterThan(const JoinPair &rhs) const {  // Strict ordering: true if *this wins.
+      const auto &lhs = *this;
+      const bool lhs_has_large_output =
+          lhs.build->estimated_num_output_attributes
+              + lhs.probe->estimated_num_output_attributes > 5;  // NOTE(review): tuning constant.
+      const bool rhs_has_large_output =
+          rhs.build->estimated_num_output_attributes
+              + rhs.probe->estimated_num_output_attributes > 5;
+      if (lhs_has_large_output || rhs_has_large_output) {
+        if (lhs_has_large_output != rhs_has_large_output) {
+          return rhs_has_large_output;  // Prefer the pair whose output is not large.
+        }
+        const double lhs_selectivity =
+            lhs.build->estimated_selectivity * lhs.probe->estimated_selectivity;
+        const double rhs_selectivity =
+            rhs.build->estimated_selectivity * rhs.probe->estimated_selectivity;
+        if (lhs_selectivity != rhs_selectivity) {
+          return lhs_selectivity < rhs_selectivity;
+        }
       }
 
-      if (lhs->estimated_selectivity < rhs->estimated_selectivity) {
-        return !swapped;
-      } else if (lhs->estimated_cardinality < 1000u &&
-                 rhs->estimated_cardinality > 10000u &&
-                 lhs->estimated_selectivity < rhs->estimated_selectivity * 1.5) {
-        return !swapped;
-      } else if (lhs->estimated_selectivity > rhs->estimated_selectivity) {
-        return swapped;
-      } else if (lhs->estimated_cardinality != rhs->estimated_cardinality) {
-        return !swapped;
+      const bool lhs_has_small_build =
+          !lhs_has_large_output && lhs.build->estimated_cardinality < 0x100;  // < 256 tuples.
+      const bool rhs_has_small_build =
+          !rhs_has_large_output && rhs.build->estimated_cardinality < 0x100;
+      if (lhs_has_small_build != rhs_has_small_build) {
+        return lhs_has_small_build;
+      }
+
+      if (lhs.probe->is_aggregation != rhs.probe->is_aggregation) {
+        return lhs.probe->is_aggregation;  // Prefer the pair whose probe has is_aggregation set.
+      }
+
+      if (lhs.probe->estimated_cardinality != rhs.probe->estimated_cardinality) {
+        return lhs.probe->estimated_cardinality < rhs.probe->estimated_cardinality;
+      }
+      if (lhs.build->estimated_selectivity != rhs.build->estimated_selectivity) {
+        return lhs.build->estimated_selectivity < rhs.build->estimated_selectivity;
+      }
+      if (lhs.build->estimated_cardinality != rhs.build->estimated_cardinality) {
+        return lhs.build->estimated_cardinality < rhs.build->estimated_cardinality;
+      }
+      if (lhs.probe->table_info_id != rhs.probe->table_info_id) {  // Ids, not addresses: reproducible.
+        return lhs.probe->table_info_id < rhs.probe->table_info_id;
       } else {
-        return swapped ^ (lhs->table < rhs->table);
+        return lhs.build->table_info_id < rhs.build->table_info_id;
       }
     }
+
+    TableInfo *probe;
+    TableInfo *build;
   };
 
   physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input,
@@ -123,6 +159,10 @@ class StarSchemaHashJoinOrderOptimization : public Rule<physical::Physical> {
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions);
 
+  static std::size_t CountSharedAttributes(  // # of attr_set2 entries whose id is in attr_set1.
+      const std::unordered_set<expressions::ExprId> &attr_set1,
+      const std::vector<expressions::AttributeReferencePtr> &attr_set2);
+
   std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
 
   DISALLOW_COPY_AND_ASSIGN(StarSchemaHashJoinOrderOptimization);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
index 815c13e..ac0adea 100644
--- a/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
+++ b/query_optimizer/tests/ExecutionHeuristics_unittest.cpp
@@ -70,7 +70,8 @@ class ExecutionHeuristicsTest : public ::testing::Test {
                                           probe_relation,
                                           std::move(build_attribute_ids),
                                           std::move(probe_attribute_ids),
-                                          join_hash_table_id);
+                                          join_hash_table_id,
+                                          build_relation->estimateTupleCardinality());
   }
 
   QueryPlan::DAGNodeIndex createDummyBuildHashOperator(QueryPlan *query_plan,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 4bcbcf6..c46ba2c 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -77,6 +77,10 @@ class AggregationOperator : public RelationalOperator {
 
   ~AggregationOperator() override {}
 
+  std::string getName() const override {
+    return "AggregationOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 464bbf8..952c7ac 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -93,6 +93,10 @@ class BuildHashOperator : public RelationalOperator {
 
   ~BuildHashOperator() override {}
 
+  std::string getName() const override {
+    return "BuildHashOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/CreateIndexOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index 18ca656..4e05448 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -69,6 +69,10 @@ class CreateIndexOperator : public RelationalOperator {
 
   ~CreateIndexOperator() override {}
 
+  std::string getName() const override {
+    return "CreateIndexOperator";  // Fixed label; matches the class name.
+  }
+
   /**
    * @note No WorkOrder generated for this operator.
    **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/CreateTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp
index 6d91142..b7b707b 100644
--- a/relational_operators/CreateTableOperator.hpp
+++ b/relational_operators/CreateTableOperator.hpp
@@ -66,6 +66,10 @@ class CreateTableOperator : public RelationalOperator {
 
   ~CreateTableOperator() override {}
 
+  std::string getName() const override {
+    return "CreateTableOperator";  // Fixed label; matches the class name.
+  }
+
   /**
    * @note No WorkOrder generated for this operator.
    **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index 74da8c1..abfe4a9 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -81,6 +81,10 @@ class DeleteOperator : public RelationalOperator {
 
   ~DeleteOperator() override {}
 
+  std::string getName() const override {
+    return "DeleteOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/DestroyHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index 181386f..ae65de5 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -58,6 +58,10 @@ class DestroyHashOperator : public RelationalOperator {
 
   ~DestroyHashOperator() override {}
 
+  std::string getName() const override {
+    return "DestroyHashOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/DropTableOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index 6c7fca3..f854b4f 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -74,6 +74,10 @@ class DropTableOperator : public RelationalOperator {
 
   ~DropTableOperator() override {}
 
+  std::string getName() const override {
+    return "DropTableOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 158a637..0dcfc9e 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -74,6 +74,10 @@ class FinalizeAggregationOperator : public RelationalOperator {
 
   ~FinalizeAggregationOperator() override {}
 
+  std::string getName() const override {
+    return "FinalizeAggregationOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 667df1e..16c0d82 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -59,6 +59,11 @@ using std::vector;
 
 namespace quickstep {
 
+DEFINE_int64(bloom_adapter_batch_size, 64,  // NOTE(review): no validator here; values <= 0 not rejected.
+             "Number of tuples to probe in bulk in Bloom filter adapter.");
+DEFINE_bool(adapt_bloom_filters, true,  // Adaptive Bloom filter ordering is on by default.
+            "Whether to adaptively adjust the ordering of bloom filters.");
+
 namespace {
 
 // Functor passed to HashTable::getAllFromValueAccessor() to collect matching
@@ -75,6 +80,11 @@ class MapBasedJoinedTupleCollector {
     joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
   }
 
+  inline void operator()(const tuple_id probe_tid, const TupleReference &build_tref) {
+    auto &block_pairs = joined_tuples_[build_tref.block];  // Keyed by inner (build) block id.
+    block_pairs.emplace_back(build_tref.tuple, probe_tid);
+  }
+
   // Get a mutable pointer to the collected map of joined tuple ID pairs. The
   // key is inner block_id, values are vectors of joined tuple ID pairs with
   // tuple ID from the inner block on the left and the outer block on the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 5d3d7da..36b8929 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -157,6 +157,21 @@ class HashJoinOperator : public RelationalOperator {
 
   ~HashJoinOperator() override {}
 
+  std::string getName() const override {  // Non-inner joins append the join type in parentheses.
+    switch (join_type_) {
+      case JoinType::kInnerJoin:
+        return "HashJoinOperator";
+      case JoinType::kLeftSemiJoin:
+        return "HashJoinOperator(LeftSemi)";
+      case JoinType::kLeftAntiJoin:
+        return "HashJoinOperator(LeftAnti)";
+      case JoinType::kLeftOuterJoin:
+        return "HashJoinOperator(LeftOuter)";
+      // No default: -Wswitch then flags any JoinType added later without a name here.
+    }
+    LOG(FATAL) << "Unknown join type in HashJoinOperator::getName()";
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,
@@ -283,8 +298,9 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
@@ -330,8 +346,9 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
@@ -411,8 +428,9 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),
@@ -458,8 +476,9 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(std::move(join_key_attributes)),
@@ -535,8 +554,9 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
-      StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      StorageManager *storage_manager,
+      const int op_index = -1)
+      : WorkOrder(query_id, op_index),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         join_key_attributes_(join_key_attributes),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/InsertOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp
index 78f5199..2c6aca7 100644
--- a/relational_operators/InsertOperator.hpp
+++ b/relational_operators/InsertOperator.hpp
@@ -73,6 +73,10 @@ class InsertOperator : public RelationalOperator {
 
   ~InsertOperator() override {}
 
+  std::string getName() const override {
+    return "InsertOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 992e76d..cf190fe 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -116,6 +116,10 @@ class NestedLoopsJoinOperator : public RelationalOperator {
 
   ~NestedLoopsJoinOperator() override {}
 
+  std::string getName() const override {
+    return "NestedLoopsJoinOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index 116727b..65cd213 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -55,6 +55,13 @@ class RelationalOperator {
   virtual ~RelationalOperator() {}
 
   /**
+   * @brief Get a human-readable name for this relational operator.
+   *
+   * @return The operator's name, e.g. "SelectOperator" or "HashJoinOperator(LeftSemi)".
+   */
+  virtual std::string getName() const = 0;
+
+  /**
    * @brief Generate all the next WorkOrders for this RelationalOperator.
    *
    * @note If a RelationalOperator has blocking dependencies, it should not
@@ -226,6 +233,15 @@ class RelationalOperator {
     op_index_ = operator_index;
   }
 
+  /**
+   * @brief Get the index of this operator in the query plan DAG.
+   * @note HashJoin WorkOrders take op_index as int (default -1); passing this value narrows it.
+   * @return The index of this operator in the query plan DAG (the stored op_index_).
+   */
+  std::size_t getOperatorIndex() const {
+    return op_index_;
+  }
+
  protected:
   /**
    * @brief Constructor

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/SampleOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp
index f8fe5f6..08f08c8 100644
--- a/relational_operators/SampleOperator.hpp
+++ b/relational_operators/SampleOperator.hpp
@@ -93,6 +93,10 @@ class SampleOperator : public RelationalOperator {
 
   ~SampleOperator() override {}
 
+  std::string getName() const override {
+    return "SampleOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/SaveBlocksOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 50032b6..ebc5ffc 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -64,6 +64,10 @@ class SaveBlocksOperator : public RelationalOperator {
 
   ~SaveBlocksOperator() override {}
 
+  std::string getName() const override {
+    return "SaveBlocksOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 0c10686..ee25886 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -189,6 +189,10 @@ class SelectOperator : public RelationalOperator {
 
   ~SelectOperator() override {}
 
+  std::string getName() const override {
+    return "SelectOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index 177836f..9b07ad6 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -129,6 +129,10 @@ class SortMergeRunOperator : public RelationalOperator {
    **/
   ~SortMergeRunOperator() {}
 
+  std::string getName() const override {
+    return "SortMergeRunOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index 96a3ce1..54c7feb 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -109,6 +109,10 @@ class SortRunGenerationOperator : public RelationalOperator {
 
   ~SortRunGenerationOperator() {}
 
+  std::string getName() const override {
+    return "SortRunGenerationOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index 1b791a6..15e7052 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -76,6 +76,10 @@ class TableGeneratorOperator : public RelationalOperator {
 
   ~TableGeneratorOperator() override {}
 
+  std::string getName() const override {
+    return "TableGeneratorOperator";  // Fixed label; matches the class name.
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 1a62ded..6890d7d 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -134,6 +134,10 @@ class TextScanOperator : public RelationalOperator {
 
   ~TextScanOperator() override {}
 
+  std::string getName() const override {  // Returns this operator's class name.
+    return std::string("TextScanOperator");
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index 4471a17..d021844 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -94,6 +94,10 @@ class UpdateOperator : public RelationalOperator {
 
   ~UpdateOperator() override {}
 
+  std::string getName() const override {  // Returns this operator's class name.
+    return std::string("UpdateOperator");
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..0eb20da 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -75,6 +75,10 @@ class WindowAggregationOperator : public RelationalOperator {
 
   ~WindowAggregationOperator() override {}
 
+  std::string getName() const override {  // Returns this operator's class name.
+    return std::string("WindowAggregationOperator");
+  }
+
   bool getAllWorkOrders(WorkOrdersContainer *container,
                         QueryContext *query_context,
                         StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index df195cc..4eb6b3a 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -299,16 +299,31 @@ class WorkOrder {
     return query_id_;
   }
 
+  /**
+   * @brief Get the operator index this WorkOrder was constructed with.
+   *
+   * @return The operator index, or -1 if none was supplied.
+   **/
+  inline int getOperatorIndex() const {
+    return op_index_;
+  }
+
  protected:
   /**
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param op_index An operator index to associate with this WorkOrder, or -1
+   *        (the default) if none is supplied.
    **/
-  explicit WorkOrder(const std::size_t query_id)
-      : query_id_(query_id) {}
+  explicit WorkOrder(const std::size_t query_id,
+                     const int op_index = -1)
+      : query_id_(query_id),
+        op_index_(op_index) {}
 
   const std::size_t query_id_;
+  // Operator index passed to the constructor; -1 when not supplied.
+  const int op_index_;
   // A vector of preferred NUMA node IDs where this workorder should be executed.
   // These node IDs typically indicate the NUMA node IDs of the input(s) of the
   // workorder. Derived classes should ensure that there are no duplicate entries

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 4878cf1..668164c 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -46,10 +46,13 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/BloomFilterAdapter.hpp"
 
 #include "glog/logging.h"
 
@@ -57,6 +60,8 @@ using std::unique_ptr;
 
 namespace quickstep {
 
+DECLARE_int64(bloom_adapter_batch_size);  // Defined elsewhere; initial batch size for bloom filter probing.
+
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction*> &aggregate_functions,
@@ -64,12 +69,16 @@ AggregationOperationState::AggregationOperationState(
     std::vector<bool> &&is_distinct,
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
     const Predicate *predicate,
+    std::vector<const BloomFilter *> &&bloom_filters,
+    std::vector<attribute_id> &&bloom_filter_attribute_ids,
     const std::size_t estimated_num_entries,
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
       predicate_(predicate),
+      bloom_filters_(std::move(bloom_filters)),
+      bloom_filter_attribute_ids_(std::move(bloom_filter_attribute_ids)),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
       is_distinct_(std::move(is_distinct)),
@@ -183,7 +192,8 @@ AggregationOperationState::AggregationOperationState(
 AggregationOperationState* AggregationOperationState::ReconstructFromProto(
     const serialization::AggregationOperationState &proto,
     const CatalogDatabaseLite &database,
-    StorageManager *storage_manager) {
+    StorageManager *storage_manager,
+    const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters) {
   DCHECK(ProtoIsValid(proto, database));
 
   // Rebuild contructor arguments from their representation in 'proto'.
@@ -232,12 +242,29 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
                                                 database));
   }
 
+  // Resolve each serialized (bloom_filter_id, attr_id) pair into a pointer to
+  // the referenced bloom filter and the attribute ID it is probed with.
+  std::vector<const BloomFilter*> bloom_filter_vector;
+  std::vector<attribute_id> bloom_filter_attribute_ids;
+  bloom_filter_vector.reserve(proto.bloom_filters_size());
+  bloom_filter_attribute_ids.reserve(proto.bloom_filters_size());
+  for (int i = 0; i < proto.bloom_filters_size(); ++i) {
+    // Bind by reference: a plain 'auto' would copy the sub-message.
+    const auto &bloom_filter_proto = proto.bloom_filters(i);
+    DCHECK_LT(bloom_filter_proto.bloom_filter_id(), bloom_filters.size());
+    bloom_filter_vector.emplace_back(
+        bloom_filters[bloom_filter_proto.bloom_filter_id()].get());
+    bloom_filter_attribute_ids.emplace_back(bloom_filter_proto.attr_id());
+  }
+
   return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
                                        aggregate_functions,
                                        std::move(arguments),
                                        std::move(is_distinct),
                                        std::move(group_by_expressions),
                                        predicate.release(),
+                                       std::move(bloom_filter_vector),
+                                       std::move(bloom_filter_attribute_ids),
                                        proto.estimated_num_entries(),
                                        HashTableImplTypeFromProto(proto.hash_table_impl_type()),
                                        distinctify_hash_table_impl_types,
@@ -340,6 +362,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
   // tuples so that it can be reused across multiple aggregates (i.e. we only
   // pay the cost of evaluating the predicate once).
   std::unique_ptr<TupleIdSequence> reuse_matches;
+  if (predicate_) {  // Evaluated once; the matches are reused by every aggregate below.
+    reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
@@ -358,7 +384,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
                                arguments_[agg_idx],
                                local_arguments_as_attributes,
                                {}, /* group_by */
-                               predicate_.get(),
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                nullptr /* reuse_group_by_vectors */);
@@ -369,7 +394,6 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
           block->aggregate(*handles_[agg_idx],
                            arguments_[agg_idx],
                            local_arguments_as_attributes,
-                           predicate_.get(),
                            &reuse_matches));
     }
   }
@@ -391,6 +415,81 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
   // GROUP BY expressions once).
   std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
 
+  // Evaluate the filter predicate (if any) once; the matches are narrowed by
+  // the bloom filters below and then reused by every aggregate in this block.
+  if (predicate_) {
+    reuse_matches.reset(block->getMatchesForPredicate(predicate_.get()));
+  }
+
+  // Probing samples attribute byte lengths from tuple 0, so skip empty blocks.
+  if (!bloom_filters_.empty() && block->getNumTuples() > 0) {
+    const std::size_t num_tuples = block->getNumTuples();
+    std::unique_ptr<ValueAccessor> accessor;
+    if (reuse_matches) {
+      accessor.reset(
+          block->getTupleStorageSubBlock().createValueAccessor(reuse_matches.get()));
+    } else {
+      accessor.reset(
+          block->getTupleStorageSubBlock().createValueAccessor());
+    }
+    InvokeOnAnyValueAccessor(
+        accessor.get(),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      std::unique_ptr<TupleIdSequence> filtered(new TupleIdSequence(num_tuples));
+
+      // Byte length of each probed attribute, read from tuple 0.
+      // NOTE(review): assumes the probed attributes are fixed-length; confirm.
+      std::vector<std::size_t> attr_size_vector;
+      attr_size_vector.reserve(bloom_filter_attribute_ids_.size());
+      for (const auto &attr : bloom_filter_attribute_ids_) {
+        auto val_and_size =
+            accessor->template getUntypedValueAndByteLengthAtAbsolutePosition<false>(0, attr);
+        attr_size_vector.emplace_back(val_and_size.second);
+      }
+
+      std::unique_ptr<BloomFilterAdapter> bloom_filter_adapter(new BloomFilterAdapter(
+          bloom_filters_, bloom_filter_attribute_ids_, attr_size_vector));
+
+      // Probe in batches that start at the flag value and double each round.
+      // Clamp to at least 1 so the loop always makes progress.
+      std::uint32_t batch_size_try = static_cast<std::uint32_t>(
+          FLAGS_bloom_adapter_batch_size > 0 ? FLAGS_bloom_adapter_batch_size : 1);
+      std::uint32_t num_tuples_left = accessor->getNumTuples();
+      // Start empty and only reserve: ids are appended with push_back(), so a
+      // pre-sized vector would put zero-valued ids at the front of the first batch.
+      std::vector<tuple_id> batch;
+      batch.reserve(num_tuples_left);
+
+      while (num_tuples_left > 0) {
+        const std::uint32_t batch_size =
+            batch_size_try < num_tuples_left ? batch_size_try : num_tuples_left;
+        for (std::size_t i = 0; i < batch_size; ++i) {
+          accessor->next();
+          batch.push_back(accessor->getCurrentPosition());
+        }
+
+        // The loop below expects bulkProbe() to move the ids that pass every
+        // filter to the front of 'batch' and return how many there are.
+        const std::size_t num_hits =
+            bloom_filter_adapter->bulkProbe<true>(accessor, batch, batch_size);
+        for (std::size_t t = 0; t < num_hits; ++t) {
+          filtered->set(batch[t], true);
+        }
+
+        batch.clear();
+        num_tuples_left -= batch_size;
+        batch_size_try = batch_size * 2;
+      }
+
+      // Restrict the existing matches (if any) to tuples that passed the filters.
+      if (reuse_matches) {
+        reuse_matches->intersectWith(*filtered);
+      } else {
+        reuse_matches.reset(filtered.release());
+      }
+    });
+  }
+
   for (std::size_t agg_idx = 0;
        agg_idx < handles_.size();
        ++agg_idx) {
@@ -402,7 +492,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
                                arguments_[agg_idx],
                                nullptr, /* arguments_as_attributes */
                                group_by_list_,
-                               predicate_.get(),
                                distinctify_hashtables_[agg_idx].get(),
                                &reuse_matches,
                                &reuse_group_by_vectors);
@@ -416,7 +505,6 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
       block->aggregateGroupBy(*handles_[agg_idx],
                               arguments_[agg_idx],
                               group_by_list_,
-                              predicate_.get(),
                               agg_hash_table,
                               &reuse_matches,
                               &reuse_group_by_vectors);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0199749..5db7325 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -33,6 +33,7 @@
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/BloomFilter.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -108,6 +109,8 @@ class AggregationOperationState {
                             std::vector<bool> &&is_distinct,
                             std::vector<std::unique_ptr<const Scalar>> &&group_by,
                             const Predicate *predicate,
+                            std::vector<const BloomFilter *> &&bloom_filters,
+                            std::vector<attribute_id> &&bloom_filter_attribute_ids,
                             const std::size_t estimated_num_entries,
                             const HashTableImplType hash_table_impl_type,
                             const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
@@ -131,7 +134,8 @@ class AggregationOperationState {
   static AggregationOperationState* ReconstructFromProto(
       const serialization::AggregationOperationState &proto,
       const CatalogDatabaseLite &database,
-      StorageManager *storage_manager);
+      StorageManager *storage_manager,
+      const std::vector<std::unique_ptr<BloomFilter>> &bloom_filters);
 
   /**
    * @brief Check whether a serialization::AggregationOperationState is
@@ -181,6 +185,10 @@ class AggregationOperationState {
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
   std::unique_ptr<const Predicate> predicate_;
+
+  std::vector<const BloomFilter*> bloom_filters_;
+  std::vector<attribute_id> bloom_filter_attribute_ids_;
+
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
   // Each individual aggregate in this operation has an AggregationHandle and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/AggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index bf78e3a..165148e 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,10 @@ message AggregationOperationState {
 
   // Each DISTINCT aggregation has its distinctify hash table impl type.
   repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+  message BloomFilter {
+    required uint32 bloom_filter_id = 1;  // Index into the bloom filter vector passed to ReconstructFromProto().
+    required uint32 attr_id = 2;  // Attribute ID whose values are probed against that filter.
+  }
+  repeated BloomFilter bloom_filters = 8;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/BasicColumnStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/BasicColumnStoreValueAccessor.hpp b/storage/BasicColumnStoreValueAccessor.hpp
index 759e187..7907fd5 100644
--- a/storage/BasicColumnStoreValueAccessor.hpp
+++ b/storage/BasicColumnStoreValueAccessor.hpp
@@ -18,6 +18,8 @@
 #ifndef QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_
 #define QUICKSTEP_STORAGE_BASIC_COLUMN_STORE_VALUE_ACCESSOR_HPP_
 
+#include <cstddef>
+#include <utility>
 #include <vector>
 
 #include "catalog/CatalogRelationSchema.hpp"
@@ -43,7 +45,8 @@ class BasicColumnStoreValueAccessorHelper {
       : relation_(relation),
         num_tuples_(num_tuples),
         column_stripes_(column_stripes),
-        column_null_bitmaps_(column_null_bitmaps) {
+        column_null_bitmaps_(column_null_bitmaps),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()) {
   }
 
   inline tuple_id numPackedTuples() const {
@@ -61,9 +64,23 @@ class BasicColumnStoreValueAccessorHelper {
       return nullptr;
     }
 
-    // TODO(chasseur): Consider cacheing the byte lengths of attributes.
-    return static_cast<const char*>(column_stripes_[attr])
-           + (tuple * relation_.getAttributeById(attr)->getType().maximumByteLength());
+    return static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_max_lengths_[attr]);
+  }
+
+  template <bool check_null>  // Value pointer plus byte length; (nullptr, 0) for a NULL when check_null.
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                            const attribute_id attr) const {
+    DEBUG_ASSERT(tuple < num_tuples_);
+    DEBUG_ASSERT(relation_.hasAttributeWithId(attr));
+    if (check_null  // Only columns that have a null bitmap are checked for NULL.
+        && (!column_null_bitmaps_.elementIsNull(attr))
+        && column_null_bitmaps_[attr].getBit(tuple)) {
+      return std::make_pair(nullptr, 0);
+    }
+
+    const std::size_t attr_length = attr_max_lengths_[attr];  // Per-tuple stride within the column stripe.
+    return std::make_pair(static_cast<const char*>(column_stripes_[attr]) + (tuple * attr_length),
+                          attr_length);
   }
 
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
@@ -80,6 +97,7 @@ class BasicColumnStoreValueAccessorHelper {
   const tuple_id num_tuples_;
   const std::vector<void*> &column_stripes_;
   const PtrVector<BitVector<false>, true> &column_null_bitmaps_;
+  const std::vector<std::size_t> &attr_max_lengths_;  // NOTE(review): reference member; confirm getMaximumAttributeByteLengths() returns a reference, not a temporary.
 
   DISALLOW_COPY_AND_ASSIGN(BasicColumnStoreValueAccessorHelper);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/BloomFilterIndexSubBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.cpp b/storage/BloomFilterIndexSubBlock.cpp
index e806217..a40f69f 100644
--- a/storage/BloomFilterIndexSubBlock.cpp
+++ b/storage/BloomFilterIndexSubBlock.cpp
@@ -55,7 +55,6 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
                     sub_block_memory_size),
       is_initialized_(false),
       is_consistent_(false),
-      random_seed_(kBloomFilterSeed),
       bit_array_size_in_bytes_(description.GetExtension(
                                    BloomFilterIndexSubBlockDescription::bloom_filter_size)) {
   CHECK(DescriptionIsValid(relation_, description_))
@@ -74,8 +73,7 @@ BloomFilterIndexSubBlock::BloomFilterIndexSubBlock(const TupleStorageSubBlock &t
   const std::uint32_t salt_count = description.GetExtension(BloomFilterIndexSubBlockDescription::number_of_hashes);
 
   // Initialize the bloom_filter_ data structure to operate on bit_array.
-  bloom_filter_.reset(new BloomFilter(random_seed_,
-                                      salt_count,
+  bloom_filter_.reset(new BloomFilter(salt_count,
                                       bit_array_size_in_bytes_,
                                       bit_array_.get(),
                                       is_bloom_filter_initialized));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/BloomFilterIndexSubBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/BloomFilterIndexSubBlock.hpp b/storage/BloomFilterIndexSubBlock.hpp
index 4925673..8c81156 100644
--- a/storage/BloomFilterIndexSubBlock.hpp
+++ b/storage/BloomFilterIndexSubBlock.hpp
@@ -65,11 +65,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
     kSelectivityNone
   };
 
-  /**
-   * @brief A random seed to initialize the bloom filter hash functions.
-   **/
-  static const std::uint64_t kBloomFilterSeed = 0xA5A5A5A55A5A5A5AULL;
-
   BloomFilterIndexSubBlock(const TupleStorageSubBlock &tuple_store,
                            const IndexSubBlockDescription &description,
                            const bool new_block,
@@ -179,7 +174,6 @@ class BloomFilterIndexSubBlock : public IndexSubBlock {
  private:
   bool is_initialized_;
   bool is_consistent_;
-  const std::uint64_t random_seed_;
   const std::uint64_t bit_array_size_in_bytes_;
   std::vector<attribute_id> indexed_attribute_ids_;
   std::unique_ptr<unsigned char> bit_array_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..578e4e9 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -677,6 +677,8 @@ target_link_libraries(quickstep_storage_HashTable
                       quickstep_types_Type
                       quickstep_types_TypedValue
                       quickstep_utility_BloomFilter
+                      quickstep_utility_BloomFilterAdapter
+                      quickstep_utility_EventProfiler
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTableBase

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/CompressedColumnStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/CompressedColumnStoreValueAccessor.hpp b/storage/CompressedColumnStoreValueAccessor.hpp
index 64eb315..984dea3 100644
--- a/storage/CompressedColumnStoreValueAccessor.hpp
+++ b/storage/CompressedColumnStoreValueAccessor.hpp
@@ -52,6 +52,7 @@ class CompressedColumnStoreValueAccessorHelper {
       const PtrVector<BitVector<false>, true> &uncompressed_column_null_bitmaps)
       : relation_(relation),
         num_tuples_(num_tuples),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
         compression_info_(compression_info),
         dictionary_coded_attributes_(dictionary_coded_attributes),
         truncated_attributes_(truncated_attributes),
@@ -84,6 +85,26 @@ class CompressedColumnStoreValueAccessorHelper {
     }
   }
 
+  template <bool check_null>  // Value pointer plus byte length for (tuple, attr).
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                            const attribute_id attr) const {
+    if (dictionary_coded_attributes_[attr]) {  // Decode through the attribute's dictionary.
+      return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>(
+          getCode(tuple, attr));
+    } else if (truncated_attributes_[attr]) {  // Widen the stored code into a scratch buffer.
+      if (truncated_attribute_is_int_[attr]) {
+        int_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&int_buffer_, sizeof(int_buffer_));  // Aliases int_buffer_; overwritten by later calls.
+      } else {
+        long_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&long_buffer_, sizeof(long_buffer_));  // Aliases long_buffer_; overwritten by later calls.
+      }
+    } else {  // Neither dictionary-coded nor truncated: point at the stored value.
+      return std::make_pair(getAttributePtr<check_null>(tuple, attr),
+                            attr_max_lengths_[attr]);
+    }
+  }
+
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
                                            const attribute_id attr) const {
     if (dictionary_coded_attributes_[attr]) {
@@ -138,6 +159,7 @@ class CompressedColumnStoreValueAccessorHelper {
   const CatalogRelationSchema &relation_;
 
   const tuple_id num_tuples_;
+  const std::vector<std::size_t> &attr_max_lengths_;  // NOTE(review): reference member; confirm getMaximumAttributeByteLengths() returns a reference, not a temporary.
 
   const CompressedBlockInfo &compression_info_;
   const std::vector<bool> &dictionary_coded_attributes_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2242550d/storage/CompressedPackedRowStoreValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/storage/CompressedPackedRowStoreValueAccessor.hpp b/storage/CompressedPackedRowStoreValueAccessor.hpp
index 024b0ec..7058aec 100644
--- a/storage/CompressedPackedRowStoreValueAccessor.hpp
+++ b/storage/CompressedPackedRowStoreValueAccessor.hpp
@@ -58,6 +58,7 @@ class CompressedPackedRowStoreValueAccessorHelper {
         num_tuples_(num_tuples),
         tuple_length_bytes_(tuple_length_bytes),
         attribute_offsets_(attribute_offsets),
+        attr_max_lengths_(relation.getMaximumAttributeByteLengths()),
         compression_info_(compression_info),
         dictionary_coded_attributes_(dictionary_coded_attributes),
         truncated_attributes_(truncated_attributes),
@@ -92,6 +93,26 @@ class CompressedPackedRowStoreValueAccessorHelper {
     }
   }
 
+  template <bool check_null>  // Value pointer plus byte length for (tuple, attr).
+  inline std::pair<const void*, std::size_t> getAttributeValueAndByteLength(const tuple_id tuple,
+                                                                            const attribute_id attr) const {
+    if (dictionary_coded_attributes_[attr]) {  // Decode through the attribute's dictionary.
+      return dictionaries_.atUnchecked(attr).getUntypedValueAndByteLengthForCode<check_null>(
+          getCode(tuple, attr));
+    } else if (truncated_attributes_[attr]) {  // Widen the stored code into a scratch buffer.
+      if (truncated_attribute_is_int_[attr]) {
+        int_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&int_buffer_, sizeof(int_buffer_));  // Aliases int_buffer_; overwritten by later calls.
+      } else {
+        long_buffer_ = getCode(tuple, attr);
+        return std::make_pair(&long_buffer_, sizeof(long_buffer_));  // Aliases long_buffer_; overwritten by later calls.
+      }
+    } else {  // Neither dictionary-coded nor truncated: point at the stored value.
+      return std::make_pair(getAttributePtr<check_null>(tuple, attr),
+                            attr_max_lengths_[attr]);
+    }
+  }
+
   inline TypedValue getAttributeValueTyped(const tuple_id tuple,
                                            const attribute_id attr) const {
     if (dictionary_coded_attributes_[attr]) {
@@ -150,6 +171,7 @@ class CompressedPackedRowStoreValueAccessorHelper {
   const tuple_id num_tuples_;
   const std::size_t tuple_length_bytes_;
   const std::vector<std::size_t> &attribute_offsets_;
+  const std::vector<std::size_t> &attr_max_lengths_;  // NOTE(review): reference member; confirm getMaximumAttributeByteLengths() returns a reference, not a temporary.
 
   const CompressedBlockInfo &compression_info_;
   const std::vector<bool> &dictionary_coded_attributes_;



Mime
View raw message