quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [09/32] incubator-quickstep git commit: Add ThreadPrivateCompactKeyHashTable as a fast path data structure for aggregation.
Date Thu, 04 May 2017 03:22:53 GMT
Add ThreadPrivateCompactKeyHashTable as a fast path data structure for aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d6a01e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d6a01e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d6a01e7c

Branch: refs/heads/new-op
Commit: d6a01e7c867354ca05545595644a62b03de56b81
Parents: 8169306
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Fri Apr 21 23:23:13 2017 -0500
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Mon Apr 24 13:40:17 2017 -0500

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          |  14 +-
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  71 +++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  12 +
 storage/AggregationOperationState.cpp           |  68 ++-
 storage/AggregationOperationState.hpp           |   6 +-
 storage/CMakeLists.txt                          |  24 ++
 storage/CollisionFreeVectorTable.hpp            |   4 +
 storage/HashTable.proto                         |   1 +
 storage/HashTableBase.hpp                       |  18 +-
 storage/HashTableFactory.hpp                    |  13 +-
 storage/HashTablePool.hpp                       |   9 +
 storage/PackedPayloadHashTable.hpp              |   4 +
 storage/ThreadPrivateCompactKeyHashTable.cpp    | 421 +++++++++++++++++++
 storage/ThreadPrivateCompactKeyHashTable.hpp    | 230 ++++++++++
 14 files changed, 870 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 3e0f647..9625a91 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1580,14 +1580,22 @@ void ExecutionGenerator::convertAggregate(
             ->canUseCollisionFreeAggregation(physical_plan,
                                              estimated_num_groups,
                                              &max_num_groups)) {
+      // First option: use array-based aggregation if applicable.
       aggr_state_proto->set_hash_table_impl_type(
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
       use_parallel_initialization = true;
     } else {
-      // Otherwise, use SeparateChaining.
-      aggr_state_proto->set_hash_table_impl_type(
-          serialization::HashTableImplType::SEPARATE_CHAINING);
+      if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
+              physical_plan, estimated_num_groups)) {
+        // Second option: use thread-private compact-key aggregation if applicable.
+        aggr_state_proto->set_hash_table_impl_type(
+            serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY);
+      } else {
+        // Otherwise, use SeparateChaining.
+        aggr_state_proto->set_hash_table_impl_type(
+            serialization::HashTableImplType::SEPARATE_CHAINING);
+      }
       aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
     }
   } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index b17fac0..e0e3dff 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -71,8 +71,8 @@ namespace optimizer {
 namespace cost {
 
 DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
-              "The maximum allowed key range (number of entries) for using a "
-              "CollisionFreeVectorTable.");
+             "The maximum allowed key range (number of entries) for using a "
+             "CollisionFreeVectorTable.");
 
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
@@ -700,6 +700,73 @@ bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
   return true;
 }
 
+bool StarSchemaSimpleCostModel::canUseTwoPhaseCompactKeyAggregation(
+    const physical::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups) {
+  // Require estimated number of groups to be below the specified threshold.
+  //
+  // TODO(jianqiao): It would be good for the threshold to be the same as
+  // FLAGS_partition_aggregation_num_groups_threshold which is defined in
+  // AggregationOperationState.cpp. However, there seems to be no sound place
+  // to put that flag so that it can be shared by the two cpp files (optimizer
+  // vs backend). So here we hardcode the threshold and leave it to be solved
+  // later.
+  if (estimated_num_groups >= 10000u) {  // NOTE(review): flag default is 100000; confirm.
+    return false;
+  }
+
+  // Require fixed-length non-nullable keys that can be packed into a 64-bit QWORD.
+  std::size_t total_key_size = 0;
+  for (const auto &key_expr : aggregate->grouping_expressions()) {
+    const Type &type = key_expr->getValueType();
+    if (type.isVariableLength() || type.isNullable()) {
+      return false;
+    }
+    total_key_size += type.maximumByteLength();
+  }
+
+  if (total_key_size > sizeof(std::uint64_t)) {  // All keys must fit in one std::uint64_t.
+    return false;
+  }
+
+  // Check every aggregate expression: DISTINCT flag, argument nullability, function kind.
+  for (const auto &agg_alias : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr agg_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(agg_alias->expression());
+
+    // Not supporting DISTINCT aggregation.
+    if (agg_expr->is_distinct()) {
+      return false;
+    }
+
+    // NULL values are not handled yet, so any nullable argument disqualifies the fast path.
+    const auto &arguments = agg_expr->getArguments();
+    for (const auto &arg : arguments) {
+      if (arg->getValueType().isNullable()) {
+        return false;
+      }
+    }
+
+    // Restricted to COUNT (no argument type check) and SUM over INT/LONG/FLOAT/DOUBLE.
+    switch (agg_expr->getAggregate().getAggregationID()) {
+      case AggregationID::kCount:
+        break;
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, arguments.size());
+        if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(),
+                                           kInt, kLong, kFloat, kDouble)) {
+          return false;
+        }
+        break;
+      }
+      default:  // Any other aggregate function disqualifies the fast path.
+        return false;
+    }
+  }
+
+  return true;  // Every check above passed.
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index 0461077..99518cf 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -184,6 +184,18 @@ class StarSchemaSimpleCostModel : public CostModel {
                                       const std::size_t estimated_num_groups,
                                       std::size_t *max_num_groups);
 
