quickstep-commits mailing list archives

From jianq...@apache.org
Subject [1/3] incubator-quickstep git commit: All in one.
Date Thu, 20 Apr 2017 03:39:33 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/improve-tpch-q01 [created] ddd077fe5


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/query_optimizer/rules/ExtractCommonSubexpression.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.cpp b/query_optimizer/rules/ExtractCommonSubexpression.cpp
new file mode 100644
index 0000000..ad2aec1
--- /dev/null
+++ b/query_optimizer/rules/ExtractCommonSubexpression.cpp
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/ExtractCommonSubexpression.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "utility/HashError.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+ExtractCommonSubexpression::ExtractCommonSubexpression(
+    OptimizerContext *optimizer_context)
+    : optimizer_context_(optimizer_context) {
+  const std::vector<E::ExpressionType> whitelist = {
+      E::ExpressionType::kAlias,
+      E::ExpressionType::kAttributeReference,
+      E::ExpressionType::kAggregateFunction,
+      E::ExpressionType::kBinaryExpression,
+      E::ExpressionType::kCast,
+      E::ExpressionType::kCommonSubexpression,
+      E::ExpressionType::kScalarLiteral,
+      E::ExpressionType::kUnaryExpression
+  };
+
+  for (const auto &expr_type : whitelist) {
+    homogeneous_whitelist_.emplace(static_cast<int>(expr_type));
+  }
+}
+
+P::PhysicalPtr ExtractCommonSubexpression::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  return applyInternal(input);
+}
+
+P::PhysicalPtr ExtractCommonSubexpression::applyInternal(
+    const P::PhysicalPtr &input) {
+  std::vector<P::PhysicalPtr> new_children;
+  for (const auto &child : input->children()) {
+    new_children.emplace_back(applyInternal(child));
+  }
+
+  const P::PhysicalPtr node =
+      new_children == input->children()
+          ? input
+          : input->copyWithNewChildren(new_children);
+
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection =
+          std::static_pointer_cast<const P::Selection>(node);
+
+      const std::vector<E::NamedExpressionPtr> new_expressions =
+          DownCast<E::NamedExpression>(
+              transformExpressions(UpCast(selection->project_expressions())));
+
+      if (new_expressions != selection->project_expressions()) {
+        return P::Selection::Create(selection->input(),
+                                    new_expressions,
+                                    selection->filter_predicate());
+      }
+      break;
+    }
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+      std::vector<E::ExpressionPtr> expressions =
+          UpCast(aggregate->aggregate_expressions());
+      for (const auto &expr : aggregate->grouping_expressions()) {
+        expressions.emplace_back(expr);
+      }
+
+      const std::vector<E::ExpressionPtr> new_expressions =
+          transformExpressions(expressions);
+
+      if (new_expressions != expressions) {
+        std::vector<E::AliasPtr> new_aggregate_expressions;
+        std::vector<E::NamedExpressionPtr> new_grouping_expressions;
+        const std::size_t num_aggrs = aggregate->aggregate_expressions().size();
+
+        for (std::size_t i = 0; i < num_aggrs; ++i) {
+          DCHECK(E::SomeAlias::Matches(new_expressions[i]));
+          new_aggregate_expressions.emplace_back(
+              std::static_pointer_cast<const E::Alias>(new_expressions[i]));
+        }
+        for (std::size_t i = num_aggrs; i < new_expressions.size(); ++i) {
+          DCHECK(E::SomeNamedExpression::Matches(new_expressions[i]));
+          new_grouping_expressions.emplace_back(
+              std::static_pointer_cast<const E::NamedExpression>(new_expressions[i]));
+        }
+        return P::Aggregate::Create(aggregate->input(),
+                                    new_grouping_expressions,
+                                    new_aggregate_expressions,
+                                    aggregate->filter_predicate());
+      }
+      break;
+    }
+    default:
+      break;
+  }
+
+  return node;
+}
+
+std::vector<E::ExpressionPtr> ExtractCommonSubexpression::transformExpressions(
+    const std::vector<E::ExpressionPtr> &expressions) {
+  ScalarCounter counter;
+  ScalarHashable hashable;
+  for (const auto &expr : expressions) {
+    visitAndCount(expr, &counter, &hashable);
+  }
+
+  ScalarMap substitution_map;
+  std::vector<E::ExpressionPtr> new_expressions;
+  for (const auto &expr : expressions) {
+    new_expressions.emplace_back(
+        visitAndTransform(expr, 1, counter, hashable, &substitution_map));
+  }
+  return new_expressions;
+}
+
+E::ExpressionPtr ExtractCommonSubexpression::transformExpression(
+    const E::ExpressionPtr &expression) {
+  return transformExpressions({expression}).front();
+}
+
+bool ExtractCommonSubexpression::visitAndCount(
+    const E::ExpressionPtr &expression,
+    ScalarCounter *counter,
+    ScalarHashable *hashable) {
+  bool children_hashable = true;
+
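+  // Expression types in the whitelist are "transparent": descend into the
+  // children of this node and count each subtree. Other types are barriers.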
+  const auto homogeneous_whitelist_it =
+      homogeneous_whitelist_.find(static_cast<int>(expression->getExpressionType()));
+  if (homogeneous_whitelist_it != homogeneous_whitelist_.end()) {
+    for (const auto &child : expression->children()) {
+      children_hashable &= visitAndCount(child, counter, hashable);
+    }
+  }
+
+  E::ScalarPtr scalar;
+  if (children_hashable &&
+      E::SomeScalar::MatchesWithConditionalCast(expression, &scalar)) {
+    try {
+      ++(*counter)[scalar];
+    } catch (const HashNotSupported &e) {
+      return false;
+    }
+    hashable->emplace(scalar);
+    return true;
+  }
+  return false;
+}
+
+E::ExpressionPtr ExtractCommonSubexpression::visitAndTransform(
+    const E::ExpressionPtr &expression,
+    const std::size_t max_reference_count,
+    const ScalarCounter &counter,
+    const ScalarHashable &hashable,
+    ScalarMap *substitution_map) {
+  // TODO: Decide whether extracting these trivial expressions is beneficial.
+  if (expression->getExpressionType() == E::ExpressionType::kScalarLiteral ||
+      expression->getExpressionType() == E::ExpressionType::kAttributeReference) {
+    return expression;
+  }
+
+  E::ScalarPtr scalar;
+  const bool is_hashable =
+      E::SomeScalar::MatchesWithConditionalCast(expression, &scalar)
+          && hashable.find(scalar) != hashable.end();
+
+  std::size_t new_max_reference_count;
+  if (is_hashable) {
+    // A CommonSubexpression node has already been created for this expression.
+    const auto substitution_map_it = substitution_map->find(scalar);
+    if (substitution_map_it != substitution_map->end()) {
+      return substitution_map_it->second;
+    }
+
+    const auto counter_it = counter.find(scalar);
+    DCHECK(counter_it != counter.end());
+    DCHECK_LE(max_reference_count, counter_it->second);
+    new_max_reference_count = counter_it->second;
+  } else {
+    new_max_reference_count = max_reference_count;
+  }
+
+  std::vector<E::ExpressionPtr> new_children;
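+  // For a non-whitelisted node, restart the extraction independently inside
+  // each child subtree; otherwise continue the current traversal.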
+  const auto homogeneous_whitelist_it =
+      homogeneous_whitelist_.find(static_cast<int>(expression->getExpressionType()));
+  if (homogeneous_whitelist_it == homogeneous_whitelist_.end()) {
+    for (const auto &child : expression->children()) {
+      new_children.emplace_back(transformExpression(child));
+    }
+  } else {
+    for (const auto &child : expression->children()) {
+      new_children.emplace_back(
+          visitAndTransform(child,
+                            new_max_reference_count,
+                            counter,
+                            hashable,
+                            substitution_map));
+    }
+  }
+
+  E::ExpressionPtr output;
+  if (new_children == expression->children()) {
+    output = expression;
+  } else {
+    output = std::static_pointer_cast<const E::Scalar>(
+        expression->copyWithNewChildren(new_children));
+  }
+
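+  // Wrap this node into a CommonSubexpression only if it is referenced more
+  // times than its parent context, i.e. it is genuinely shared.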
+  if (is_hashable && new_max_reference_count > max_reference_count) {
+    DCHECK(E::SomeScalar::Matches(output));
+    const E::CommonSubexpressionPtr common_subexpression =
+        E::CommonSubexpression::Create(
+            optimizer_context_->nextExprId(),
+            std::static_pointer_cast<const E::Scalar>(output));
+    substitution_map->emplace(scalar, common_subexpression);
+    output = common_subexpression;
+  }
+
+  return output;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

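A note on the two-pass structure above: visitAndCount() first tallies
structurally-equal subtrees (keyed through Scalar::hash()/equals()), and
visitAndTransform() then wraps any subtree that is referenced more often than
its enclosing context in a CommonSubexpression node. Below is a minimal,
standalone sketch of the counting pass using a toy expression type and string
keys; every name in it is illustrative and none of it is part of this patch.

    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>
    #include <vector>

    struct Expr {
      std::string op;  // operator name or leaf symbol, e.g. "+", "a", "2"
      std::vector<std::shared_ptr<const Expr>> children;
    };

    // Structural key for a subtree. Quickstep instead keys its maps with the
    // ScalarHash/ScalarEqual functors over Scalar::hash()/Scalar::equals().
    std::string Key(const std::shared_ptr<const Expr> &e) {
      std::string key = "(" + e->op;
      for (const auto &c : e->children) {
        key += " " + Key(c);
      }
      return key + ")";
    }

    // First pass: count occurrences of every subtree, as visitAndCount does.
    void Count(const std::shared_ptr<const Expr> &e,
               std::unordered_map<std::string, int> *counter) {
      ++(*counter)[Key(e)];
      for (const auto &c : e->children) {
        Count(c, counter);
      }
    }

    int main() {
      // (a + b) * 2 and (a + b) * 3 share the subtree (a + b).
      const auto a = std::make_shared<const Expr>(Expr{"a", {}});
      const auto b = std::make_shared<const Expr>(Expr{"b", {}});
      const auto sum = std::make_shared<const Expr>(Expr{"+", {a, b}});
      const auto two = std::make_shared<const Expr>(Expr{"2", {}});
      const auto three = std::make_shared<const Expr>(Expr{"3", {}});
      const auto e1 = std::make_shared<const Expr>(Expr{"*", {sum, two}});
      const auto e2 = std::make_shared<const Expr>(Expr{"*", {sum, three}});

      std::unordered_map<std::string, int> counter;
      Count(e1, &counter);
      Count(e2, &counter);

      // Subtrees with count > 1 are the candidates for CommonSubexpression.
      for (const auto &entry : counter) {
        if (entry.second > 1) {
          std::cout << entry.first << " occurs " << entry.second << " times\n";
        }
      }
      return 0;
    }

Running the sketch reports that (+ (a) (b)) occurs twice, along with its bare
leaves; the leaf cases are exactly why the rule above skips literals and
attribute references rather than wrapping them.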
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/query_optimizer/rules/ExtractCommonSubexpression.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.hpp b/query_optimizer/rules/ExtractCommonSubexpression.hpp
new file mode 100644
index 0000000..0b63b7e
--- /dev/null
+++ b/query_optimizer/rules/ExtractCommonSubexpression.hpp
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+class ExtractCommonSubexpression : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  explicit ExtractCommonSubexpression(OptimizerContext *optimizer_context);
+
+  ~ExtractCommonSubexpression() override {}
+
+  std::string getName() const override {
+    return "ExtractCommonSubexpression";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input);
+
+  struct ScalarHash {
+    inline std::size_t operator()(const expressions::ScalarPtr &scalar) const {
+      return scalar->hash();
+    }
+  };
+
+  struct ScalarEqual {
+    inline bool operator()(const expressions::ScalarPtr &lhs,
+                           const expressions::ScalarPtr &rhs) const {
+      return lhs->equals(rhs);
+    }
+  };
+
+  using ScalarCounter =
+      std::unordered_map<expressions::ScalarPtr, std::size_t, ScalarHash, ScalarEqual>;
+
+  using ScalarMap =
+      std::unordered_map<expressions::ScalarPtr,
+                         expressions::CommonSubexpressionPtr,
+                         ScalarHash,
+                         ScalarEqual>;
+
+  using ScalarHashable = std::unordered_set<expressions::ScalarPtr>;
+
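+  // Extracts common subexpressions among all the given expressions.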
+  std::vector<expressions::ExpressionPtr> transformExpressions(
+      const std::vector<expressions::ExpressionPtr> &expressions);
+
+  expressions::ExpressionPtr transformExpression(
+      const expressions::ExpressionPtr &expression);
+
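+  // First pass: counts the occurrences of each hashable subexpression.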
+  bool visitAndCount(
+      const expressions::ExpressionPtr &expression,
+      ScalarCounter *counter,
+      ScalarHashable *hashable);
+
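+  // Second pass: wraps shared subexpressions into CommonSubexpression nodes.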
+  expressions::ExpressionPtr visitAndTransform(
+      const expressions::ExpressionPtr &expression,
+      const std::size_t max_reference_count,
+      const ScalarCounter &counter,
+      const ScalarHashable &hashable,
+      ScalarMap *substitution_map);
+
+  template <typename ScalarSubclassT>
+  static std::vector<expressions::ExpressionPtr> UpCast(
+      const std::vector<std::shared_ptr<const ScalarSubclassT>> &expressions) {
+    std::vector<expressions::ExpressionPtr> output;
+    for (const auto &expr : expressions) {
+      output.emplace_back(expr);
+    }
+    return output;
+  }
+
+  template <typename ScalarSubclassT>
+  static std::vector<std::shared_ptr<const ScalarSubclassT>> DownCast(
+      const std::vector<expressions::ExpressionPtr> &expressions) {
+    std::vector<std::shared_ptr<const ScalarSubclassT>> output;
+    for (const auto &expr : expressions) {
+      output.emplace_back(std::static_pointer_cast<const ScalarSubclassT>(expr));
+    }
+    return output;
+  }
+
+  OptimizerContext *optimizer_context_;
+  std::unordered_set<int> homogeneous_whitelist_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExtractCommonSubexpression);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_

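The ScalarHash and ScalarEqual functors above make the unordered containers
key on an expression's structure rather than on pointer identity, so two
separately-built but structurally-equal expression trees collide as intended.
The same standard-library pattern with a toy key type (illustrative code, not
from this patch):

    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    struct Point {
      int x;
      int y;
    };

    // Value-based hash, analogous to ScalarHash calling Scalar::hash().
    struct PointHash {
      std::size_t operator()(const Point &p) const {
        return std::hash<int>()(p.x) * 31u + std::hash<int>()(p.y);
      }
    };

    // Value-based equality, analogous to ScalarEqual calling Scalar::equals().
    struct PointEqual {
      bool operator()(const Point &lhs, const Point &rhs) const {
        return lhs.x == rhs.x && lhs.y == rhs.y;
      }
    };

    int main() {
      std::unordered_map<Point, std::string, PointHash, PointEqual> names;
      names[{1, 2}] = "first";
      // A distinct Point object with equal members finds the same entry.
      std::cout << names[{1, 2}] << "\n";  // prints "first"
      return 0;
    }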
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/query_optimizer/rules/ReuseAggregateExpressions.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.cpp b/query_optimizer/rules/ReuseAggregateExpressions.cpp
new file mode 100644
index 0000000..7151fdf
--- /dev/null
+++ b/query_optimizer/rules/ReuseAggregateExpressions.cpp
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/ReuseAggregateExpressions.hpp"
+
+#include <cstddef>
+#include <list>
+#include <map>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/BinaryExpression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+#include "utility/HashError.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+void ReuseAggregateExpressions::init(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+     std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));
+}
+
+P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
+    const P::PhysicalPtr &input) {
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(input, &aggregate)) {
+    return input;
+  }
+
+  const std::vector<E::AliasPtr> &agg_exprs = aggregate->aggregate_expressions();
+
+  std::unordered_map<E::ScalarPtr,
+                     std::map<AggregationID, std::vector<std::size_t>>,
+                     ScalarHash, ScalarEqual> agg_expr_info;
+
+  std::list<std::size_t> count_star_info;
+
+  for (std::size_t i = 0; i < agg_exprs.size(); ++i) {
+    DCHECK(agg_exprs[i]->expression()->getExpressionType()
+               == E::ExpressionType::kAggregateFunction);
+    const E::AggregateFunctionPtr agg_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(
+            agg_exprs[i]->expression());
+
+    // Skip DISTINCT aggregations.
+    if (agg_expr->is_distinct()) {
+      continue;
+    }
+
+    const AggregationID agg_id = agg_expr->getAggregate().getAggregationID();
+    const std::vector<E::ScalarPtr> &arguments = agg_expr->getArguments();
+
+    // Currently we only consider aggregate functions with 0 or 1 argument.
+    if (agg_id == AggregationID::kCount) {
+      if (arguments.empty()) {
+        count_star_info.emplace_front(i);
+        continue;
+      } else if (!arguments.front()->getValueType().isNullable()) {
+        count_star_info.emplace_back(i);
+        continue;
+      }
+    }
+    if (arguments.size() == 1) {
+      try {
+        agg_expr_info[arguments.front()][agg_id].emplace_back(i);
+      } catch (const HashNotSupported &e) {
+        continue;
+      }
+    }
+  }
+
+  std::vector<std::unique_ptr<AggregateReference>> agg_refs(agg_exprs.size());
+
+  constexpr std::size_t kInvalidRef = static_cast<std::size_t>(-1);
+  std::size_t count_star_ref;
+
+  if (count_star_info.empty()) {
+    count_star_ref = kInvalidRef;
+  } else {
+    auto it = count_star_info.begin();
+    count_star_ref = *it;
+
+    for (++it; it != count_star_info.end(); ++it) {
+      agg_refs[*it].reset(new AggregateReference(count_star_ref));
+    }
+  }
+
+  for (const auto &it : agg_expr_info) {
+    const auto &ref_map = it.second;
+
+    // First, check whether AVG can be reduced to SUM/COUNT.
+    bool is_avg_processed = false;
+
+    const auto avg_it = ref_map.find(AggregationID::kAvg);
+    if (avg_it != ref_map.end()) {
+      std::size_t count_ref = kInvalidRef;
+
+      if (it.first->getValueType().isNullable()) {
+        const auto count_it = ref_map.find(AggregationID::kCount);
+        if (count_it != ref_map.end()) {
+          DCHECK(!count_it->second.empty());
+          count_ref = count_it->second.front();
+        }
+      } else {
+        count_ref = count_star_ref;
+      }
+
+      if (count_ref != kInvalidRef) {
+        const auto sum_it = ref_map.find(AggregationID::kSum);
+        const std::size_t sum_ref =
+            sum_it == ref_map.end() ? kInvalidRef : sum_it->second.front();
+
+        for (const std::size_t idx : avg_it->second) {
+          agg_refs[idx].reset(new AggregateReference(sum_ref, count_ref));
+        }
+        is_avg_processed = true;
+      }
+    }
+
+    // Then, eliminate duplicate aggregate expressions.
+    for (const auto &ref_it : ref_map) {
+      if (ref_it.first == AggregationID::kAvg && is_avg_processed) {
+        continue;
+      }
+
+      const auto &indices = ref_it.second;
+      DCHECK(!indices.empty());
+      const std::size_t ref = indices.front();
+      for (std::size_t i = 1; i < indices.size(); ++i) {
+        agg_refs[indices[i]].reset(new AggregateReference(ref));
+      }
+    }
+  }
+
+  bool need_transform = false;
+  for (const auto &agg_ref : agg_refs) {
+    if (agg_ref != nullptr) {
+      need_transform = true;
+      break;
+    }
+  }
+
+  if (!need_transform) {
+    return input;
+  }
+
+  const std::vector<E::AttributeReferencePtr> agg_attrs = E::ToRefVector(agg_exprs);
+
+  std::vector<E::AliasPtr> new_agg_exprs;
+  std::vector<E::NamedExpressionPtr> new_select_exprs;
+
+  for (const auto &grouping_expr : aggregate->grouping_expressions()) {
+    new_select_exprs.emplace_back(E::ToRef(grouping_expr));
+  }
+
+  for (std::size_t i = 0; i < agg_refs.size(); ++i) {
+    const auto &agg_ref = agg_refs[i];
+    const E::AliasPtr &agg_expr = agg_exprs[i];
+
+    if (agg_ref == nullptr) {
+      new_agg_exprs.emplace_back(agg_expr);
+      new_select_exprs.emplace_back(
+          E::AttributeReference::Create(agg_expr->id(),
+                                        agg_expr->attribute_name(),
+                                        agg_expr->attribute_alias(),
+                                        agg_expr->relation_name(),
+                                        agg_expr->getValueType(),
+                                        E::AttributeReferenceScope::kLocal));
+    } else {
+      switch (agg_ref->kind) {
+        case AggregateReference::kDirect: {
+          new_select_exprs.emplace_back(
+              E::Alias::Create(agg_expr->id(),
+                               agg_attrs[agg_ref->first_ref],
+                               agg_expr->attribute_name(),
+                               agg_expr->attribute_alias()));
+          break;
+        }
+        case AggregateReference::kAvg: {
+          E::AttributeReferencePtr sum_attr;
+          if (agg_ref->first_ref == kInvalidRef) {
+            const E::AggregateFunctionPtr avg_expr =
+                std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+            const AggregateFunction &sum_func =
+                AggregateFunctionFactory::Get(AggregationID::kSum);
+            const E::AggregateFunctionPtr sum_expr =
+                E::AggregateFunction::Create(sum_func,
+                                             avg_expr->getArguments(),
+                                             avg_expr->is_vector_aggregate(),
+                                             avg_expr->is_distinct());
+            new_agg_exprs.emplace_back(
+                E::Alias::Create(optimizer_context_->nextExprId(),
+                                 sum_expr,
+                                 agg_expr->attribute_name(),
+                                 agg_expr->attribute_alias()));
+
+            sum_attr = E::ToRef(new_agg_exprs.back());
+          } else {
+            sum_attr = agg_attrs[agg_ref->first_ref];
+          }
+
+          const BinaryOperation &divide_op =
+              BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+          const E::BinaryExpressionPtr avg_expr =
+              E::BinaryExpression::Create(divide_op,
+                                          sum_attr,
+                                          agg_attrs[agg_ref->second_ref]);
+          new_select_exprs.emplace_back(
+              E::Alias::Create(agg_expr->id(),
+                               avg_expr,
+                               agg_expr->attribute_name(),
+                               agg_expr->attribute_alias()));
+          break;
+        }
+      }
+    }
+  }
+
+  const P::AggregatePtr new_aggregate =
+      P::Aggregate::Create(aggregate->input(),
+                           aggregate->grouping_expressions(),
+                           new_agg_exprs,
+                           aggregate->filter_predicate());
+
+  return P::Selection::Create(new_aggregate, new_select_exprs, nullptr);
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

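To make the AVG handling above concrete: whenever a usable SUM/COUNT pair
exists (or a SUM can be added), AVG(x) is not aggregated at all; the division
is reconstructed in the post-aggregation Selection. A toy numeric sketch of
that equivalence (plain C++, not the optimizer API):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main() {
      const std::vector<double> x = {1.0, 2.0, 3.0, 4.0};

      // The underlying aggregates, each computed exactly once.
      double sum = 0.0;
      std::size_t count = 0;
      for (const double v : x) {
        sum += v;
        ++count;
      }

      // AVG is rebuilt afterwards, the way the rule emits a
      // BinaryExpression(kDivide, sum_attr, count_attr) in the Selection.
      const double avg = sum / static_cast<double>(count);
      std::cout << "SUM=" << sum << " COUNT=" << count << " AVG=" << avg << "\n";
      return 0;
    }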
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/query_optimizer/rules/ReuseAggregateExpressions.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.hpp b/query_optimizer/rules/ReuseAggregateExpressions.hpp
new file mode 100644
index 0000000..5b78e46
--- /dev/null
+++ b/query_optimizer/rules/ReuseAggregateExpressions.hpp
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> {
+ public:
+  explicit ReuseAggregateExpressions(OptimizerContext *optimizer_context)
+      : optimizer_context_(optimizer_context) {}
+
+  std::string getName() const override {
+    return "ReuseAggregateExpressions";
+  }
+
+ protected:
+  void init(const physical::PhysicalPtr &input) override;
+
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  struct ScalarHash {
+    inline std::size_t operator()(const expressions::ScalarPtr &scalar) const {
+      return scalar->hash();
+    }
+  };
+
+  struct ScalarEqual {
+    inline bool operator()(const expressions::ScalarPtr &lhs,
+                           const expressions::ScalarPtr &rhs) const {
+      return lhs->equals(rhs);
+    }
+  };
+
+  struct AggregateReference {
+    enum Kind {
+      kDirect = 0,
+      kAvg
+    };
+
+    explicit AggregateReference(const std::size_t ref)
+        : kind(kDirect), first_ref(ref), second_ref(0) {}
+
+    AggregateReference(const std::size_t sum_ref, const std::size_t count_ref)
+        : kind(kAvg), first_ref(sum_ref), second_ref(count_ref) {}
+
+    const Kind kind;
+    const std::size_t first_ref;
+    const std::size_t second_ref;
+  };
+
+  OptimizerContext *optimizer_context_;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(ReuseAggregateExpressions);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_

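The AggregateReference records above act as a small tagged union: a kDirect
entry redirects a duplicate aggregate to the index of an equivalent one, and a
kAvg entry points at a SUM/COUNT pair. A toy interpretation loop under those
assumptions (the indices and type name here are illustrative only):

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct ToyAggregateReference {
      enum Kind { kDirect = 0, kAvg };
      explicit ToyAggregateReference(const std::size_t ref)
          : kind(kDirect), first_ref(ref), second_ref(0) {}
      ToyAggregateReference(const std::size_t sum_ref, const std::size_t count_ref)
          : kind(kAvg), first_ref(sum_ref), second_ref(count_ref) {}
      const Kind kind;
      const std::size_t first_ref;
      const std::size_t second_ref;
    };

    int main() {
      // Suppose the aggregates are: #0 SUM(x), #1 COUNT(x), #2 SUM(x), #3 AVG(x).
      std::vector<std::unique_ptr<ToyAggregateReference>> agg_refs(4);
      agg_refs[2] = std::make_unique<ToyAggregateReference>(0);     // duplicate of #0
      agg_refs[3] = std::make_unique<ToyAggregateReference>(0, 1);  // SUM #0 / COUNT #1

      for (std::size_t i = 0; i < agg_refs.size(); ++i) {
        if (agg_refs[i] == nullptr) {
          std::cout << "#" << i << ": keep as a real aggregate\n";
        } else if (agg_refs[i]->kind == ToyAggregateReference::kDirect) {
          std::cout << "#" << i << ": reuse aggregate #" << agg_refs[i]->first_ref << "\n";
        } else {
          std::cout << "#" << i << ": compute as #" << agg_refs[i]->first_ref
                    << " / #" << agg_refs[i]->second_ref << "\n";
        }
      }
      return 0;
    }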
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 4ea809b..b93ec49 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -262,6 +262,7 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
@@ -318,6 +319,7 @@ target_link_libraries(quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 0e75411..cd376c1 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -32,6 +32,7 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -532,6 +533,7 @@ void HashInnerJoinWorkOrder::executeWithoutCopyElision(ValueAccessor *probe_acce
     }
 
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (auto selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
@@ -539,8 +541,10 @@ void HashInnerJoinWorkOrder::executeWithoutCopyElision(ValueAccessor *probe_acce
                                                                   build_accessor.get(),
                                                                   probe_relation_id,
                                                                   probe_accessor,
-                                                                  build_block_entry.second));
+                                                                  build_block_entry.second,
+                                                                  scalar_cache.get()));
     }
+    scalar_cache.reset();
 
     output_destination_->bulkInsertTuples(&temp_result);
   }
@@ -649,12 +653,14 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
         zipped_joined_tuple_ids.emplace_back(build_tids[i], probe_tids[i]);
       }
 
+      ScalarCache scalar_cache;
       for (const Scalar *scalar : non_trivial_expressions) {
         temp_result.addColumn(scalar->getAllValuesForJoin(build_relation_id,
                                                           build_accessor.get(),
                                                           probe_relation_id,
                                                           probe_accessor,
-                                                          zipped_joined_tuple_ids));
+                                                          zipped_joined_tuple_ids,
+                                                          &scalar_cache));
       }
     }
 
@@ -765,13 +771,16 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_store.createValueAccessor(&filter));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -828,12 +837,15 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -886,12 +898,15 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -976,14 +991,18 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
     temp_result.addColumn(
         (*selection_it)->getAllValues(probe_accessor_with_filter.get(),
-                                      &sub_blocks_ref));
+                                      &sub_blocks_ref,
+                                      scalar_cache.get()));
   }
+  scalar_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -1032,12 +1051,11 @@ void HashOuterJoinWorkOrder::execute() {
            &build_block_entry : *collector.getJoinedTupleMap()) {
     const BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
-    const TupleStorageSubBlock &build_store =
-        build_block->getTupleStorageSubBlock();
+    const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
 
-    std::unique_ptr<ValueAccessor> build_accessor(
-        build_store.createValueAccessor());
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (auto selection_it = selection_.begin();
          selection_it != selection_.end();
          ++selection_it) {
@@ -1047,8 +1065,11 @@ void HashOuterJoinWorkOrder::execute() {
               build_accessor.get(),
               probe_relation_id,
               probe_accessor.get(),
-              build_block_entry.second));
+              build_block_entry.second,
+              scalar_cache.get()));
     }
+    scalar_cache.reset();
+
     output_destination_->bulkInsertTuples(&temp_result);
   }
 
@@ -1061,8 +1082,9 @@ void HashOuterJoinWorkOrder::execute() {
   if (num_tuples_without_matches > 0) {
     std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
         probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
-    ColumnVectorsValueAccessor temp_result;
 
+    ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (std::size_t i = 0; i < selection_.size(); ++i) {
       if (is_selection_on_build_[i]) {
         // NOTE(harshad, jianqiao): The assumption here is that any operation
@@ -1090,9 +1112,12 @@ void HashOuterJoinWorkOrder::execute() {
       } else {
         temp_result.addColumn(
             selection_[i]->getAllValues(probe_accessor_with_filter.get(),
-                                        &sub_blocks_ref));
+                                        &sub_blocks_ref,
+                                        scalar_cache.get()));
       }
     }
+    scalar_cache.reset();
+
     output_destination_->bulkInsertTuples(&temp_result);
   }
 }

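ScalarCache itself is introduced elsewhere in this patch series, so only its
call sites appear above. Judging from those call sites, it memoizes the column
computed for a shared subexpression within one batch, so that getAllValues()
and getAllValuesForJoin() evaluate each CommonSubexpression once per batch. A
hypothetical sketch of that idea (the real interface may differ):

    #include <cstddef>
    #include <iostream>
    #include <unordered_map>
    #include <vector>

    using ColumnVector = std::vector<double>;  // stand-in for Quickstep's ColumnVector

    // Hypothetical per-work-order cache keyed by CommonSubexpression id.
    class ToyScalarCache {
     public:
      bool contains(const std::size_t expr_id) const {
        return cache_.find(expr_id) != cache_.end();
      }
      const ColumnVector& get(const std::size_t expr_id) const {
        return cache_.at(expr_id);
      }
      void put(const std::size_t expr_id, ColumnVector values) {
        cache_.emplace(expr_id, std::move(values));
      }
     private:
      std::unordered_map<std::size_t, ColumnVector> cache_;
    };

    ColumnVector EvaluateSharedExpr(const std::size_t expr_id,
                                    ToyScalarCache *cache) {
      if (cache->contains(expr_id)) {
        return cache->get(expr_id);  // reuse the previously computed column
      }
      ColumnVector result = {1.0, 2.0, 3.0};  // pretend this is expensive
      cache->put(expr_id, result);
      return result;
    }

    int main() {
      ToyScalarCache cache;
      EvaluateSharedExpr(42, &cache);  // computes and caches
      EvaluateSharedExpr(42, &cache);  // served from the cache
      std::cout << "cached: " << cache.contains(42) << "\n";
      return 0;
    }

Resetting the cache before bulkInsertTuples(), as the hunks above do, releases
the cached columns once they are no longer needed.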
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/relational_operators/NestedLoopsJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index f17402f..a6bacc7 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -27,6 +27,7 @@
 #include "catalog/CatalogRelationSchema.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -417,6 +418,7 @@ void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_st
     // evaluation and data movement, but low enough that temporary memory
     // requirements don't get out of hand).
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
@@ -424,8 +426,10 @@ void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_st
                                                                   left_accessor.get(),
                                                                   right_input_relation_id,
                                                                   right_accessor.get(),
-                                                                  joined_tuple_ids));
+                                                                  joined_tuple_ids,
+                                                                  scalar_cache.get()));
     }
+    scalar_cache.reset();
 
     output_destination_->bulkInsertTuples(&temp_result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 90543c4..6337b5e 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -38,6 +38,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/CollisionFreeVectorTable.hpp"
 #include "storage/HashTableFactory.hpp"
@@ -48,6 +49,7 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/SubBlocksReference.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
@@ -93,11 +95,15 @@ AggregationOperationState::AggregationOperationState(
                                     !is_distinct_.empty(), std::logical_and<bool>())),
       storage_manager_(storage_manager) {
   if (!group_by.empty()) {
-    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
-      is_aggregate_collision_free_ = true;
-    } else {
-      is_aggregate_partitioned_ = checkAggregatePartitioned(
-          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+    switch (hash_table_impl_type) {
+      case HashTableImplType::kCollisionFreeVector:
+        is_aggregate_collision_free_ = true;
+        break;
+      case HashTableImplType::kThreadPrivateCompactKey:
+        break;
+      default:
+        is_aggregate_partitioned_ = checkAggregatePartitioned(
+            estimated_num_entries, is_distinct_, group_by, aggregate_functions);
     }
   }
 
@@ -491,9 +497,10 @@ void AggregationOperationState::aggregateBlock(const block_id input_block,
     SubBlocksReference sub_blocks_ref(tuple_store,
                                       block->getIndices(),
                                       block->getIndicesConsistent());
+    ScalarCache scalar_cache;
     for (const auto &expression : non_trivial_expressions_) {
       non_trivial_results->addColumn(
-          expression->getAllValues(accessor, &sub_blocks_ref));
+          expression->getAllValues(accessor, &sub_blocks_ref, &scalar_cache));
     }
   }
 
@@ -713,7 +720,17 @@ void AggregationOperationState::finalizeHashTable(
     finalizeHashTableImplPartitioned(partition_id, output_destination);
   } else {
     DCHECK_EQ(0u, partition_id);
-    finalizeHashTableImplThreadPrivate(output_destination);
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    switch (group_by_hashtable_pool_->getHashTableImplType()) {
+      case HashTableImplType::kSeparateChaining:
+        finalizeHashTableImplThreadPrivatePackedPayload(output_destination);
+        break;
+      case HashTableImplType::kThreadPrivateCompactKey:
+        finalizeHashTableImplThreadPrivateCompactKey(output_destination);
+        break;
+      default:
+        LOG(FATAL) << "Not supported";
+    }
   }
 }
 
@@ -838,7 +855,7 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload(
     InsertDestination *output_destination) {
   // TODO(harshad) - The merge phase may be slower when each hash table contains
   // large number of entries. We should find ways in which we can perform a
@@ -946,6 +963,32 @@
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+void AggregationOperationState::finalizeHashTableImplThreadPrivateCompactKey(
+    InsertDestination *output_destination) {
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
+  }
+
+  std::unique_ptr<ThreadPrivateCompactKeyHashTable> final_hash_table(
+      static_cast<ThreadPrivateCompactKeyHashTable*>(hash_tables->back().release()));
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    final_hash_table->merge(
+        static_cast<const ThreadPrivateCompactKeyHashTable*>(hash_table.get()));
+    hash_table->destroyPayload();
+  }
+
+  ColumnVectorsValueAccessor complete_result;
+  final_hash_table->finalize(&complete_result);
+  final_hash_table->destroyPayload();
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
   std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
   memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);

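The new finalizeHashTableImplThreadPrivateCompactKey() above follows the usual
thread-private aggregation pattern: each worker fills its own table, and
finalization folds every table into the last one via merge() before a single
finalize() pass produces the result. A toy version of the merge step
(illustrative types, not the ThreadPrivateCompactKeyHashTable API):

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <unordered_map>
    #include <vector>

    // Stand-in for a thread-private table holding one SUM state per group key.
    using ToyTable = std::unordered_map<std::int64_t, double>;

    int main() {
      std::vector<ToyTable> thread_private = {
          {{1, 10.0}, {2, 5.0}},  // worker 0
          {{1, 7.0}, {3, 2.0}},   // worker 1
      };

      // Fold every table into the last one, mirroring the loop above that
      // calls final_hash_table->merge(...) on each thread-private table.
      ToyTable &final_table = thread_private.back();
      for (std::size_t i = 0; i + 1 < thread_private.size(); ++i) {
        for (const auto &entry : thread_private[i]) {
          final_table[entry.first] += entry.second;  // merge matching states
        }
      }

      for (const auto &entry : final_table) {
        std::cout << "group " << entry.first << " -> SUM " << entry.second << "\n";
      }
      return 0;
    }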
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e6af494..207c4f0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -256,7 +256,11 @@ class AggregationOperationState {
   void finalizeHashTableImplPartitioned(const std::size_t partition_id,
                                         InsertDestination *output_destination);
 
-  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
+  void finalizeHashTableImplThreadPrivatePackedPayload(
+      InsertDestination *output_destination);
+
+  void finalizeHashTableImplThreadPrivateCompactKey(
+      InsertDestination *output_destination);
 
   std::size_t getMemoryConsumptionBytesHelper(
       const std::vector<std::unique_ptr<AggregationStateHashTableBase>>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index cb1f098..0f610eb 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -250,6 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h
 add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp)
 add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp)
 add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp)
+add_library(quickstep_storage_ThreadPrivateCompactKeyHashTable
+            ThreadPrivateCompactKeyHashTable.cpp
+            ThreadPrivateCompactKeyHashTable.hpp)
 add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp)
 add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
@@ -276,6 +279,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_storage_AggregationOperationState_proto
                       quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_HashTableBase
@@ -288,6 +292,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_storage_SubBlocksReference
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
@@ -723,6 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
                       quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -936,6 +942,7 @@ target_link_libraries(quickstep_storage_StorageBlock
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
@@ -1037,6 +1044,20 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry
 target_link_libraries(quickstep_storage_SubBlocksReference
                       glog
                       quickstep_utility_PtrVector)
+target_link_libraries(quickstep_storage_ThreadPrivateCompactKeyHashTable
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 target_link_libraries(quickstep_storage_TupleIdSequence
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_BitVector
@@ -1086,6 +1107,7 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
+                      quickstep_expressions_scalar_ScalarCache
                       quickstep_expressions_windowaggregation_WindowAggregateFunction
                       quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
                       quickstep_expressions_windowaggregation_WindowAggregationHandle
@@ -1161,6 +1183,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_storage_SubBlockTypeRegistryMacros
                       quickstep_storage_SubBlocksReference
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 490a5cc..221a221 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -70,6 +70,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
 
   ~CollisionFreeVectorTable() override;
 
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kCollisionFreeVector;
+  }
+
   void destroyPayload() override;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 6839ebc..ed383df 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -26,6 +26,7 @@ enum HashTableImplType {
   LINEAR_OPEN_ADDRESSING = 1;
   SEPARATE_CHAINING = 2;
   SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
+  THREAD_PRIVATE_COMPACT_KEY = 4;
 }
 
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 8be388a..6982a3f 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -44,7 +44,8 @@ enum class HashTableImplType {
   kCollisionFreeVector,
   kLinearOpenAddressing,
   kSeparateChaining,
-  kSimpleScalarSeparateChaining
+  kSimpleScalarSeparateChaining,
+  kThreadPrivateCompactKey
 };
 
 /**
@@ -117,6 +118,8 @@ class AggregationStateHashTableBase {
 
   virtual std::size_t getMemoryConsumptionBytes() const = 0;
 
+  virtual HashTableImplType getImplType() const = 0;
+
  protected:
   AggregationStateHashTableBase() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 9686429..cb1f16f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -32,6 +32,7 @@
 #include "storage/PackedPayloadHashTable.hpp"
 #include "storage/SeparateChainingHashTable.hpp"
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
 #include "utility/BloomFilter.hpp"
@@ -123,6 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
       return HashTableImplType::kSeparateChaining;
     case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING:
       return HashTableImplType::kSimpleScalarSeparateChaining;
+    case serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY:
+      return HashTableImplType::kThreadPrivateCompactKey;
     default: {
       LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n";
     }
@@ -355,7 +358,6 @@ class AggregationStateHashTableFactory {
    *        hash table constructor.
    * @return A new aggregation state hash table.
    **/
-
   static AggregationStateHashTableBase* CreateResizable(
       const HashTableImplType hash_table_type,
       const std::vector<const Type*> &key_types,
@@ -363,13 +365,16 @@ class AggregationStateHashTableFactory {
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager) {
     switch (hash_table_type) {
-      case HashTableImplType::kSeparateChaining:
-        return new PackedPayloadHashTable(
-            key_types, num_entries, handles, storage_manager);
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
             key_types.front(), num_entries, handles, storage_manager);
+      case HashTableImplType::kSeparateChaining:
+        return new PackedPayloadHashTable(
+            key_types, num_entries, handles, storage_manager);
+      case HashTableImplType::kThreadPrivateCompactKey:
+        return new ThreadPrivateCompactKeyHashTable(
+            key_types, num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in "
                    << "AggregationStateHashTableFactory::createResizable()";

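CreateResizable() now routes kThreadPrivateCompactKey to the new table, and the reordered kCollisionFreeVector branch keeps its DCHECK restricting that implementation to single-column keys. A simplified, self-contained mirror of the factory switch (constructor arguments elided; these stand-in types are illustrative, not the real classes):

    #include <cassert>
    #include <cstddef>
    #include <memory>

    enum class HashTableImplType {
      kCollisionFreeVector,
      kSeparateChaining,
      kThreadPrivateCompactKey
    };

    struct AggregationStateHashTableBase {
      virtual ~AggregationStateHashTableBase() {}
    };
    struct CollisionFreeVectorTable : AggregationStateHashTableBase {};
    struct PackedPayloadHashTable : AggregationStateHashTableBase {};
    struct ThreadPrivateCompactKeyHashTable : AggregationStateHashTableBase {};

    // Mirrors the factory dispatch: one switch, one concrete class per enum
    // value.  num_key_columns stands in for key_types.size().
    std::unique_ptr<AggregationStateHashTableBase> CreateResizable(
        const HashTableImplType impl_type, const std::size_t num_key_columns) {
      switch (impl_type) {
        case HashTableImplType::kCollisionFreeVector:
          assert(num_key_columns == 1u);  // Stands in for the DCHECK_EQ.
          return std::make_unique<CollisionFreeVectorTable>();
        case HashTableImplType::kSeparateChaining:
          return std::make_unique<PackedPayloadHashTable>();
        case HashTableImplType::kThreadPrivateCompactKey:
          return std::make_unique<ThreadPrivateCompactKeyHashTable>();
      }
      return nullptr;  // Unreachable; the real factory LOG(FATAL)s here.
    }

    int main() {
      auto table =
          CreateResizable(HashTableImplType::kThreadPrivateCompactKey, 2u);
      return table != nullptr ? 0 : 1;
    }
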
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 6dbd7f9..7257906 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -75,6 +75,10 @@ class HashTablePool {
         handles_(handles),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
+  HashTableImplType getHashTableImplType() const {
+    return hash_table_impl_type_;
+  }
+
   /**
    * @brief Check out a hash table for insertion.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 960d5a7..3e89aab 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -88,6 +88,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
 
   ~PackedPayloadHashTable() override;
 
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kSeparateChaining;
+  }
+
   /**
    * @brief Erase all entries in this hash table.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index e91c1ac..d724317 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -30,6 +30,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "storage/BasicColumnStoreTupleStorageSubBlock.hpp"
 #include "storage/BloomFilterIndexSubBlock.hpp"
 #include "storage/CSBTreeIndexSubBlock.hpp"
@@ -369,15 +370,18 @@ void StorageBlock::select(const vector<unique_ptr<const Scalar>> &selection,
                                       indices_,
                                       indices_consistent_);
 
-    std::unique_ptr<ValueAccessor> accessor(
-        tuple_store_->createValueAccessor(filter));
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
+    ScalarCache scalar_cache;
 
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
          selection_cit != selection.end();
          ++selection_cit) {
       // TODO(chasseur): Can probably elide some copies for parts of the
       // selection that are ScalarAttribute or ScalarLiteral.
-      temp_result.addColumn((*selection_cit)->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result.addColumn(
+          (*selection_cit)->getAllValues(accessor.get(),
+                                         &sub_blocks_ref,
+                                         &scalar_cache));
     }
   }
 

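getAllValues() now threads a per-block ScalarCache through the selection, so a common subexpression shared by several output columns is evaluated only once per block (this is also why column ownership becomes shared further below). ScalarCache's interface is not shown in this hunk; a minimal memoization sketch of the idea, with hypothetical names:

    #include <iostream>
    #include <memory>
    #include <unordered_map>
    #include <vector>

    // Hypothetical stand-in for a materialized column of computed values.
    using ColumnValues = std::vector<double>;
    using ColumnValuesPtr = std::shared_ptr<const ColumnValues>;

    // Minimal memoization sketch: results are keyed by an id assigned to each
    // common subexpression and live only as long as the cache object.
    class ScalarCacheSketch {
     public:
      bool contains(const int expr_id) const {
        return cache_.find(expr_id) != cache_.end();
      }
      ColumnValuesPtr get(const int expr_id) const {
        return cache_.at(expr_id);
      }
      void put(const int expr_id, ColumnValuesPtr values) {
        cache_.emplace(expr_id, std::move(values));
      }
     private:
      std::unordered_map<int, ColumnValuesPtr> cache_;
    };

    // An expensive expression is evaluated once; later lookups reuse it.
    ColumnValuesPtr evaluateExpensive() {
      std::cout << "evaluated\n";  // Printed once despite two lookups below.
      return std::make_shared<ColumnValues>(ColumnValues{1.0, 2.0, 3.0});
    }

    int main() {
      ScalarCacheSketch cache;
      const int expr_id = 42;
      for (int i = 0; i < 2; ++i) {
        if (!cache.contains(expr_id)) {
          cache.put(expr_id, evaluateExpensive());
        }
        ColumnValuesPtr values = cache.get(expr_id);  // Shared, not copied.
      }
      return 0;
    }
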
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/ThreadPrivateCompactKeyHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.cpp b/storage/ThreadPrivateCompactKeyHashTable.cpp
new file mode 100644
index 0000000..226188b
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.cpp
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
+
+#include <cstddef>
+#include <cstdint>
+
+#include "expressions/aggregation/AggregationID.hpp"
+#include "types/TypeID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// All member functions of ThreadPrivateCompactKeyHashTable are currently
+// defined inline in ThreadPrivateCompactKeyHashTable.hpp, so this
+// translation unit contains no definitions.
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/ThreadPrivateCompactKeyHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.hpp b/storage/ThreadPrivateCompactKeyHashTable.hpp
new file mode 100644
index 0000000..a22fdab
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.hpp
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Aggregation hash table for thread-private (non-concurrent) use
+ *        that packs a fixed-size composite key into a single 64-bit code
+ *        per entry.
+ **/
+class ThreadPrivateCompactKeyHashTable : public AggregationStateHashTableBase {
+ public:
+  ThreadPrivateCompactKeyHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager)
+      : key_types_(key_types),
+        handles_(handles),
+        bucket_size_(0),
+        num_buckets_(num_entries),
+        buckets_allocated_(0) {
+    for (const Type *key_type : key_types) {
+      DCHECK(!key_type->isVariableLength());
+
+      const std::size_t key_size = key_type->maximumByteLength();
+      DCHECK(key_size == 1u || key_size == 2u || key_size == 4u || key_size == 8u);
+
+      key_sizes_.emplace_back(key_size);
+    }
+
+    for (const AggregationHandle *handle : handles) {
+      state_offsets_.emplace_back(bucket_size_);
+
+      const std::vector<const Type*> arg_types = handle->getArgumentTypes();
+      DCHECK_LE(arg_types.size(), 1u);
+
+      std::size_t state_size = 0;
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          state_size = sizeof(std::int64_t);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, arg_types.size());
+          switch (arg_types.front()->getTypeID()) {
+            case TypeID::kInt:  // Fall through
+            case TypeID::kLong:
+              state_size = sizeof(std::int64_t);
+              break;
+            case TypeID::kFloat:  // Fall through
+            case TypeID::kDouble:
+              state_size = sizeof(double);
+              break;
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      bucket_size_ += state_size;
+    }
+
+    keys_.reset(sizeof(std::uint64_t) * num_buckets_);
+    buckets_.reset(bucket_size_ * num_buckets_);
+  }
+
+  ~ThreadPrivateCompactKeyHashTable() override {}
+
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kThreadPrivateCompactKey;
+  }
+
+  void destroyPayload() override {}
+
+  std::size_t getMemoryConsumptionBytes() const override {
+    return num_buckets_ * (bucket_size_ + sizeof(std::uint64_t));
+  }
+
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override {
+    ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+    ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+
+    DCHECK(base_accessor != nullptr);
+    const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
+
+    ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples);
+    std::uint64_t *key_codes = static_cast<std::uint64_t*>(buffer.get());
+    std::size_t key_code_offset = 0;
+    for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
+      const auto &key_attr_id = key_attr_ids[i];
+      ValueAccessor *accessor =
+          key_attr_id.source == ValueAccessorSource::kBase
+              ? base_accessor
+              : derived_accessor;
+      DCHECK(accessor != nullptr);
+
+      const std::size_t key_size = key_sizes_[i];
+      switch (key_size) {
+        case 1u:
+          ConstructKeyCode<std::uint8_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 2u:
+          ConstructKeyCode<std::uint16_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 4u:
+          ConstructKeyCode<std::uint32_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 8u:
+          ConstructKeyCode<std::uint64_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+
+      key_code_offset += key_size;
+    }
+
+    std::vector<BucketIndex> bucket_indices;
+    bucket_indices.reserve(num_tuples);
+    std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get());
+    for (std::size_t i = 0; i < num_tuples; ++i) {
+      const std::uint64_t code = key_codes[i];
+      const auto index_it = index_.find(code);
+      if (index_it == index_.end()) {
+        // TODO: Resize if overflow
+        index_.emplace(code, buckets_allocated_);
+        bucket_indices.emplace_back(buckets_allocated_);
+        keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Dispatch
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          upsertValueAccessorCount(bucket_indices, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, argument_ids[i].size());
+          const auto &argument_id = argument_ids[i].front();
+          ValueAccessor *accessor =
+              argument_id.source == ValueAccessorSource::kBase
+                  ? base_accessor
+                  : derived_accessor;
+          DCHECK(accessor != nullptr);
+
+          DCHECK_EQ(1u, handle->getArgumentTypes().size());
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt: {
+              upsertValueAccessorSum<int, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kLong: {
+              upsertValueAccessorSum<std::int64_t, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kFloat: {
+              upsertValueAccessorSum<float, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kDouble: {
+              upsertValueAccessorSum<double, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+
+    return true;
+  }
+
+  void merge(const ThreadPrivateCompactKeyHashTable *other) {
+    std::vector<BucketIndex> dst_bucket_indices;
+    std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get());
+
+    const char *src_buckets_start =
+        static_cast<const char*>(other->buckets_.get());
+    const std::uint64_t *src_keys =
+        static_cast<const std::uint64_t*>(other->keys_.get());
+
+    for (std::size_t i = 0; i < other->buckets_allocated_; ++i) {
+      const std::uint64_t code = src_keys[i];
+      const auto index_it = index_.find(code);
+
+      if (index_it == index_.end()) {
+        // TODO: Resize if overflow
+        index_.emplace(code, buckets_allocated_);
+        dst_bucket_indices.emplace_back(buckets_allocated_);
+        dst_keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        dst_bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Dispatch
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          mergeStateSum<std::int64_t>(
+              dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt:  // Fall through
+            case kLong: {
+              mergeStateSum<std::int64_t>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            case kFloat:  // Fall through
+            case kDouble: {
+              mergeStateSum<double>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+  }
+
+  // Debug-only helper; assumes a single double-sized state per bucket.
+  void print() const {
+    std::cout << "num_entries = " << buckets_allocated_ << "\n";
+    const double *values = static_cast<const double*>(buckets_.get());
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      std::cout << values[i] << "\n";
+    }
+  }
+
+  void finalize(ColumnVectorsValueAccessor *output) const {
+    std::size_t key_offset = 0;
+    for (std::size_t i = 0; i < key_types_.size(); ++i) {
+      const Type &key_type = *key_types_[i];
+      std::unique_ptr<NativeColumnVector> native_cv(
+          new NativeColumnVector(key_type, buckets_allocated_));
+
+      const std::size_t key_size = key_sizes_[i];
+      switch (key_size) {
+        case 1u:
+          finalizeKey<std::uint8_t>(key_offset, native_cv.get());
+          break;
+        case 2u:
+          finalizeKey<std::uint16_t>(key_offset, native_cv.get());
+          break;
+        case 4u:
+          finalizeKey<std::uint32_t>(key_offset, native_cv.get());
+          break;
+        case 8u:
+          finalizeKey<std::uint64_t>(key_offset, native_cv.get());
+          break;
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      output->addColumn(native_cv.release());
+
+      key_offset += key_size;
+    }
+
+    // Dispatch
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      const Type &result_type = *handle->getResultType();
+      std::unique_ptr<NativeColumnVector> native_cv(
+          new NativeColumnVector(result_type, buckets_allocated_));
+
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          finalizeStateSum<std::int64_t, std::int64_t>(
+              state_offsets_[i], native_cv.get());
+          break;
+        }
+        case AggregationID::kSum: {
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt:  // Fall through
+            case kLong: {
+              finalizeStateSum<std::int64_t, std::int64_t>(
+                  state_offsets_[i], native_cv.get());
+              break;
+            }
+            case kFloat:  // Fall through
+            case kDouble: {
+              finalizeStateSum<double, double>(
+                  state_offsets_[i], native_cv.get());
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      output->addColumn(native_cv.release());
+    }
+  }
+
+ private:
+  using BucketIndex = std::uint32_t;
+
+  template <typename KeyT>
+  inline static void ConstructKeyCode(const std::size_t offset,
+                                      const attribute_id attr_id,
+                                      ValueAccessor *accessor,
+                                      void *key_code_start) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      char *key_code_ptr = static_cast<char*>(key_code_start) + offset;
+      accessor->beginIteration();
+      while (accessor->next()) {
+        *reinterpret_cast<KeyT*>(key_code_ptr) =
+            *static_cast<const KeyT*>(
+                accessor->template getUntypedValue<false>(attr_id));
+        key_code_ptr += sizeof(std::uint64_t);
+      }
+    });
+  }
+
+  inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices,
+                                       const std::size_t state_offset) {
+    char *state_start = static_cast<char*>(buckets_.get()) + state_offset;
+    for (const BucketIndex idx : bucket_indices) {
+      char *state_ptr = state_start + bucket_size_ * idx;
+      *reinterpret_cast<std::int64_t*>(state_ptr) += 1;
+    }
+  }
+
+  template <typename ArgumentT, typename StateT>
+  inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices,
+                                     const std::size_t state_offset,
+                                     const attribute_id attr_id,
+                                     ValueAccessor *accessor) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      accessor->beginIteration();
+
+      char *state_start = static_cast<char*>(buckets_.get()) + state_offset;
+      std::size_t loc = 0;
+      while (accessor->next()) {
+        char *state_ptr = state_start + bucket_size_ * bucket_indices[loc];
+        *reinterpret_cast<StateT*>(state_ptr) +=
+            *static_cast<const ArgumentT*>(
+                accessor->template getUntypedValue<false>(attr_id));
+        ++loc;
+      }
+    });
+  }
+
+  template <typename StateT>
+  inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
+                            const void *src_buckets_start,
+                            const std::size_t state_offset) {
+    char *dst_state_start = static_cast<char*>(buckets_.get()) + state_offset;
+    const char* src_state_start =
+        static_cast<const char*>(src_buckets_start) + state_offset;
+    for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) {
+      char *dst_state_ptr = dst_state_start + bucket_size_ * dst_bucket_indices[i];
+      const char *src_state_ptr = src_state_start + bucket_size_ * i;
+      *reinterpret_cast<StateT*>(dst_state_ptr) +=
+          *reinterpret_cast<const StateT*>(src_state_ptr);
+    }
+  }
+
+  template <typename KeyT>
+  inline void finalizeKey(const std::size_t offset,
+                          NativeColumnVector *output_cv) const {
+    const char *key_ptr = static_cast<const char*>(keys_.get()) + offset;
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      *static_cast<KeyT*>(output_cv->getPtrForDirectWrite()) =
+          *reinterpret_cast<const KeyT*>(key_ptr);
+      key_ptr += sizeof(std::uint64_t);
+    }
+  }
+
+  template <typename StateT, typename ResultT>
+  inline void finalizeStateSum(const std::size_t state_offset,
+                               NativeColumnVector *output_cv) const {
+    char *state_ptr = static_cast<char*>(buckets_.get()) + state_offset;
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) =
+          *reinterpret_cast<const StateT*>(state_ptr);
+      state_ptr += bucket_size_;
+    }
+  }
+
+  const std::vector<const Type*> key_types_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::vector<std::size_t> key_sizes_;
+  std::vector<std::size_t> state_offsets_;
+  std::size_t bucket_size_;
+
+  std::unordered_map<std::uint64_t, BucketIndex> index_;
+
+  std::size_t num_buckets_;
+  std::size_t buckets_allocated_;
+
+  ScopedBuffer keys_;
+  ScopedBuffer buckets_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateCompactKeyHashTable);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_

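ThreadPrivateCompactKeyHashTable requires every key column to be fixed-length with a size of 1, 2, 4, or 8 bytes, and ConstructKeyCode writes column i into bytes [offset, offset + size) of an 8-byte slot per tuple, so the code as written evidently assumes the combined key width fits in a single std::uint64_t. A self-contained sketch of that packing for an int (4-byte) plus a 2-byte key column; the unused high bytes stay zero so equal composite keys always yield equal codes:

    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    // Pack one tuple's key columns into a single 64-bit code, mirroring
    // ConstructKeyCode: each column is copied at its byte offset within the
    // zero-initialized code.
    std::uint64_t PackKey(const std::int32_t a, const std::int16_t b) {
      std::uint64_t code = 0;
      std::memcpy(reinterpret_cast<char*>(&code), &a, sizeof(a));
      std::memcpy(reinterpret_cast<char*>(&code) + sizeof(a), &b, sizeof(b));
      return code;
    }

    int main() {
      // Group-by on (a, b): identical composite keys produce identical codes,
      // so a plain unordered_map<uint64_t, BucketIndex> serves as the index.
      const std::vector<std::pair<std::int32_t, std::int16_t>> tuples = {
          {10, 1}, {20, 2}, {10, 1}};
      std::unordered_map<std::uint64_t, std::uint32_t> index;
      std::uint32_t buckets_allocated = 0;
      for (const auto &t : tuples) {
        const std::uint64_t code = PackKey(t.first, t.second);
        if (index.emplace(code, buckets_allocated).second) {
          ++buckets_allocated;  // New group: claim the next bucket.
        }
      }
      std::cout << "distinct groups: " << buckets_allocated << "\n";  // 2
      return 0;
    }
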
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 58bdf18..2c571ef 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -33,6 +33,7 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/scalar/ScalarCache.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunction.hpp"
 #include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
 #include "expressions/window_aggregation/WindowAggregationHandle.hpp"
@@ -236,11 +237,16 @@ void WindowAggregationOperationState::windowAggregateBlocks(
       argument_accessor = new ColumnVectorsValueAccessor();
     }
 
+    std::unique_ptr<ScalarCache> scalar_cache = std::make_unique<ScalarCache>();
     for (const std::unique_ptr<const Scalar> &argument : arguments_) {
       argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
-                                                          &sub_block_ref));
+                                                          &sub_block_ref,
+                                                          scalar_cache.get()));
     }
 
+    // Release common subexpression cache as early as possible.
+    scalar_cache.reset();
+
     InvokeOnAnyValueAccessor(tuple_accessor,
                              [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
       tuple_accessor->beginIteration();

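Holding the cache in a std::unique_ptr rather than as a stack object makes the "release as early as possible" comment actionable: reset() frees the cached column vectors before the per-tuple iteration that follows, instead of at end of scope. The pattern in isolation (ScalarCacheSketch is a hypothetical stand-in, not the real ScalarCache):

    #include <memory>
    #include <vector>

    struct ScalarCacheSketch {  // Hypothetical stand-in for ScalarCache.
      std::vector<double> cached_values;
    };

    int main() {
      auto cache = std::make_unique<ScalarCacheSketch>();
      cache->cached_values.assign(1'000'000, 0.0);
      // ... use the cache while materializing argument columns ...

      cache.reset();  // Cached buffers are freed here, not at end of scope.

      // ... long-running per-tuple work that no longer needs the cache ...
      return 0;
    }
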
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/types/containers/ColumnVector.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index fc65656..430a844 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -43,6 +43,9 @@ namespace quickstep {
 // TODO(chasseur): Look into ways to allocate ColumnVector memory from the
 // StorageManager.
 
+class ColumnVector;
+typedef std::shared_ptr<const ColumnVector> ColumnVectorPtr;
+
 /**
  * @brief A vector of values of the same type. Two implementations exist:
  *        NativeColumnVector (an array of fixed-size data elements) and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/types/containers/ColumnVectorsValueAccessor.hpp
----------------------------------------------------------------------
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index 6dc1124..d9cf49d 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -74,22 +74,23 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
    *             this value-accessor is responsible for freeing this column
    *             vector.
    **/
-  void addColumn(ColumnVector *column, const bool owns = true) {
+  void addColumn(ColumnVectorPtr column) {
     // If this is not the first column to be added, make sure it is the same
     // length as the others.
     DCHECK(columns_.empty()
            || (column->isNative()
-               ? (static_cast<const NativeColumnVector*>(column)->size() == column_length_)
-               : (static_cast<const IndirectColumnVector*>(column)->size() == column_length_)));
+               ? (static_cast<const NativeColumnVector*>(column.get())->size() == column_length_)
+               : (static_cast<const IndirectColumnVector*>(column.get())->size() == column_length_)));
     columns_.push_back(column);
     column_native_.push_back(column->isNative());
-    if (owns) {
-      deleter_.addObject(column);
-    }
-    column_length_
-        = column->isNative()
-          ? static_cast<const NativeColumnVector*>(column)->size()
-          : static_cast<const IndirectColumnVector*>(column)->size();
+    column_length_ =
+        column->isNative()
+            ? static_cast<const NativeColumnVector*>(column.get())->size()
+            : static_cast<const IndirectColumnVector*>(column.get())->size();
+  }
+
+  void addColumn(ColumnVector *column) {
+    addColumn(ColumnVectorPtr(column));
   }
 
   inline void beginIteration() {
@@ -309,11 +310,10 @@ class ColumnVectorsValueAccessor : public ValueAccessor {
            && (static_cast<std::vector<ColumnVector*>::size_type>(attr_id) < columns_.size());
   }
 
-  std::vector<ColumnVector*> columns_;
+  std::vector<ColumnVectorPtr> columns_;
   std::vector<bool> column_native_;
   std::size_t column_length_;
   std::size_t current_position_;
-  ScopedDeleter deleter_;
 
   DISALLOW_COPY_AND_ASSIGN(ColumnVectorsValueAccessor);
 };

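ColumnVectorsValueAccessor previously took exclusive ownership of raw ColumnVector pointers through a ScopedDeleter; with common-subexpression caching, the same materialized column may be handed to several consumers, so ownership becomes shared via ColumnVectorPtr (a std::shared_ptr<const ColumnVector>). The effect in isolation, with illustrative stand-in types:

    #include <cassert>
    #include <memory>
    #include <vector>

    // Stand-in for an immutable, materialized column.
    using Column = std::vector<double>;
    using ColumnPtr = std::shared_ptr<const Column>;

    // Stand-in for a value accessor that keeps its columns alive for as long
    // as it needs them, without being their sole owner.
    struct AccessorSketch {
      std::vector<ColumnPtr> columns;
    };

    int main() {
      ColumnPtr shared_column = std::make_shared<Column>(Column{1.0, 2.0});

      // Two accessors (e.g. two consumers of one cached subexpression) can
      // reference the same column; it is freed when the last owner goes away.
      AccessorSketch a, b;
      a.columns.push_back(shared_column);
      b.columns.push_back(shared_column);
      assert(shared_column.use_count() == 3);

      return 0;
    }
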
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..ea9ee43 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -182,6 +182,7 @@ add_library(quickstep_utility_ExecutionDAGVisualizer
             ExecutionDAGVisualizer.cpp
             ExecutionDAGVisualizer.hpp)
 add_library(quickstep_utility_Glob Glob.cpp Glob.hpp)
+add_library(quickstep_utility_HashError ../empty_src.cpp HashError.hpp)
 add_library(quickstep_utility_HashPair ../empty_src.cpp HashPair.hpp)
 add_library(quickstep_utility_Macros ../empty_src.cpp Macros.hpp)
 add_library(quickstep_utility_MemStream ../empty_src.cpp MemStream.hpp)
@@ -350,6 +351,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Glob
+                      quickstep_utility_HashError
                       quickstep_utility_HashPair
                       quickstep_utility_Macros
                       quickstep_utility_MemStream

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ddd077fe/utility/HashError.hpp
----------------------------------------------------------------------
diff --git a/utility/HashError.hpp b/utility/HashError.hpp
new file mode 100644
index 0000000..3a59979
--- /dev/null
+++ b/utility/HashError.hpp
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+#define QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+
+#include <exception>
+#include <string>
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class HashNotSupported : public std::exception {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param message The error message.
+   **/
+  explicit HashNotSupported(const std::string &message)
+      : message_(message) {}
+
+  ~HashNotSupported() throw() {}
+
+  virtual const char* what() const throw() {
+    return message_.c_str();
+  }
+
+ private:
+  const std::string message_;
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_HASH_ERROR_HPP_

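HashNotSupported is evidently thrown when a hash is requested for something that cannot be hashed (the common-subexpression rule hashes expression trees, and not every expression supports it), letting callers skip the optimization instead of failing hard. Typical usage, with the class reproduced in self-contained form (namespace elided):

    #include <exception>
    #include <iostream>
    #include <string>

    // Reproduced from utility/HashError.hpp above.
    class HashNotSupported : public std::exception {
     public:
      explicit HashNotSupported(const std::string &message)
          : message_(message) {}
      ~HashNotSupported() throw() {}
      virtual const char* what() const throw() {
        return message_.c_str();
      }
     private:
      const std::string message_;
    };

    // A caller (e.g. a hashing visitor) can treat "not hashable" as a soft
    // failure and simply fall back to the unoptimized path.
    int main() {
      try {
        throw HashNotSupported("hash requested on a non-hashable expression");
      } catch (const HashNotSupported &e) {
        std::cout << "skipping optimization: " << e.what() << "\n";
      }
      return 0;
    }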