+  /**
+   * @brief Checks whether an aggregate node can be efficiently evaluated with
+   *        the two-phase compact key aggregation fast path.
+   *
+   * @param aggregate The physical aggregate node to be checked.
+   * @param estimated_num_groups The estimated number of groups for the aggregate.
+   * @return A bool value indicating whether two-phase compact key aggregation
+   *         can be used to evaluate \p aggregate.
+   */
+  bool canUseTwoPhaseCompactKeyAggregation(const physical::AggregatePtr &aggregate,
+                                           const std::size_t estimated_num_groups);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index e5dc93e..0f4a105 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -48,6 +48,7 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/SubBlocksReference.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
@@ -69,10 +70,10 @@ namespace quickstep {
 DEFINE_int32(num_aggregation_partitions,
              41,
              "The number of partitions used for performing the aggregation");
-DEFINE_int32(partition_aggregation_num_groups_threshold,
-             500000,
-             "The threshold used for deciding whether the aggregation is done "
-             "in a partitioned way or not");
+DEFINE_uint64(partition_aggregation_num_groups_threshold,
+              100000,
+              "The threshold used for deciding whether the aggregation is done "
+              "in a partitioned way or not");
 
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
@@ -94,11 +95,16 @@ AggregationOperationState::AggregationOperationState(
                                     !is_distinct_.empty(), std::logical_and<bool>())),
       storage_manager_(storage_manager) {
   if (!group_by.empty()) {
-    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
-      is_aggregate_collision_free_ = true;
-    } else {
-      is_aggregate_partitioned_ = checkAggregatePartitioned(
-          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+    switch (hash_table_impl_type) {
+      case HashTableImplType::kCollisionFreeVector:
+        is_aggregate_collision_free_ = true;
+        break;
+      case HashTableImplType::kThreadPrivateCompactKey:
+        is_aggregate_partitioned_ = false;
+        break;
+      default:
+        is_aggregate_partitioned_ = checkAggregatePartitioned(
+            estimated_num_entries, is_distinct_, group_by, aggregate_functions);
     }
   }
 
@@ -420,9 +426,7 @@ bool AggregationOperationState::checkAggregatePartitioned(
 
   // There are GROUP BYs without DISTINCT. Check if the estimated number of
   // groups is large enough to warrant a partitioned aggregation.
-  return estimated_num_groups >=
-         static_cast<std::size_t>(
-             FLAGS_partition_aggregation_num_groups_threshold);
+  return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold;
 }
 
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -715,7 +719,18 @@ void AggregationOperationState::finalizeHashTable(
     finalizeHashTableImplPartitioned(partition_id, output_destination);
   } else {
     DCHECK_EQ(0u, partition_id);
-    finalizeHashTableImplThreadPrivate(output_destination);
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    switch (group_by_hashtable_pool_->getHashTableImplType()) {
+      case HashTableImplType::kSeparateChaining:
+        finalizeHashTableImplThreadPrivatePackedPayload(output_destination);
+        break;
+      case HashTableImplType::kThreadPrivateCompactKey:
+        finalizeHashTableImplThreadPrivateCompactKey(output_destination);
+        break;
+      default:
+        LOG(FATAL) << "Unexpected hash table type in "
+                   << "AggregationOperationState::finalizeHashTable()";
+    }
   }
 }
 
@@ -840,7 +855,7 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload(
     InsertDestination *output_destination) {
   // TODO(harshad) - The merge phase may be slower when each hash table contains
   // large number of entries. We should find ways in which we can perform a
@@ -948,6 +963,31 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+void AggregationOperationState::finalizeHashTableImplThreadPrivateCompactKey(
+    InsertDestination *output_destination) {
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {  // No tables in the pool: nothing to output.
+    return;
+  }
+
+  // Merge all hash tables into one, using the pool's last table as the merge target.
+  std::unique_ptr<ThreadPrivateCompactKeyHashTable> final_hash_table(
+      static_cast<ThreadPrivateCompactKeyHashTable*>(hash_tables->back().release()));
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {  // Every table except the last.
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());  // Owned locally; destroyed after its merge.
+    final_hash_table->mergeFrom(
+        static_cast<const ThreadPrivateCompactKeyHashTable&>(*hash_table));
+  }
+
+  ColumnVectorsValueAccessor complete_result;
+  final_hash_table->finalize(&complete_result);  // Fills complete_result from the merged table.
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
   std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
   memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e6af494..207c4f0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -256,7 +256,11 @@ class AggregationOperationState {
   void finalizeHashTableImplPartitioned(const std::size_t partition_id,
                                         InsertDestination *output_destination);
 
-  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
+  void finalizeHashTableImplThreadPrivatePackedPayload(
+      InsertDestination *output_destination);
+
+  void finalizeHashTableImplThreadPrivateCompactKey(
+      InsertDestination *output_destination);
 
   std::size_t getMemoryConsumptionBytesHelper(
       const std::vector<std::unique_ptr<AggregationStateHashTableBase>>

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index c3db584..4296ba0 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -250,6 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h
 add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp)
 add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp)
 add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp)
+add_library(quickstep_storage_ThreadPrivateCompactKeyHashTable
+            ThreadPrivateCompactKeyHashTable.cpp
+            ThreadPrivateCompactKeyHashTable.hpp)
 add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp)
 add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
@@ -288,6 +291,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
                       quickstep_storage_SubBlocksReference
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
@@ -724,6 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
                       quickstep_storage_PackedPayloadHashTable
                       quickstep_storage_SeparateChainingHashTable
                       quickstep_storage_SimpleScalarSeparateChainingHashTable
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleReference
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -1039,6 +1044,24 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry
 target_link_libraries(quickstep_storage_SubBlocksReference
                       glog
                       quickstep_utility_PtrVector)
+target_link_libraries(quickstep_storage_ThreadPrivateCompactKeyHashTable
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_Macros
+                      quickstep_utility_ScopedBuffer)
 target_link_libraries(quickstep_storage_TupleIdSequence
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_BitVector
@@ -1164,6 +1187,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_storage_SubBlockTypeRegistryMacros
                       quickstep_storage_SubBlocksReference
+                      quickstep_storage_ThreadPrivateCompactKeyHashTable
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 490a5cc..221a221 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -70,6 +70,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
 
   ~CollisionFreeVectorTable() override;
 
+  HashTableImplType getImplType() const override {  // Implements the base-class type query.
+    return HashTableImplType::kCollisionFreeVector;
+  }
+
   void destroyPayload() override;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 6839ebc..ed383df 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -26,6 +26,7 @@ enum HashTableImplType {
   LINEAR_OPEN_ADDRESSING = 1;
   SEPARATE_CHAINING = 2;
   SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
+  THREAD_PRIVATE_COMPACT_KEY = 4;
 }
 
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 8be388a..4d9310c 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -44,7 +44,8 @@ enum class HashTableImplType {
   kCollisionFreeVector,
   kLinearOpenAddressing,
   kSeparateChaining,
-  kSimpleScalarSeparateChaining
+  kSimpleScalarSeparateChaining,
+  kThreadPrivateCompactKey
 };
 
 /**
@@ -113,8 +114,23 @@ class AggregationStateHashTableBase {
       const std::vector<MultiSourceAttributeId> &key_attr_ids,
       const ValueAccessorMultiplexer &accessor_mux) = 0;
 
+  /**
+   * @brief Destroy hash table payloads.
+   */
   virtual void destroyPayload() = 0;
 
+  /**
+   * @brief Get the implementation type of this aggregation hash table.
+   *
+   * @return The implementation type of this aggregation hash table.
+   */
+  virtual HashTableImplType getImplType() const = 0;
+
+  /**
+   * @brief Get the estimated memory consumption of this hash table in bytes.
+   *
+   * @return The estimated memory consumption of this hash table in bytes.
+   */
   virtual std::size_t getMemoryConsumptionBytes() const = 0;
 
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 9686429..cb1f16f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -32,6 +32,7 @@
 #include "storage/PackedPayloadHashTable.hpp"
 #include "storage/SeparateChainingHashTable.hpp"
 #include "storage/SimpleScalarSeparateChainingHashTable.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
 #include "storage/TupleReference.hpp"
 #include "types/TypeFactory.hpp"
 #include "utility/BloomFilter.hpp"
@@ -123,6 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
       return HashTableImplType::kSeparateChaining;
     case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING:
       return HashTableImplType::kSimpleScalarSeparateChaining;
+    case serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY:
+      return HashTableImplType::kThreadPrivateCompactKey;
     default: {
       LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n";
     }
@@ -355,7 +358,6 @@ class AggregationStateHashTableFactory {
    *        hash table constructor.
    * @return A new aggregation state hash table.
    **/
-
   static AggregationStateHashTableBase* CreateResizable(
       const HashTableImplType hash_table_type,
       const std::vector<const Type*> &key_types,
@@ -363,13 +365,16 @@ class AggregationStateHashTableFactory {
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager) {
     switch (hash_table_type) {
-      case HashTableImplType::kSeparateChaining:
-        return new PackedPayloadHashTable(
-            key_types, num_entries, handles, storage_manager);
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
             key_types.front(), num_entries, handles, storage_manager);
+      case HashTableImplType::kSeparateChaining:
+        return new PackedPayloadHashTable(
+            key_types, num_entries, handles, storage_manager);
+      case HashTableImplType::kThreadPrivateCompactKey:
+        return new ThreadPrivateCompactKeyHashTable(
+            key_types, num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in "
                    << "AggregationStateHashTableFactory::createResizable()";

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 6dbd7f9..f3abddb 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -76,6 +76,15 @@ class HashTablePool {
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
   /**
+   * @brief Get the type of hash table implementation for this pool.
+   *
+   * @return The type of hash table implementation for this pool.
+   */
+  HashTableImplType getHashTableImplType() const {
+    return hash_table_impl_type_;
+  }
+
+  /**
    * @brief Check out a hash table for insertion.
    *
    * @note This method is relevant for specialized (for aggregation)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 960d5a7..3e89aab 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -88,6 +88,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
 
   ~PackedPayloadHashTable() override;
 
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kSeparateChaining;  // The enum value the factory maps to this class.
+  }
+
   /**
    * @brief Erase all entries in this hash table.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/ThreadPrivateCompactKeyHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.cpp b/storage/ThreadPrivateCompactKeyHashTable.cpp
new file mode 100644
index 0000000..fb68940
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.cpp
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <type_traits>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+namespace {
+
+#define CASE_KEY_SIZE(value) \
+  case value: return functor(std::integral_constant<std::size_t, value>())
+
+template <typename FunctorT>  // FunctorT: callable taking std::integral_constant<std::size_t, N>.
+auto InvokeOnKeySize(const std::size_t key_size, const FunctorT &functor) {
+  switch (key_size) {  // Turn the runtime key size (1 through 8) into a compile-time constant.
+    CASE_KEY_SIZE(1);
+    CASE_KEY_SIZE(2);
+    CASE_KEY_SIZE(3);
+    CASE_KEY_SIZE(4);
+    CASE_KEY_SIZE(5);
+    CASE_KEY_SIZE(6);
+    CASE_KEY_SIZE(7);
+    CASE_KEY_SIZE(8);
+    default:
+      break;  // Unsupported size: fall out of the switch to the fatal log below.
+  }
+  LOG(FATAL) << "Unexpected key size: " << key_size;  // Reached only via the default case.
+}
+
+#undef CASE_KEY_SIZE  // The helper macro is only needed by InvokeOnKeySize.
+
+}  // namespace
+
+constexpr std::size_t ThreadPrivateCompactKeyHashTable::kKeyCodeSize;
+
+ThreadPrivateCompactKeyHashTable::ThreadPrivateCompactKeyHashTable(
+    const std::vector<const Type*> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle*> &handles,
+    StorageManager *storage_manager)
+    : key_types_(key_types),
+      handles_(handles),
+      total_state_size_(0),
+      num_buckets_(0),
+      buckets_allocated_(0),
+      storage_manager_(storage_manager) {
+  // Cache key sizes; keys must be fixed-length and non-nullable.
+  for (const Type *key_type : key_types) {
+    DCHECK(!key_type->isVariableLength());
+    DCHECK(!key_type->isNullable());
+    key_sizes_.emplace_back(key_type->maximumByteLength());
+  }
+
+  for (const AggregationHandle *handle : handles) {  // One fixed-size state per aggregate.
+    const std::vector<const Type*> arg_types = handle->getArgumentTypes();
+    DCHECK_LE(arg_types.size(), 1u);  // At most one argument per aggregate.
+    DCHECK(arg_types.empty() || !arg_types.front()->isNullable());
+
+    // Figure out state size.
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::int64_t);  // COUNT state is int64-sized.
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, arg_types.size());
+        switch (arg_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::int64_t);  // INT and LONG sums share an int64-sized state.
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(double);  // FLOAT and DOUBLE sums share a double-sized state.
+            break;
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+    state_sizes_.emplace_back(state_size);
+    total_state_size_ += state_size;
+  }
+
+  // Calculate required memory size for keys and states (one key code plus all states per entry).
+  const std::size_t required_memory =
+      num_entries * (kKeyCodeSize + total_state_size_);
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  // Use storage manager to allocate memory; the bucket count follows from the blob's size.
+  const block_id blob_id = storage_manager->createBlob(num_storage_slots);
+  blob_ = storage_manager->getBlobMutable(blob_id);
+
+  num_buckets_ = blob_->size() / (kKeyCodeSize + total_state_size_);
+  void *memory = blob_->getMemoryMutable();
+
+  // Layout: all key codes first, then one zero-initialized array per aggregate state.
+  keys_ = static_cast<KeyCode*>(memory);
+  char *state_memory = static_cast<char*>(memory) + num_buckets_ * kKeyCodeSize;
+  std::memset(state_memory, 0, num_buckets_ * total_state_size_);
+
+  for (std::size_t i = 0; i < state_sizes_.size(); ++i) {
+    state_vecs_.emplace_back(state_memory);  // Start of the i-th state array.
+    state_memory += num_buckets_ * state_sizes_[i];
+  }
+}
+
+ThreadPrivateCompactKeyHashTable::~ThreadPrivateCompactKeyHashTable() {
+  // Release the blob and delete it through the storage manager.
+  if (blob_.valid()) {
+    const block_id blob_id = blob_->getID();  // Read the id before releasing the reference.
+    blob_.release();
+    storage_manager_->deleteBlockOrBlobFile(blob_id);
+  }
+}
+
+void ThreadPrivateCompactKeyHashTable::resize() {  // Moves into a blob sized for 2x the buckets.
+  DCHECK_EQ(buckets_allocated_, num_buckets_);  // Expected to run only when the table is full.
+
+  const std::size_t resized_memory_size =
+      num_buckets_ * 2 * (kKeyCodeSize + total_state_size_);
+  const std::size_t resized_num_slots =
+      storage_manager_->SlotsNeededForBytes(resized_memory_size);
+
+  const block_id resized_blob_id =
+      storage_manager_->createBlob(resized_num_slots);
+  MutableBlobReference resized_blob =
+      storage_manager_->getBlobMutable(resized_blob_id);
+
+  const std::size_t resized_num_buckets =
+      resized_blob->size() / (kKeyCodeSize + total_state_size_);
+  void *resized_memory = resized_blob->getMemoryMutable();
+
+  KeyCode *resized_keys = static_cast<KeyCode*>(resized_memory);
+  std::memcpy(resized_keys, keys_, buckets_allocated_ * kKeyCodeSize);  // Copy existing keys.
+  keys_ = resized_keys;
+
+  char *resized_state_memory =
+      static_cast<char*>(resized_memory) + resized_num_buckets * kKeyCodeSize;
+  for (std::size_t i = 0; i < state_sizes_.size(); ++i) {  // Relocate each state array.
+    const std::size_t vec_size = buckets_allocated_ * state_sizes_[i];
+    const std::size_t resized_vec_size = resized_num_buckets * state_sizes_[i];
+
+    std::memcpy(resized_state_memory, state_vecs_[i], vec_size);
+    std::memset(resized_state_memory + vec_size,
+                0,  // NOTE(review): std::memcpy/memset need <cstring>; verify it is included.
+                resized_vec_size - vec_size);  // Zero only the newly added tail.
+
+    state_vecs_[i] = resized_state_memory;
+    resized_state_memory += resized_vec_size;
+  }
+
+  std::swap(blob_, resized_blob);  // After the swap, resized_blob refers to the old blob.
+  num_buckets_ = resized_num_buckets;
+
+  const block_id blob_id_to_delete = resized_blob->getID();
+  resized_blob.release();
+  storage_manager_->deleteBlockOrBlobFile(blob_id_to_delete);  // Delete the old blob.
+}
+
+// Aggregates every tuple of the given accessors into this hash table.
+//
+// The work is done in three column-wise passes:
+//   1. Pack the group-by key attributes of each tuple into one KeyCode.
+//   2. Map each KeyCode to a bucket index, allocating new buckets (and growing
+//      the table) for keys that have not been seen before.
+//   3. For each aggregate, fold the argument column into that aggregate's
+//      state vector at the bucket indices computed in step 2.
+//
+// argument_ids[i] lists the argument attributes of handles_[i]; it is only
+// read for SUM, where exactly one argument is expected. key_attr_ids[i] is the
+// source of the i-th key component, whose byte width is key_sizes_[i].
+//
+// Always returns true. Unsupported aggregation IDs or argument types abort
+// via LOG(FATAL).
+bool ThreadPrivateCompactKeyHashTable::upsertValueAccessorCompositeKey(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_attr_ids,
+    const ValueAccessorMultiplexer &accessor_mux) {
+  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+
+  DCHECK(base_accessor != nullptr);
+  const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
+
+  // One KeyCode per input tuple. Each key component below only writes its own
+  // key_sizes_[i] bytes of every code.
+  // NOTE(review): this assumes ScopedBuffer zero-fills its allocation, so that
+  // bytes not covered by any key component compare equal -- TODO confirm.
+  ScopedBuffer buffer(num_tuples * kKeyCodeSize);
+  KeyCode *key_codes = static_cast<KeyCode*>(buffer.get());
+  std::size_t key_code_offset = 0;
+  for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
+    const auto &key_attr_id = key_attr_ids[i];
+    ValueAccessor *accessor =
+        key_attr_id.source == ValueAccessorSource::kBase
+            ? base_accessor
+            : derived_accessor;
+    DCHECK(accessor != nullptr);
+
+    // Pack the key component into the 64-bit code (with proper offset).
+    InvokeOnKeySize(
+        key_sizes_[i],
+        [&](auto key_size) -> void {  // NOLINT(build/c++11)
+      ConstructKeyCode<decltype(key_size)::value>(
+          key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+    });
+    key_code_offset += key_sizes_[i];
+  }
+
+  // Resolve every tuple's key code to a bucket index.
+  std::vector<BucketIndex> bucket_indices(num_tuples);
+  for (std::size_t i = 0; i < num_tuples; ++i) {
+    // Hold the code as KeyCode (64 bits), not std::size_t: on a platform with
+    // a 32-bit std::size_t the upper half of the packed key would be dropped
+    // and distinct groups would be merged into the same bucket.
+    const KeyCode code = key_codes[i];
+    const auto index_it = index_.find(code);
+    if (index_it == index_.end()) {
+      // First occurrence of this key: take the next free bucket, growing the
+      // table first if all buckets are in use.
+      if (buckets_allocated_ >= num_buckets_) {
+        resize();
+      }
+      index_.emplace(code, buckets_allocated_);
+      bucket_indices[i] = buckets_allocated_;
+      keys_[buckets_allocated_] = code;
+      ++buckets_allocated_;
+    } else {
+      bucket_indices[i] = index_it->second;
+    }
+  }
+
+  // Dispatch on AggregationID and argument type.
+  // TODO(jianqiao): refactor type system and aggregation facilities to eliminate
+  // this type of ad-hoc switch statements.
+  for (std::size_t i = 0; i < handles_.size(); ++i) {
+    const AggregationHandle *handle = handles_[i];
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        // COUNT needs no argument values, only the bucket of each tuple.
+        upsertValueAccessorCount(bucket_indices, state_vecs_[i]);
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, argument_ids[i].size());
+        const auto &argument_id = argument_ids[i].front();
+        ValueAccessor *accessor =
+            argument_id.source == ValueAccessorSource::kBase
+                ? base_accessor
+                : derived_accessor;
+        DCHECK(accessor != nullptr);
+
+        DCHECK_EQ(1u, handle->getArgumentTypes().size());
+        const Type *argument_type = handle->getArgumentTypes().front();
+        // Integer arguments accumulate into std::int64_t states, floating
+        // point arguments into double states.
+        switch (argument_type->getTypeID()) {
+          case kInt: {
+            upsertValueAccessorSum<int, std::int64_t>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          case kLong: {
+            upsertValueAccessorSum<std::int64_t, std::int64_t>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          case kFloat: {
+            upsertValueAccessorSum<float, double>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          case kDouble: {
+            upsertValueAccessorSum<double, double>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+  }
+
+  return true;
+}
+
+void ThreadPrivateCompactKeyHashTable::mergeFrom(
+    const ThreadPrivateCompactKeyHashTable &source) {
+  // Phase 1: make sure every key of the source exists in this table, and
+  // record for each source bucket the bucket of this table it folds into
+  // (source bucket s goes to bucket dst_of_src[s]).
+  const std::size_t num_src_buckets = source.buckets_allocated_;
+  const KeyCode *source_keys = source.keys_;
+  std::vector<BucketIndex> dst_of_src(num_src_buckets);
+  for (std::size_t src_idx = 0; src_idx < num_src_buckets; ++src_idx) {
+    const KeyCode key_code = source_keys[src_idx];
+    const auto found = index_.find(key_code);
+
+    if (found != index_.end()) {
+      // Key already known here: reuse its bucket.
+      dst_of_src[src_idx] = found->second;
+      continue;
+    }
+
+    // New key: claim the next free bucket, growing the table if it is full.
+    if (buckets_allocated_ >= num_buckets_) {
+      resize();
+    }
+    index_.emplace(key_code, buckets_allocated_);
+    dst_of_src[src_idx] = buckets_allocated_;
+    keys_[buckets_allocated_] = key_code;
+    ++buckets_allocated_;
+  }
+
+  // Phase 2: add the source states onto the destination states, one state
+  // vector (i.e. one aggregate) at a time.
+  const std::size_t num_aggregates = handles_.size();
+  for (std::size_t agg_idx = 0; agg_idx < num_aggregates; ++agg_idx) {
+    const AggregationHandle *agg = handles_[agg_idx];
+    const void *src_vec = source.state_vecs_[agg_idx];
+    void *dst_vec = state_vecs_[agg_idx];
+
+    switch (agg->getAggregationID()) {
+      case AggregationID::kCount: {
+        mergeStateSum<std::int64_t>(dst_of_src, src_vec, dst_vec);
+        break;
+      }
+      case AggregationID::kSum: {
+        const Type *arg_type = agg->getArgumentTypes().front();
+        switch (arg_type->getTypeID()) {
+          case kInt:  // Fall through
+          case kLong: {
+            mergeStateSum<std::int64_t>(dst_of_src, src_vec, dst_vec);
+            break;
+          }
+          case kFloat:  // Fall through
+          case kDouble: {
+            mergeStateSum<double>(dst_of_src, src_vec, dst_vec);
+            break;
+          }
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+  }
+}
+
+void ThreadPrivateCompactKeyHashTable::finalize(
+    ColumnVectorsValueAccessor *output) const {
+  // Emit one output column per group-by key component. Each component is
+  // read from its byte offset inside the packed key codes.
+  const std::size_t num_key_columns = key_types_.size();
+  std::size_t byte_offset_in_code = 0;
+  for (std::size_t key_idx = 0; key_idx < num_key_columns; ++key_idx) {
+    auto key_column = std::make_unique<NativeColumnVector>(
+        *key_types_[key_idx], buckets_allocated_);
+    NativeColumnVector *key_column_ptr = key_column.get();
+
+    InvokeOnKeySize(
+        key_sizes_[key_idx],
+        [&](auto key_size) -> void {  // NOLINT(build/c++11)
+      this->finalizeKey<decltype(key_size)::value>(byte_offset_in_code,
+                                                   key_column_ptr);
+    });
+    output->addColumn(key_column.release());
+    byte_offset_in_code += key_sizes_[key_idx];
+  }
+
+  // Emit one output column per aggregate, in handle order.
+  const std::size_t num_aggregates = handles_.size();
+  for (std::size_t agg_idx = 0; agg_idx < num_aggregates; ++agg_idx) {
+    const AggregationHandle *agg = handles_[agg_idx];
+    auto result_column = std::make_unique<NativeColumnVector>(
+        *agg->getResultType(), buckets_allocated_);
+    NativeColumnVector *result_column_ptr = result_column.get();
+    const void *state_vec = state_vecs_[agg_idx];
+
+    switch (agg->getAggregationID()) {
+      case AggregationID::kCount: {
+        finalizeStateSum<std::int64_t, std::int64_t>(state_vec,
+                                                     result_column_ptr);
+        break;
+      }
+      case AggregationID::kSum: {
+        const Type *arg_type = agg->getArgumentTypes().front();
+        switch (arg_type->getTypeID()) {
+          case kInt:  // Fall through
+          case kLong: {
+            finalizeStateSum<std::int64_t, std::int64_t>(state_vec,
+                                                         result_column_ptr);
+            break;
+          }
+          case kFloat:  // Fall through
+          case kDouble: {
+            finalizeStateSum<double, double>(state_vec, result_column_ptr);
+            break;
+          }
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+    output->addColumn(result_column.release());
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/ThreadPrivateCompactKeyHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.hpp b/storage/ThreadPrivateCompactKeyHashTable.hpp
new file mode 100644
index 0000000..277e2e5
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.hpp
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregationHandle;
+class StorageManager;
+class Type;
+
+/**
+ * @brief Specialized aggregation hash table that is preferable for two-phase
+ *        aggregation with small-cardinality group-by keys. To use this hash
+ *        table, it also requires that the group-by keys have fixed-length types
+ *        with total byte size no greater than 8 (so that the keys can be packed
+ *        into a 64-bit QWORD).
+ *
+ * Storage is column-wise: one array of packed key codes plus one state array
+ * per aggregation handle, all indexed by bucket. Buckets are handed out in
+ * first-seen order and a separate map (index_) translates a key code into its
+ * bucket index.
+ */
+class ThreadPrivateCompactKeyHashTable : public AggregationStateHashTableBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param key_types A vector of one or more types (>1 indicates a composite
+   *        key).
+   * @param num_entries The estimated number of entries this hash table will
+   *        hold.
+   * @param handles The aggregation handles.
+   * @param storage_manager The StorageManager to use (a StorageBlob will be
+   *        allocated to hold this hash table's contents).
+   **/
+  ThreadPrivateCompactKeyHashTable(
+      const std::vector<const Type*> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle*> &handles,
+      StorageManager *storage_manager);
+
+  /**
+   * @brief Destructor. Releases and deletes the backing blob if one is held.
+   **/
+  ~ThreadPrivateCompactKeyHashTable() override;
+
+  /**
+   * @return HashTableImplType::kThreadPrivateCompactKey.
+   **/
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kThreadPrivateCompactKey;
+  }
+
+  // Intentionally a no-op for this implementation.
+  void destroyPayload() override {}
+
+  /**
+   * @return The size of the backing blob in bytes. Memory used by the
+   *         key-to-bucket map (index_) is not included.
+   **/
+  std::size_t getMemoryConsumptionBytes() const override {
+    return blob_->size();
+  }
+
+  /**
+   * @return The number of entries in this HashTable.
+   **/
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  /**
+   * @brief Aggregate all tuples of the given accessors into this hash table.
+   *
+   * @param argument_ids For each aggregation handle, the attributes that feed
+   *        it.
+   * @param key_attr_ids The group-by key attributes, one per key component.
+   * @param accessor_mux Provides the base and derived ValueAccessors that the
+   *        attribute IDs refer to.
+   * @return Always true in the implementation of this class.
+   **/
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override;
+
+  /**
+   * @brief Merge the states of \p source into this hash table.
+   *
+   * @param source The source hash table from which the states are to be merged
+   *        into this hash table.
+   */
+  void mergeFrom(const ThreadPrivateCompactKeyHashTable &source);
+
+  /**
+   * @brief Finalize all the aggregation state vectors and add the result column
+   *        vectors into the output ColumnVectorsValueAccessor.
+   *
+   * @param output The ColumnVectorsValueAccessor to add all the result column
+   *        vectors into.
+   */
+  void finalize(ColumnVectorsValueAccessor *output) const;
+
+ private:
+  // Compact key as a 64-bit QWORD.
+  using KeyCode = std::uint64_t;
+  static constexpr std::size_t kKeyCodeSize = sizeof(KeyCode);
+
+  // Index of a bucket in keys_ and in every state vector.
+  using BucketIndex = std::uint32_t;
+
+  // Rounds actual_bytes up to the next multiple of kCacheLineBytes.
+  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+  }
+
+  // Grow the size of this hash table by a factor of 2.
+  void resize();
+
+  // Writes one key component of every tuple into an array of key codes.
+  //
+  // For each tuple produced by accessor, copies key_size bytes of attribute
+  // attr_id into the tuple's key code, starting at byte position offset inside
+  // that code. key_code_start points at the first key code; consecutive codes
+  // are kKeyCodeSize bytes apart.
+  //
+  // NOTE(review): the <false> argument of getUntypedValue presumably disables
+  // null checking, i.e. key attributes are assumed non-null -- confirm.
+  // NOTE(review): std::memcpy is declared in <cstring>, which this header does
+  // not include directly.
+  template <std::size_t key_size>
+  inline static void ConstructKeyCode(const std::size_t offset,
+                                      const attribute_id attr_id,
+                                      ValueAccessor *accessor,
+                                      void *key_code_start) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      char *key_code_ptr = static_cast<char*>(key_code_start) + offset;
+      accessor->beginIteration();
+      while (accessor->next()) {
+        std::memcpy(key_code_ptr,
+                    accessor->template getUntypedValue<false>(attr_id),
+                    key_size);
+        key_code_ptr += kKeyCodeSize;
+      }
+    });
+  }
+
+  // COUNT: adds 1 to the std::int64_t state of the bucket of every tuple.
+  // bucket_indices holds one bucket index per input tuple.
+  inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices,
+                                       void *state_vec) {
+    std::int64_t *states = static_cast<std::int64_t*>(state_vec);
+    for (const BucketIndex idx : bucket_indices) {
+      states[idx] += 1;
+    }
+  }
+
+  // SUM: adds attribute attr_id of the k-th tuple of accessor (read as
+  // ArgumentT) to the StateT state of bucket bucket_indices[k].
+  //
+  // The result of accessor->next() is not checked, so the accessor is expected
+  // to produce at least bucket_indices.size() tuples.
+  template <typename ArgumentT, typename StateT>
+  inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices,
+                                     const attribute_id attr_id,
+                                     ValueAccessor *accessor,
+                                     void *state_vec) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      accessor->beginIteration();
+
+      StateT *states = static_cast<StateT*>(state_vec);
+      for (const BucketIndex idx : bucket_indices) {
+        accessor->next();
+        states[idx] += *static_cast<const ArgumentT*>(
+            accessor->template getUntypedValue<false>(attr_id));
+      }
+    });
+  }
+
+  // Adds source state i onto destination state dst_bucket_indices[i], for
+  // every i in [0, dst_bucket_indices.size()).
+  template <typename StateT>
+  inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
+                            const void *src_state_vec,
+                            void *dst_state_vec) {
+    StateT *dst_states = static_cast<StateT*>(dst_state_vec);
+    const StateT* src_states = static_cast<const StateT*>(src_state_vec);
+    for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) {
+      dst_states[dst_bucket_indices[i]] += src_states[i];
+    }
+  }
+
+  // Extracts one key component from every allocated bucket's key code
+  // (key_size bytes at byte position offset) and writes it to output_cv.
+  //
+  // NOTE(review): relies on getPtrForDirectWrite() returning the next unwritten
+  // slot on each call -- confirm against NativeColumnVector.
+  template <std::size_t key_size>
+  inline void finalizeKey(const std::size_t offset,
+                          NativeColumnVector *output_cv) const {
+    const char *key_ptr = reinterpret_cast<const char*>(keys_) + offset;
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      std::memcpy(output_cv->getPtrForDirectWrite(),
+                  key_ptr,
+                  key_size);
+      key_ptr += kKeyCodeSize;
+    }
+  }
+
+  // Writes the state of every allocated bucket to output_cv, converting from
+  // StateT to ResultT by assignment. Same getPtrForDirectWrite() assumption as
+  // in finalizeKey() -- confirm.
+  template <typename StateT, typename ResultT>
+  inline void finalizeStateSum(const void *state_vec,
+                               NativeColumnVector *output_cv) const {
+    const StateT *states = static_cast<const StateT*>(state_vec);
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) = states[i];
+    }
+  }
+
+  // Types of the group-by key components, and the aggregation handles whose
+  // states this table maintains.
+  const std::vector<const Type*> key_types_;
+  const std::vector<AggregationHandle *> handles_;
+
+  // Byte size of each key component / of each aggregate's per-bucket state,
+  // and the sum of all state sizes.
+  std::vector<std::size_t> key_sizes_;
+  std::vector<std::size_t> state_sizes_;
+  std::size_t total_state_size_;
+
+  // Capacity of the current blob in buckets, and the number of buckets that
+  // are in use.
+  std::size_t num_buckets_;
+  std::size_t buckets_allocated_;
+
+  // Maps a compact-key to its bucket location.
+  std::unordered_map<KeyCode, BucketIndex> index_;
+
+  // Compact-key array where keys_[i] holds the compact-key for bucket i.
+  KeyCode *keys_;
+
+  // Use a column-wise layout for aggregation states.
+  std::vector<void*> state_vecs_;
+
+  // Storage manager that provides the blob, and the blob holding keys_ and all
+  // state vectors.
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateCompactKeyHashTable);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_


Mime
View raw message