quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [2/2] incubator-quickstep git commit: Improvements and bug fixes
Date Mon, 18 Dec 2017 01:41:58 GMT
Improvements and bug fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7e8b33f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7e8b33f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7e8b33f8

Branch: refs/heads/trace
Commit: 7e8b33f8a26aba9e66dd21c149d41f4f2971d36f
Parents: aec7623
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Sat Dec 16 22:32:05 2017 -0600
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Sat Dec 16 22:32:05 2017 -0600

----------------------------------------------------------------------
 cli/QuickstepCli.cpp                            |  10 +-
 query_execution/QueryContext.hpp                |  16 ++
 query_optimizer/ExecutionGenerator.cpp          | 214 ++++++-----------
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  39 +++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |   4 +-
 query_optimizer/resolver/Resolver.cpp           |   5 +-
 query_optimizer/rules/FuseAggregateJoin.cpp     |   6 -
 query_optimizer/rules/ReorderColumns.cpp        |   6 +-
 .../FinalizeAggregationOperator.cpp             |  29 +--
 .../FinalizeAggregationOperator.hpp             |   3 -
 .../InitializeAggregationOperator.cpp           |  24 +-
 .../InitializeAggregationOperator.hpp           |   7 +-
 relational_operators/InsertOperator.cpp         |  18 +-
 relational_operators/InsertOperator.hpp         |  13 +-
 relational_operators/WorkOrder.proto            |   2 +-
 relational_operators/WorkOrderFactory.cpp       |  28 ++-
 .../tests/AggregationOperator_unittest.cpp      |   2 -
 storage/AggregationOperationState.cpp           | 133 ++++++-----
 storage/AggregationOperationState.hpp           |  38 +--
 storage/AggregationOperationState.proto         |   3 -
 storage/CMakeLists.txt                          |  24 ++
 storage/CollisionFreeVectorTable.cpp            |  58 ++++-
 storage/CollisionFreeVectorTable.hpp            |  42 ++--
 storage/CompactKeySeparateChainingHashTable.cpp | 195 ++++++++++++++++
 storage/CompactKeySeparateChainingHashTable.hpp | 234 +++++++++++++++++++
 storage/Flags.hpp                               |   1 -
 storage/HashTable.proto                         |  15 +-
 storage/HashTableBase.hpp                       |   1 +
 storage/HashTableFactory.hpp                    |  24 +-
 utility/CMakeLists.txt                          |  10 +
 utility/ExecutionDAGVisualizer.cpp              |   4 +
 utility/Range.hpp                               | 188 +++++++++++++++
 utility/ScopedArray.hpp                         |  78 +++++++
 33 files changed, 1113 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 5db5dfc..b84b13b 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -378,7 +378,15 @@ int main(int argc, char* argv[]) {
               query_handle.release(),
               &bus);
         } catch (const quickstep::SqlError &sql_error) {
-          fprintf(io_handle->err(), "%s", sql_error.formatMessage(*command_string).c_str());
+          switch (statement.getStatementType()) {
+            case ParseStatement::kDropTable:
+              // Quick hack for QuickGrail for cleaner log information
+              // since we don't have DROP TABLE IF EXISTS yet.
+              break;
+            default:
+              fprintf(io_handle->err(), "%s",
+                      sql_error.formatMessage(*command_string).c_str());
+          }
           reset_parser = true;
           break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 7876821..e65f096 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -489,6 +489,22 @@ class QueryContext {
   }
 
   /**
+   * @brief Whether the given vector of Tuple ids is valid.
+   *
+   * @param ids The vector of Tuple ids.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool areValidTupleIds(const std::vector<tuple_id> &ids) const {
+    for (const tuple_id id : ids) {
+      if (id >= tuples_.size()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * @brief Release the ownership of the Tuple referenced by the id.
    *
    * @note Each id should use only once.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 3bca344..dd6efd4 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -132,6 +132,7 @@
 #include "relational_operators/UpdateOperator.hpp"
 #include "relational_operators/WindowAggregationOperator.hpp"
 #include "storage/AggregationOperationState.pb.h"
+#include "storage/Flags.hpp"
 #include "storage/HashTable.pb.h"
 #include "storage/HashTableFactory.hpp"
 #include "storage/InsertDestination.pb.h"
@@ -199,70 +200,6 @@ namespace S = ::quickstep::serialization;
 
 namespace {
 
-size_t CacheLineAlignedBytes(const size_t actual_bytes) {
-  return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
-}
-
-size_t CalculateNumInitializationPartitionsForCollisionFreeVectorTable(const size_t memory_size) {
-  // At least 1 partition, at most (#workers * 2) partitions.
-  return std::max(1uL, std::min(memory_size / kCollisonFreeVectorInitBlobSize,
-                                static_cast<size_t>(2 * FLAGS_num_workers)));
-}
-
-void CalculateCollisionFreeAggregationInfo(
-    const size_t num_entries, const vector<pair<AggregationID, vector<const Type *>>> &group_by_aggrs_info,
-    S::CollisionFreeVectorInfo *collision_free_vector_info) {
-  size_t memory_size = CacheLineAlignedBytes(
-      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
-
-  for (std::size_t i = 0; i < group_by_aggrs_info.size(); ++i) {
-    const auto &group_by_aggr_info = group_by_aggrs_info[i];
-
-    size_t state_size = 0;
-    switch (group_by_aggr_info.first) {
-      case AggregationID::kCount: {
-        state_size = sizeof(atomic<size_t>);
-        break;
-      }
-      case AggregationID::kSum: {
-        const vector<const Type *> &argument_types = group_by_aggr_info.second;
-        DCHECK_EQ(1u, argument_types.size());
-        switch (argument_types.front()->getTypeID()) {
-          case TypeID::kInt:
-          case TypeID::kLong:
-            state_size = sizeof(atomic<std::int64_t>);
-            break;
-          case TypeID::kFloat:
-          case TypeID::kDouble:
-            state_size = sizeof(atomic<double>);
-            break;
-          default:
-            LOG(FATAL) << "No support by CollisionFreeVector";
-        }
-        break;
-      }
-      default:
-        LOG(FATAL) << "No support by CollisionFreeVector";
-    }
-
-    collision_free_vector_info->add_state_offsets(memory_size);
-    memory_size += CacheLineAlignedBytes(state_size * num_entries);
-  }
-
-  collision_free_vector_info->set_memory_size(memory_size);
-  collision_free_vector_info->set_num_init_partitions(
-      CalculateNumInitializationPartitionsForCollisionFreeVectorTable(memory_size));
-}
-
-size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) {
-  // Set finalization segment size as 4096 entries.
-  constexpr size_t kFinalizeSegmentSize = 4uL * 1024L;
-
-  // At least 1 partition, at most (#workers * 2) partitions.
-  return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize,
-                                static_cast<size_t>(2 * FLAGS_num_workers)));
-}
-
 bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions,
                                const std::vector<bool> &is_distincts,
                                const std::vector<attribute_id> &group_by_attrs,
@@ -1212,6 +1149,7 @@ void ExecutionGenerator::convertCopyFrom(
         ->MergeFrom(output_relation->getPartitionScheme()->getProto());
   } else {
     insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
+
     const StorageBlockLayout &layout = output_relation->getDefaultStorageBlockLayout();
     const auto sub_block_type = layout.getDescription().tuple_store_description().sub_block_type();
     if (sub_block_type != TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE) {
@@ -1464,72 +1402,75 @@ void ExecutionGenerator::convertInsertTuple(
       *catalog_database_->getRelationById(
           input_relation_info->relation->getID());
 
+
+  // Construct the tuple proto to be inserted.
+  std::vector<QueryContext::tuple_id> tuple_indexes;
+
   for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) {
-    // Construct the tuple proto to be inserted.
     const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size();
-
     S::Tuple *tuple_proto = query_context_proto_->add_tuples();
     for (const E::ScalarLiteralPtr &literal : tuple) {
       tuple_proto->add_attribute_values()->CopyFrom(literal->value().getProto());
     }
+    tuple_indexes.push_back(tuple_index);
+  }
 
-    // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
-    //               block supports ad-hoc insertion instead of hard-coding the block types.
-    const StorageBlockLayout &storage_block_layout =
-        input_relation.getDefaultStorageBlockLayout();
-    if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
-        TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
-        storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
-              TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
-      THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
-                        << input_relation.getName()
-                        << ", because its storage blocks do not support ad-hoc insertion";
-    }
+  // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
+  //               block supports ad-hoc insertion instead of hard-coding the block types.
+  const StorageBlockLayout &storage_block_layout =
+      input_relation.getDefaultStorageBlockLayout();
+  if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
+      TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
+      storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
+            TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
+    THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
+                      << input_relation.getName()
+                      << ", because its storage blocks do not support ad-hoc insertion";
+  }
 
-    // Create InsertDestination proto.
-    const QueryContext::insert_destination_id insert_destination_index =
-        query_context_proto_->insert_destinations_size();
-    S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
+  // Create InsertDestination proto.
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto_->insert_destinations_size();
+  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
 
-    insert_destination_proto->set_relation_id(input_relation.getID());
-    insert_destination_proto->mutable_layout()->MergeFrom(
-        input_relation.getDefaultStorageBlockLayout().getDescription());
+  insert_destination_proto->set_relation_id(input_relation.getID());
+  insert_destination_proto->mutable_layout()->MergeFrom(
+      input_relation.getDefaultStorageBlockLayout().getDescription());
 
-    if (input_relation.hasPartitionScheme()) {
-      insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
-      insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
-          ->MergeFrom(input_relation.getPartitionScheme()->getProto());
-    } else {
-      insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
+  if (input_relation.hasPartitionScheme()) {
+    insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+    insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+        ->MergeFrom(input_relation.getPartitionScheme()->getProto());
+  } else {
+    insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
 
-      const vector<block_id> blocks(input_relation.getBlocksSnapshot());
-      for (const block_id block : blocks) {
-        insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
-      }
+    const vector<block_id> blocks(input_relation.getBlocksSnapshot());
+    for (const block_id block : blocks) {
+      insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
     }
+  }
 
-    const QueryPlan::DAGNodeIndex insert_operator_index =
-        execution_plan_->addRelationalOperator(
-            new InsertOperator(query_handle_->query_id(),
-                               input_relation,
-                               insert_destination_index,
-                               tuple_index));
-    insert_destination_proto->set_relational_op_index(insert_operator_index);
+  const QueryPlan::DAGNodeIndex insert_operator_index =
+      execution_plan_->addRelationalOperator(
+          new InsertOperator(query_handle_->query_id(),
+                             input_relation,
+                             insert_destination_index,
+                             tuple_indexes));
+  insert_destination_proto->set_relational_op_index(insert_operator_index);
 
-    CatalogRelation *mutable_relation =
-        catalog_database_->getRelationByIdMutable(input_relation.getID());
-    const QueryPlan::DAGNodeIndex save_blocks_index =
-        execution_plan_->addRelationalOperator(
-            new SaveBlocksOperator(query_handle_->query_id(), mutable_relation));
-    if (!input_relation_info->isStoredRelation()) {
-      execution_plan_->addDirectDependency(insert_operator_index,
-                                           input_relation_info->producer_operator_index,
-                                           true /* is_pipeline_breaker */);
-    }
-    execution_plan_->addDirectDependency(save_blocks_index,
-                                         insert_operator_index,
-                                         false /* is_pipeline_breaker */);
+  CatalogRelation *mutable_relation =
+      catalog_database_->getRelationByIdMutable(input_relation.getID());
+  const QueryPlan::DAGNodeIndex save_blocks_index =
+      execution_plan_->addRelationalOperator(
+          new SaveBlocksOperator(query_handle_->query_id(), mutable_relation));
+  if (!input_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(insert_operator_index,
+                                         input_relation_info->producer_operator_index,
+                                         true /* is_pipeline_breaker */);
   }
+  execution_plan_->addDirectDependency(save_blocks_index,
+                                       insert_operator_index,
+                                       false /* is_pipeline_breaker */);
 }
 
 void ExecutionGenerator::convertInsertSelection(
@@ -1800,8 +1741,6 @@ void ExecutionGenerator::convertAggregate(
       aggr_state_context_proto->mutable_aggregation_state();
   aggr_state_proto->set_relation_id(input_relation.getID());
 
-  bool use_parallel_initialization = false;
-
   std::vector<const Type*> group_by_types;
   std::vector<attribute_id> group_by_attrs;
   for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
@@ -1865,27 +1804,30 @@ void ExecutionGenerator::convertAggregate(
     }
   }
 
+  bool use_parallel_initialization = false;
   bool aggr_state_is_partitioned = false;
   std::size_t aggr_state_num_partitions = 1u;
   if (!group_by_types.empty()) {
-    const std::size_t estimated_num_groups =
-        cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
-
     std::size_t max_num_groups;
     if (cost_model_for_aggregation_
             ->canUseCollisionFreeAggregation(physical_plan,
-                                             estimated_num_groups,
                                              &max_num_groups)) {
       // First option: use array-based aggregation if applicable.
       aggr_state_proto->set_hash_table_impl_type(
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
       use_parallel_initialization = true;
-      aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups);
-
-      CalculateCollisionFreeAggregationInfo(max_num_groups, group_by_aggrs_info,
-                                            aggr_state_proto->mutable_collision_free_vector_info());
+    } else if (cost_model_for_aggregation_
+                   ->canUseCompactKeySeparateChainingAggregation(physical_plan)) {
+      CHECK(aggregate_expressions.empty());
+      aggr_state_proto->set_hash_table_impl_type(
+          serialization::HashTableImplType::COMPACT_KEY_SEPARATE_CHAINING);
+      aggr_state_proto->set_estimated_num_entries(
+          cost_model_for_aggregation_->estimateCardinality(physical_plan->input()));
+      use_parallel_initialization = true;
     } else {
+      const std::size_t estimated_num_groups =
+          cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
       if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
               physical_plan, estimated_num_groups)) {
         // Second option: use thread-private compact-key aggregation if applicable.
@@ -1895,7 +1837,8 @@ void ExecutionGenerator::convertAggregate(
         // Otherwise, use SeparateChaining.
         aggr_state_proto->set_hash_table_impl_type(
             serialization::HashTableImplType::SEPARATE_CHAINING);
-        if (CheckAggregatePartitioned(aggregate_expressions.size(), is_distincts, group_by_attrs,
+        if (CheckAggregatePartitioned(aggregate_expressions.size(),
+                                      is_distincts, group_by_attrs,
                                       estimated_num_groups)) {
           aggr_state_is_partitioned = true;
           aggr_state_num_partitions = FLAGS_num_aggregation_partitions;
@@ -1930,15 +1873,12 @@ void ExecutionGenerator::convertAggregate(
   }
 
   if (use_parallel_initialization) {
-    DCHECK(aggr_state_proto->has_collision_free_vector_info());
-
     const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
         execution_plan_->addRelationalOperator(
             new InitializeAggregationOperator(
                 query_handle_->query_id(),
                 aggr_state_index,
-                num_partitions,
-                aggr_state_proto->collision_free_vector_info().num_init_partitions()));
+                num_partitions));
 
     execution_plan_->addDirectDependency(aggregation_operator_index,
                                          initialize_aggregation_operator_index,
@@ -1960,7 +1900,6 @@ void ExecutionGenerator::convertAggregate(
                                           aggr_state_index,
                                           num_partitions,
                                           physical_plan->hasRepartition(),
-                                          aggr_state_num_partitions,
                                           *output_relation,
                                           insert_destination_index));
 
@@ -2031,10 +1970,7 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
 
   const size_t estimated_num_entries = physical_plan->group_by_key_value_range();
   aggr_state_proto->set_estimated_num_entries(estimated_num_entries);
-
-  const size_t aggr_state_num_partitions =
-      CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(estimated_num_entries);
-  aggr_state_proto->set_num_partitions(aggr_state_num_partitions);
+  aggr_state_proto->set_num_partitions(1u);
 
   if (physical_plan->right_filter_predicate() != nullptr) {
     std::unique_ptr<const Predicate> predicate(
@@ -2070,16 +2006,12 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
     aggr_proto->set_is_distinct(false);
   }
 
-  CalculateCollisionFreeAggregationInfo(estimated_num_entries, group_by_aggrs_info,
-                                        aggr_state_proto->mutable_collision_free_vector_info());
-
   const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new InitializeAggregationOperator(
               query_handle_->query_id(),
               aggr_state_index,
-              num_partitions,
-              aggr_state_proto->collision_free_vector_info().num_init_partitions()));
+              num_partitions));
 
   const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
       execution_plan_->addRelationalOperator(
@@ -2122,7 +2054,6 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
                                        build_aggregation_existence_map_operator_index,
                                        true /* is_pipeline_breaker */);
 
-
   // Create InsertDestination proto.
   const CatalogRelation *output_relation = nullptr;
   const QueryContext::insert_destination_id insert_destination_index =
@@ -2138,7 +2069,6 @@ void ExecutionGenerator::convertCrossReferenceCoalesceAggregate(
                                           aggr_state_index,
                                           num_partitions,
                                           physical_plan->hasRepartition(),
-                                          aggr_state_num_partitions,
                                           *output_relation,
                                           insert_destination_index));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 2c84fc5..729a563 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -605,7 +605,6 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
 
 bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
     const P::AggregatePtr &aggregate,
-    const std::size_t estimated_num_groups,
     std::size_t *max_num_groups) {
 #ifdef QUICKSTEP_DISTRIBUTED
   // Currently we cannot do this fast path with the distributed setting. See
@@ -661,7 +660,7 @@ bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
   //    of hardcoding it as a gflag.
   if (min_cpp_value < 0 ||
       max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
-      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+      max_cpp_value / static_cast<double>(estimateNumGroupsForAggregate(aggregate)) > 256.0) {
     return false;
   }
 
@@ -767,6 +766,42 @@ bool StarSchemaSimpleCostModel::canUseTwoPhaseCompactKeyAggregation(
   return true;
 }
 
+bool StarSchemaSimpleCostModel::canUseCompactKeySeparateChainingAggregation(
+    const P::AggregatePtr &aggregate) {
+  P::TableReferencePtr table_reference;
+  if (!P::SomeTableReference::MatchesWithConditionalCast(aggregate->input(), &table_reference)) {
+    return false;
+  }
+
+  const auto &stat = table_reference->relation()->getStatistics();
+  if (!stat.isExact() || !stat.hasNumTuples()) {
+    return false;
+  }
+
+  if (stat.getNumTuples() <= 1000u) {
+    return false;
+  }
+
+  // Require fixed-length non-nullable keys that can be packed into a 64-bit QWORD.
+  std::size_t total_key_size = 0;
+  for (const auto &key_expr : aggregate->grouping_expressions()) {
+    const Type &type = key_expr->getValueType();
+    if (type.isNullable() || type.isVariableLength()) {
+      return false;
+    }
+    total_key_size += type.maximumByteLength();
+  }
+  if (total_key_size > sizeof(std::uint64_t)) {
+    return false;
+  }
+
+  if (!aggregate->aggregate_expressions().empty()) {
+    return false;
+  }
+
+  return true;
+}
+
 }  // namespace cost
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index 99518cf..a056866 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -173,7 +173,6 @@ class StarSchemaSimpleCostModel : public CostModel {
    *        the collision-free aggregation fast path.
    *
    * @param aggregate The physical aggregate node to be checked.
-   * @param estimated_num_groups The estimated number of groups for the aggregate.
    * @param exact_num_groups If collision-free aggregation is applicable, the
    *        pointed content of this pointer will be set as the maximum possible
    *        number of groups that the collision-free hash table need to hold.
@@ -181,7 +180,6 @@ class StarSchemaSimpleCostModel : public CostModel {
    *         used to evaluate \p aggregate.
    */
   bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
-                                      const std::size_t estimated_num_groups,
                                       std::size_t *max_num_groups);
 
   /**
@@ -196,6 +194,8 @@ class StarSchemaSimpleCostModel : public CostModel {
   bool canUseTwoPhaseCompactKeyAggregation(const physical::AggregatePtr &aggregate,
                                            const std::size_t estimated_num_groups);
 
+  bool canUseCompactKeySeparateChainingAggregation(const physical::AggregatePtr &aggregate);
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index 17198c2..02bc1fb 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -1589,7 +1589,7 @@ L::LogicalPtr Resolver::resolveSetOperations(
     possible_attributes.push_back(possible_attribute);
   }
 
-  for (std::size_t opid = 0; opid < operation_attributes.size(); ++opid) {
+  for (std::size_t opid = 0; opid < attribute_matrix.size(); ++opid) {
     // Generate a cast operation if needed.
     std::vector<E::NamedExpressionPtr> cast_expressions;
     for (std::size_t aid = 0; aid < operation_attributes.size(); ++aid) {
@@ -2107,8 +2107,7 @@ E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
 const CatalogRelation* Resolver::resolveRelationName(
     const ParseString *relation_name) {
   const CatalogRelation *relation =
-      catalog_database_.getRelationByName(
-          ToLower(relation_name->value()));
+      catalog_database_.getRelationByName(ToLower(relation_name->value()));
   if (relation == nullptr) {
     THROW_SQL_ERROR_AT(relation_name) << "Unrecognized relation "
                                       << relation_name->value();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_optimizer/rules/FuseAggregateJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp
index 6efc7e8..69e85af 100644
--- a/query_optimizer/rules/FuseAggregateJoin.cpp
+++ b/query_optimizer/rules/FuseAggregateJoin.cpp
@@ -109,23 +109,17 @@ P::PhysicalPtr FuseAggregateJoin::applyToNode(
 
   // Collision-free vector aggregation is applicable, and both the left and right
   // join attributes are range-bounded integer values.
-  const std::size_t estimated_num_groups =
-      cost_model_->estimateNumGroupsForAggregate(aggregate);
-
   std::size_t max_num_groups_left;
   if (!cost_model_->canUseCollisionFreeAggregation(aggregate,
-                                                   estimated_num_groups,
                                                    &max_num_groups_left)) {
     return node;
   }
-
   std::size_t max_num_groups_right;
   if (!cost_model_->canUseCollisionFreeAggregation(
            P::Aggregate::Create(hash_join->right(),
                                 E::ToNamedExpressions(hash_join->right_join_attributes()),
                                 aggregate->aggregate_expressions(),
                                 nullptr),
-           estimated_num_groups,
            &max_num_groups_right)) {
     return node;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/query_optimizer/rules/ReorderColumns.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReorderColumns.cpp b/query_optimizer/rules/ReorderColumns.cpp
index 4783a8d..d12e8c7 100644
--- a/query_optimizer/rules/ReorderColumns.cpp
+++ b/query_optimizer/rules/ReorderColumns.cpp
@@ -61,7 +61,11 @@ P::PhysicalPtr ReorderColumns::applyInternal(const P::PhysicalPtr &input,
   if (skip_transform) {
     std::vector<P::PhysicalPtr> new_children;
     for (const P::PhysicalPtr &child : input->children()) {
-      new_children.emplace_back(applyInternal(child, lock_ordering && is_not_transformable));
+      bool child_lock_ordering = lock_ordering && is_not_transformable;
+      if (child->getPhysicalType() == P::PhysicalType::kUnionAll) {
+        child_lock_ordering = true;
+      }
+      new_children.emplace_back(applyInternal(child, child_lock_ordering));
     }
 
     if (new_children != input->children()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 92fc7f6..cdba25a 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -50,7 +50,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
         query_context->getAggregationState(aggr_state_index_, part_id);
     DCHECK(agg_state != nullptr);
     for (std::size_t state_part_id = 0;
-         state_part_id < aggr_state_num_partitions_;
+         state_part_id < agg_state->getNumFinalizationPartitions();
          ++state_part_id) {
       container->addNormalWorkOrder(
           new FinalizeAggregationWorkOrder(
@@ -68,32 +68,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 }
 
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (started_) {
-    return true;
-  }
-
-  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    for (std::size_t state_part_id = 0;
-         state_part_id < aggr_state_num_partitions_;
-         ++state_part_id) {
-      serialization::WorkOrder *proto = new serialization::WorkOrder;
-      proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
-      proto->set_query_id(query_id_);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
-                          aggr_state_index_);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
-                          part_id);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
-                          state_part_id);
-      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
-                          output_destination_index_);
-
-      container->addWorkOrderProto(proto, op_index_);
-    }
-  }
-
-  started_ = true;
-  return true;
+  LOG(FATAL) << "Not supported";
 }
 
 void FinalizeAggregationWorkOrder::execute() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 5931ca2..087fad5 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -71,12 +71,10 @@ class FinalizeAggregationOperator : public RelationalOperator {
       const QueryContext::aggregation_state_id aggr_state_index,
       const std::size_t num_partitions,
       const bool has_repartition,
-      const std::size_t aggr_state_num_partitions,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id, num_partitions, has_repartition, output_relation.getNumPartitions()),
         aggr_state_index_(aggr_state_index),
-        aggr_state_num_partitions_(aggr_state_num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         started_(false) {
@@ -111,7 +109,6 @@ class FinalizeAggregationOperator : public RelationalOperator {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t aggr_state_num_partitions_;
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   bool started_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 89dfd7e..6bd9e8d 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -49,7 +49,7 @@ bool InitializeAggregationOperator::getAllWorkOrders(
     DCHECK(agg_state != nullptr);
 
     for (std::size_t state_part_id = 0;
-         state_part_id < aggr_state_num_init_partitions_;
+         state_part_id < agg_state->getNumInitializationPartitions();
          ++state_part_id) {
       container->addNormalWorkOrder(
           new InitializeAggregationWorkOrder(query_id_,
@@ -65,27 +65,7 @@ bool InitializeAggregationOperator::getAllWorkOrders(
 }
 
 bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (started_) {
-    return true;
-  }
-
-  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    for (std::size_t state_part_id = 0;
-         state_part_id < aggr_state_num_init_partitions_;
-         ++state_part_id) {
-      serialization::WorkOrder *proto = new serialization::WorkOrder;
-      proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
-      proto->set_query_id(query_id_);
-
-      proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
-      proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
-      proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, state_part_id);
-
-      container->addWorkOrderProto(proto, op_index_);
-    }
-  }
-  started_ = true;
-  return true;
+  LOG(FATAL) << "Not supported";
 }
 
 void InitializeAggregationWorkOrder::execute() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index cf9abe5..beedc14 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -58,16 +58,12 @@ class InitializeAggregationOperator : public RelationalOperator {
    * @param aggr_state_index The index of the AggregationOperationState in QueryContext.
    * @param num_partitions The number of partitions in 'input_relation'. If no
    *        partitions, it is one.
-   * @param aggr_state_num_init_partitions The number of partitions to be used
-   *        for initialize the aggregation state collision free vector table.
    **/
   InitializeAggregationOperator(const std::size_t query_id,
                                 const QueryContext::aggregation_state_id aggr_state_index,
-                                const std::size_t num_partitions,
-                                const std::size_t aggr_state_num_init_partitions)
+                                const std::size_t num_partitions)
       : RelationalOperator(query_id, num_partitions),
         aggr_state_index_(aggr_state_index),
-        aggr_state_num_init_partitions_(aggr_state_num_init_partitions),
         started_(false) {}
 
   ~InitializeAggregationOperator() override {}
@@ -90,7 +86,6 @@ class InitializeAggregationOperator : public RelationalOperator {
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t aggr_state_num_init_partitions_;
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/InsertOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp
index fbd3a07..b8c9f07 100644
--- a/relational_operators/InsertOperator.cpp
+++ b/relational_operators/InsertOperator.cpp
@@ -20,6 +20,7 @@
 #include "relational_operators/InsertOperator.hpp"
 
 #include <memory>
+#include <vector>
 
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
@@ -43,12 +44,19 @@ bool InsertOperator::getAllWorkOrders(
     return true;
   }
 
+  std::vector<std::unique_ptr<Tuple>> tuples;
+
+  for (const QueryContext::tuple_id tuple_index : tuple_indexes_) {
+    std::unique_ptr<Tuple> newTuple(query_context->releaseTuple(tuple_index));
+    tuples.push_back(std::move(newTuple));
+  }
+
   DCHECK(query_context != nullptr);
   container->addNormalWorkOrder(
       new InsertWorkOrder(
           query_id_,
           query_context->getInsertDestination(output_destination_index_),
-          query_context->releaseTuple(tuple_index_)),
+          std::move(tuples)),
       op_index_);
 
   work_generated_ = true;
@@ -64,7 +72,9 @@ bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
   proto->set_work_order_type(serialization::INSERT);
   proto->set_query_id(query_id_);
   proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
-  proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
+  for (const QueryContext::tuple_id tuple_index : tuple_indexes_) {
+    proto->AddExtension(serialization::InsertWorkOrder::tuple_indexes, tuple_index);
+  }
 
   container->addWorkOrderProto(proto, op_index_);
 
@@ -74,7 +84,9 @@ bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
 
 
 void InsertWorkOrder::execute() {
-  output_destination_->insertTuple(*tuple_);
+  for (const auto &tuple : tuples_) {
+    output_destination_->insertTuple(*tuple);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/InsertOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InsertOperator.hpp b/relational_operators/InsertOperator.hpp
index b103538..3865a7f 100644
--- a/relational_operators/InsertOperator.hpp
+++ b/relational_operators/InsertOperator.hpp
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <string>
 #include <memory>
+#include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
@@ -67,11 +68,11 @@ class InsertOperator : public RelationalOperator {
       const std::size_t query_id,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index,
-      const QueryContext::tuple_id tuple_index)
+      const std::vector<QueryContext::tuple_id> &tuple_indexes)
       : RelationalOperator(query_id, 1u, false, output_relation.getNumPartitions()),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
-        tuple_index_(tuple_index),
+        tuple_indexes_(tuple_indexes),
         work_generated_(false) {}
 
   ~InsertOperator() override {}
@@ -103,7 +104,7 @@ class InsertOperator : public RelationalOperator {
  private:
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
-  const QueryContext::tuple_id tuple_index_;
+  const std::vector<QueryContext::tuple_id> tuple_indexes_;
   bool work_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(InsertOperator);
@@ -125,10 +126,10 @@ class InsertWorkOrder : public WorkOrder {
    **/
   InsertWorkOrder(const std::size_t query_id,
                   InsertDestination *output_destination,
-                  Tuple *tuple)
+                  std::vector<std::unique_ptr<Tuple>> &&tuples)
       : WorkOrder(query_id),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        tuple_(DCHECK_NOTNULL(tuple)) {}
+        tuples_(std::move(tuples)) {}
 
   ~InsertWorkOrder() override {}
 
@@ -140,7 +141,7 @@ class InsertWorkOrder : public WorkOrder {
 
  private:
   InsertDestination *output_destination_;
-  std::unique_ptr<Tuple> tuple_;
+  std::vector<std::unique_ptr<Tuple>> tuples_;
 
   DISALLOW_COPY_AND_ASSIGN(InsertWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index aaf7929..b84e758 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -201,7 +201,7 @@ message InsertWorkOrder {
   extend WorkOrder {
     // All required.
     optional int32 insert_destination_index = 176;
-    optional uint32 tuple_index = 177;
+    repeated uint32 tuple_indexes = 177;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 3a991bd..7f11e3e 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -395,12 +395,22 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
     }
     case serialization::INSERT: {
       LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+
+      const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes);
+      std::vector<std::unique_ptr<Tuple>> tuple_indexes;
+
+      for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; specific_tuple_index++) {
+        const int tuple_index =
+            proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index);
+        tuple_indexes.emplace_back(
+            std::unique_ptr<Tuple>(query_context->releaseTuple(tuple_index)));
+      }
+
       return new InsertWorkOrder(
           query_id,
           query_context->getInsertDestination(
               proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
-          query_context->releaseTuple(
-              proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
+              std::move(tuple_indexes));
     }
     case serialization::NESTED_LOOP_JOIN: {
       const partition_id part_id =
@@ -852,12 +862,20 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              proto.HasExtension(serialization::InitializeAggregationWorkOrder::state_partition_id);
     }
     case serialization::INSERT: {
+      const int tuple_count = proto.ExtensionSize(serialization::InsertWorkOrder::tuple_indexes);
+      std::vector<QueryContext::tuple_id> tuple_indexes;
+
+      for (int specific_tuple_index = 0; specific_tuple_index < tuple_count; specific_tuple_index++) {
+        const int tuple_index =
+            proto.GetExtension(serialization::InsertWorkOrder::tuple_indexes, specific_tuple_index);
+        tuple_indexes.push_back(tuple_index);
+      }
+
       return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
              query_context.isValidInsertDestinationId(
                  proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)) &&
-             proto.HasExtension(serialization::InsertWorkOrder::tuple_index) &&
-             query_context.isValidTupleId(
-                 proto.GetExtension(serialization::InsertWorkOrder::tuple_index));
+             proto.HasExtension(serialization::InsertWorkOrder::tuple_indexes) &&
+             query_context.areValidTupleIds(tuple_indexes);
     }
     case serialization::NESTED_LOOP_JOIN: {
       if (!proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id) ||

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 5c9cc7a..fa622f1 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -295,7 +295,6 @@ class AggregationOperatorTest : public ::testing::Test {
                                         aggr_state_index,
                                         kNumPartitions,
                                         kNoRepartition,
-                                        kNumPartitions,
                                         *result_table_,
                                         insert_destination_index));
 
@@ -391,7 +390,6 @@ class AggregationOperatorTest : public ::testing::Test {
                                         aggr_state_index,
                                         kNumPartitions,
                                         kNoRepartition,
-                                        kNumPartitions,
                                         *result_table_,
                                         insert_destination_index));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 9d58107..92798d8 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -83,14 +83,9 @@ AggregationOperationState::AggregationOperationState(
     const std::size_t num_partitions,
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
-    StorageManager *storage_manager,
-    const size_t collision_free_vector_memory_size,
-    const size_t collision_free_vector_num_init_partitions,
-    const vector<size_t> &collision_free_vector_state_offsets)
+    StorageManager *storage_manager)
     : input_relation_(input_relation),
-      is_aggregate_collision_free_(
-          group_by.empty() ? false
-                           : hash_table_impl_type == HashTableImplType::kCollisionFreeVector),
+      hash_table_impl_type_(hash_table_impl_type),
       is_aggregate_partitioned_(is_partitioned),
       predicate_(predicate),
       is_distinct_(std::move(is_distinct)),
@@ -209,18 +204,14 @@ AggregationOperationState::AggregationOperationState(
 
   if (!group_by_key_ids_.empty()) {
     // Aggregation with GROUP BY: create the hash table (pool).
-    if (is_aggregate_collision_free_) {
-      collision_free_hashtable_.reset(
+    if (useCollisionFreeVector() || useCompactKeySeparateChaining()) {
+      shared_hash_table_.reset(
           AggregationStateHashTableFactory::CreateResizable(
               hash_table_impl_type,
               group_by_types_,
               estimated_num_entries,
               group_by_handles,
-              storage_manager,
-              num_partitions,
-              collision_free_vector_memory_size,
-              collision_free_vector_num_init_partitions,
-              collision_free_vector_state_offsets));
+              storage_manager));
     } else if (is_aggregate_partitioned_) {
       if (all_distinct_) {
         DCHECK_EQ(1u, group_by_handles.size());
@@ -300,19 +291,6 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
         PredicateFactory::ReconstructFromProto(proto.predicate(), database));
   }
 
-  size_t collision_free_vector_memory_size = 0;
-  size_t collision_free_vector_num_init_partitions = 0;
-  vector<size_t> collision_free_vector_state_offsets;
-  if (proto.has_collision_free_vector_info()) {
-    const serialization::CollisionFreeVectorInfo &collision_free_vector_info =
-        proto.collision_free_vector_info();
-    collision_free_vector_memory_size = collision_free_vector_info.memory_size();
-    collision_free_vector_num_init_partitions = collision_free_vector_info.num_init_partitions();
-    for (int i = 0; i < collision_free_vector_info.state_offsets_size(); ++i) {
-      collision_free_vector_state_offsets.push_back(collision_free_vector_info.state_offsets(i));
-    }
-  }
-
   return new AggregationOperationState(
       database.getRelationSchemaById(proto.relation_id()),
       aggregate_functions,
@@ -325,10 +303,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
       proto.num_partitions(),
       HashTableImplTypeFromProto(proto.hash_table_impl_type()),
       distinctify_hash_table_impl_types,
-      storage_manager,
-      collision_free_vector_memory_size,
-      collision_free_vector_num_init_partitions,
-      collision_free_vector_state_offsets);
+      storage_manager);
 }
 
 bool AggregationOperationState::ProtoIsValid(
@@ -386,17 +361,6 @@ bool AggregationOperationState::ProtoIsValid(
             proto.hash_table_impl_type())) {
       return false;
     }
-
-    if (proto.hash_table_impl_type() == S::HashTableImplType::COLLISION_FREE_VECTOR) {
-      if (!proto.has_collision_free_vector_info()) {
-        return false;
-      }
-
-      const S::CollisionFreeVectorInfo &proto_collision_free_vector_info = proto.collision_free_vector_info();
-      if (!proto_collision_free_vector_info.IsInitialized()) {
-        return false;
-      }
-    }
   }
 
   if (proto.has_predicate()) {
@@ -410,22 +374,25 @@ bool AggregationOperationState::ProtoIsValid(
 
 CollisionFreeVectorTable* AggregationOperationState
     ::getCollisionFreeVectorTable() const {
-  return static_cast<CollisionFreeVectorTable *>(
-      collision_free_hashtable_.get());
+  return static_cast<CollisionFreeVectorTable *>(shared_hash_table_.get());
 }
 
 void AggregationOperationState::initialize(const std::size_t partition_id) {
-  if (is_aggregate_collision_free_) {
-    static_cast<CollisionFreeVectorTable *>(
-        collision_free_hashtable_.get())->initialize(partition_id);
+  if (useCollisionFreeVector()) {
+    static_cast<CollisionFreeVectorTable*>(
+        shared_hash_table_.get())->initialize(partition_id);
+  } else if (useCompactKeySeparateChaining()) {
+    static_cast<CompactKeySeparateChainingHashTable*>(
+        shared_hash_table_.get())->initialize(partition_id);
   } else {
     LOG(FATAL) << "AggregationOperationState::initialize() "
                << "is not supported by this aggregation";
   }
 }
 
-void AggregationOperationState::aggregateBlock(const block_id input_block,
-                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
+void AggregationOperationState::aggregateBlock(
+    const block_id input_block,
+    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
   BlockReference block(
       storage_manager_->getBlock(input_block, input_relation_));
   const auto &tuple_store = block->getTupleStorageSubBlock();
@@ -526,8 +493,8 @@ void AggregationOperationState::mergeGroupByHashTables(
 
 void AggregationOperationState::aggregateBlockHashTable(
     const ValueAccessorMultiplexer &accessor_mux) {
-  if (is_aggregate_collision_free_) {
-    aggregateBlockHashTableImplCollisionFree(accessor_mux);
+  if (useCollisionFreeVector() || useCompactKeySeparateChaining()) {
+    aggregateBlockHashTableImplSharedTable(accessor_mux);
   } else if (is_aggregate_partitioned_) {
     aggregateBlockHashTableImplPartitioned(accessor_mux);
   } else {
@@ -535,13 +502,13 @@ void AggregationOperationState::aggregateBlockHashTable(
   }
 }
 
-void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+void AggregationOperationState::aggregateBlockHashTableImplSharedTable(
     const ValueAccessorMultiplexer &accessor_mux) {
-  DCHECK(collision_free_hashtable_ != nullptr);
+  DCHECK(shared_hash_table_ != nullptr);
 
-  collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_,
-                                                             group_by_key_ids_,
-                                                             accessor_mux);
+  shared_hash_table_->upsertValueAccessorCompositeKey(argument_ids_,
+                                                      group_by_key_ids_,
+                                                      accessor_mux);
 }
 
 void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
@@ -671,8 +638,10 @@ void AggregationOperationState::finalizeSingleState(
 void AggregationOperationState::finalizeHashTable(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
-  if (is_aggregate_collision_free_) {
+  if (useCollisionFreeVector()) {
     finalizeHashTableImplCollisionFree(partition_id, output_destination);
+  } else if (useCompactKeySeparateChaining()) {
+    finalizeHashTableImplCompactKeySeparateChaining(partition_id, output_destination);
   } else if (is_aggregate_partitioned_) {
     finalizeHashTableImplPartitioned(partition_id, output_destination);
   } else {
@@ -695,9 +664,8 @@ void AggregationOperationState::finalizeHashTable(
 void AggregationOperationState::finalizeHashTableImplCollisionFree(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
-  std::vector<std::unique_ptr<ColumnVector>> final_values;
   CollisionFreeVectorTable *hash_table =
-      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
+      static_cast<CollisionFreeVectorTable *>(shared_hash_table_.get());
 
   const std::size_t max_length =
       hash_table->getNumTuplesInFinalizationPartition(partition_id);
@@ -726,6 +694,19 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+void AggregationOperationState::finalizeHashTableImplCompactKeySeparateChaining(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {  // Writes partition_id's finalized result to output_destination.
+  CompactKeySeparateChainingHashTable *hash_table =  // Unchecked cast; caller gates on useCompactKeySeparateChaining().
+      static_cast<CompactKeySeparateChainingHashTable *>(shared_hash_table_.get());
+
+  ColumnVectorsValueAccessor complete_result;
+  hash_table->finalizeKeys(partition_id, &complete_result);  // NOTE(review): assumed to fill complete_result; confirm.
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
@@ -949,8 +930,8 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivateCompactKey(
 std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
   std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
   memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);
-  if (collision_free_hashtable_ != nullptr) {
-    memory += collision_free_hashtable_->getMemoryConsumptionBytes();
+  if (shared_hash_table_ != nullptr) {
+    memory += shared_hash_table_->getMemoryConsumptionBytes();
   }
   if (group_by_hashtable_pool_ != nullptr) {
     memory += group_by_hashtable_pool_->getMemoryConsumptionPoolBytes();
@@ -973,4 +954,34 @@ std::size_t AggregationOperationState::getMemoryConsumptionBytesHelper(
   return memory;
 }
 
+std::size_t AggregationOperationState::getNumInitializationPartitions() const {  // Forwards to the shared table, if any.
+  if (useCollisionFreeVector()) {  // NOTE(review): no GROUP BY-empty guard, unlike getNumFinalizationPartitions().
+    return static_cast<const CollisionFreeVectorTable*>(
+        shared_hash_table_.get())->getNumInitializationPartitions();  // Confirm shared_hash_table_ is non-null here.
+  }
+  if (useCompactKeySeparateChaining()) {
+    return static_cast<const CompactKeySeparateChainingHashTable*>(
+        shared_hash_table_.get())->getNumInitializationPartitions();
+  }
+  return 0u;  // Neither shared-table implementation is selected.
+}
+
+std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
+  if (group_by_key_ids_.empty()) {  // Checked first: the constructor only creates the tables below with GROUP BY keys.
+    return 1u;
+  }
+  if (useCollisionFreeVector()) {
+    return static_cast<const CollisionFreeVectorTable *>(
+        shared_hash_table_.get())->getNumFinalizationPartitions();
+  }
+  if (useCompactKeySeparateChaining()) {
+    return static_cast<const CompactKeySeparateChainingHashTable *>(
+        shared_hash_table_.get())->getNumFinalizationPartitions();
+  }
+  if (is_aggregate_partitioned_) {
+    return partitioned_group_by_hashtable_pool_->getNumPartitions();  // One finalization partition per pool partition.
+  }
+  return 1u;  // Remaining case: a single finalization partition.
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c7680b0..ae14dd1 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -108,12 +108,6 @@ class AggregationOperationState {
    * @param storage_manager The StorageManager to use for allocating hash
    *        tables. Single aggregation state (when GROUP BY list is not
    *        specified) is not allocated using memory from storage manager.
-   * @param collision_free_vector_memory_size For CollisionFreeVectorTable,
-   *        the memory size.
-   * @param collision_free_vector_num_init_partitions For
-   *        CollisionFreeVectorTable, the number of partitions to initialize.
-   * @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
-   *        the offsets for each state.
    */
   AggregationOperationState(
       const CatalogRelationSchema &input_relation,
@@ -127,10 +121,7 @@ class AggregationOperationState {
       const std::size_t num_partitions,
       const HashTableImplType hash_table_impl_type,
       const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
-      StorageManager *storage_manager,
-      const std::size_t collision_free_vector_memory_size = 0,
-      const std::size_t collision_free_vector_num_init_partitions = 0,
-      const std::vector<std::size_t> &collision_free_vector_state_offsets = std::vector<std::size_t>());
+      StorageManager *storage_manager);
 
   ~AggregationOperationState() {}
 
@@ -175,6 +166,14 @@ class AggregationOperationState {
   std::size_t getNumInitializationPartitions() const;
 
   /**
+   * @brief Get the number of partitions to be used for finalizing the
+   *        aggregation.
+   *
+   * @return The number of partitions to be used for finalizing the aggregation.
+   **/
+  std::size_t getNumFinalizationPartitions() const;
+
+  /**
    * @brief Initialize the specified partition of this aggregation.
    *
    * @param partition_id ID of the partition to be initialized.
@@ -219,6 +218,14 @@ class AggregationOperationState {
   std::size_t getMemoryConsumptionBytes() const;
 
  private:
+  bool useCollisionFreeVector() const {  // Tests hash_table_impl_type_ only; GROUP BY keys are not consulted.
+    return hash_table_impl_type_ == HashTableImplType::kCollisionFreeVector;
+  }
+
+  bool useCompactKeySeparateChaining() const {  // Tests hash_table_impl_type_ only; GROUP BY keys are not consulted.
+    return hash_table_impl_type_ == HashTableImplType::kCompactKeySeparateChaining;
+  }
+
   // Aggregate on input block.
   void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux);
 
@@ -239,7 +246,7 @@ class AggregationOperationState {
                          InsertDestination *output_destination);
 
   // Specialized implementations for aggregateBlockHashTable.
-  void aggregateBlockHashTableImplCollisionFree(
+  void aggregateBlockHashTableImplSharedTable(
       const ValueAccessorMultiplexer &accessor_mux);
 
   void aggregateBlockHashTableImplPartitioned(
@@ -252,6 +259,9 @@ class AggregationOperationState {
   void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
                                           InsertDestination *output_destination);
 
+  void finalizeHashTableImplCompactKeySeparateChaining(const std::size_t partition_id,
+                                                       InsertDestination *output_destination);
+
   void finalizeHashTableImplPartitioned(const std::size_t partition_id,
                                         InsertDestination *output_destination);
 
@@ -269,8 +279,8 @@ class AggregationOperationState {
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
 
-  // Whether the aggregation is collision free or not.
-  const bool is_aggregate_collision_free_;
+  // Hash table implementation type.
+  const HashTableImplType hash_table_impl_type_;
 
   // Whether the aggregation is partitioned or not.
   const bool is_aggregate_partitioned_;
@@ -313,7 +323,7 @@ class AggregationOperationState {
 
   std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
 
-  std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+  std::unique_ptr<AggregationStateHashTableBase> shared_hash_table_;
 
   StorageManager *storage_manager_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/AggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index d2305f1..1a8a302 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -45,7 +45,4 @@ message AggregationOperationState {
 
   optional bool is_partitioned = 8;
   optional uint64 num_partitions = 9 [default = 1];
-
-  // Required if 'hash_table_impl_type' is 'COLLISION_FREE_VECTOR'.
-  optional CollisionFreeVectorInfo collision_free_vector_info = 10;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index fb09e49..8ac7285 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -169,6 +169,9 @@ add_library(quickstep_storage_CollisionFreeVectorTable
             CollisionFreeVectorTable.cpp
             CollisionFreeVectorTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
+add_library(quickstep_storage_CompactKeySeparateChainingHashTable
+            CompactKeySeparateChainingHashTable.cpp
+            CompactKeySeparateChainingHashTable.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
             CompressedColumnStoreTupleStorageSubBlock.cpp
@@ -474,6 +477,25 @@ target_link_libraries(quickstep_storage_ColumnStoreUtil
                       quickstep_types_operations_comparisons_ComparisonID
                       quickstep_types_operations_comparisons_ComparisonUtil
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_CompactKeySeparateChainingHashTable
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_cli_Flags
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_CharType
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_PrimeNumber
+                      quickstep_utility_Range
+                      quickstep_utility_ScopedArray
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_CompressedBlockBuilder
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -721,6 +743,7 @@ target_link_libraries(quickstep_storage_HashTable_proto
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
                       quickstep_storage_CollisionFreeVectorTable
+                      quickstep_storage_CompactKeySeparateChainingHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
@@ -1146,6 +1169,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_CSBTreeIndexSubBlock
                       quickstep_storage_CollisionFreeVectorTable
                       quickstep_storage_ColumnStoreUtil
+                      quickstep_storage_CompactKeySeparateChainingHashTable
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
                       quickstep_storage_CompressedColumnStoreValueAccessor

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/CollisionFreeVectorTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index e803954..d836014 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -43,33 +43,66 @@ namespace quickstep {
 CollisionFreeVectorTable::CollisionFreeVectorTable(
     const Type *key_type,
     const std::size_t num_entries,
-    const std::size_t memory_size,
-    const std::size_t num_init_partitions,
-    const std::size_t num_finalize_partitions,
-    const std::vector<std::size_t> &state_offsets,
     const std::vector<AggregationHandle *> &handles,
     StorageManager *storage_manager)
     : key_type_(key_type),
       num_entries_(num_entries),
       num_handles_(handles.size()),
       handles_(handles),
-      memory_size_(memory_size),
-      num_init_partitions_(num_init_partitions),
-      num_finalize_partitions_(num_finalize_partitions),
+      num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
       storage_manager_(storage_manager) {
   DCHECK_GT(num_entries, 0u);
-  DCHECK_GT(num_finalize_partitions_, 0u);
-  DCHECK_EQ(num_handles_, state_offsets.size());
+
+  std::size_t required_memory = 0;
+  const std::size_t existence_map_offset = 0;
+  std::vector<std::size_t> state_offsets;
+
+  required_memory += CacheLineAlignedBytes(
+      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    state_offsets.emplace_back(required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
 
   const std::size_t num_storage_slots =
-      storage_manager_->SlotsNeededForBytes(memory_size_);
+      storage_manager_->SlotsNeededForBytes(required_memory);
 
   const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
   blob_ = storage_manager_->getBlobMutable(blob_id);
 
   void *memory_start = blob_->getMemoryMutable();
   existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
-      reinterpret_cast<char *>(memory_start),
+      reinterpret_cast<char *>(memory_start) + existence_map_offset,
       num_entries,
       false /* initialize */));
 
@@ -78,6 +111,9 @@ CollisionFreeVectorTable::CollisionFreeVectorTable(
     vec_tables_.emplace_back(
         reinterpret_cast<char *>(memory_start) + state_offsets.at(i));
   }
+
+  memory_size_ = required_memory;
+  num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
 }
 
 CollisionFreeVectorTable::~CollisionFreeVectorTable() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 8e1342b..7a7d07e 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -58,12 +58,6 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
    *
    * @param key_type The group-by key type.
    * @param num_entries The estimated number of entries this table will hold.
-   * @param memory_size The memory size for this table.
-   * @param num_init_partitions The number of partitions to be used for
-   *        initializing the aggregation.
-   * @param num_finalize_partitions The number of partitions to be used for
-   *        finalizing the aggregation.
-   * @param state_offsets The offsets for each state in the table.
    * @param handles The aggregation handles.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold this table's contents).
@@ -71,10 +65,6 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
   CollisionFreeVectorTable(
       const Type *key_type,
       const std::size_t num_entries,
-      const std::size_t memory_size,
-      const std::size_t num_init_partitions,
-      const std::size_t num_finalize_partitions,
-      const std::vector<std::size_t> &state_offsets,
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager);
 
@@ -188,6 +178,32 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
   }
 
  private:
+  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+  }
+
+  inline static std::size_t CalculateNumInitializationPartitions(
+      const std::size_t memory_size) {
+    // Set the initialization memory block size to 4 MB.
+    constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
+
+    // At least 1 partition, at most 80 partitions.
+    // TODO(jianqiao): set the upper bound to (# of workers * 2) instead of
+    // the hardcoded 80.
+    return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL));
+  }
+
+  inline static std::size_t CalculateNumFinalizationPartitions(
+      const std::size_t num_entries) {
+    // Set the finalization segment size to 4096 entries.
+    constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+    // At least 1 partition, at most 80 partitions.
+    // TODO(jianqiao): set the upper bound to (# of workers * 2) instead of
+    // the hardcoded 80.
+    return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
+  }
+
   inline std::size_t calculatePartitionLength() const {
     const std::size_t partition_length =
         (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
@@ -325,9 +341,9 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
   std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
   std::vector<void *> vec_tables_;
 
-  const std::size_t memory_size_;
-  const std::size_t num_init_partitions_;
-  const std::size_t num_finalize_partitions_;
+  std::size_t memory_size_;
+  std::size_t num_init_partitions_;
+  std::size_t num_finalize_partitions_;
 
   StorageManager *storage_manager_;
   MutableBlobReference blob_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7e8b33f8/storage/CompactKeySeparateChainingHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CompactKeySeparateChainingHashTable.cpp b/storage/CompactKeySeparateChainingHashTable.cpp
new file mode 100644
index 0000000..fd7a69f
--- /dev/null
+++ b/storage/CompactKeySeparateChainingHashTable.cpp
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CompactKeySeparateChainingHashTable.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+#include <type_traits>
+
+#include "cli/Flags.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/CharType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/PrimeNumber.hpp"
+#include "utility/Range.hpp"
+#include "utility/ScopedArray.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class AggregationHandle;
+
+namespace {
+
+template <typename Functor>
+inline auto InvokeOnKeySize(const std::size_t key_size,
+                            const Functor &functor) {
+  switch (key_size) {
+    case 0u:
+      break;
+    case 1u:
+      return functor(std::integral_constant<std::size_t, 1u>());
+    case 2u:
+      return functor(std::integral_constant<std::size_t, 2u>());
+    case 3u:
+      return functor(std::integral_constant<std::size_t, 3u>());
+    case 4u:
+      return functor(std::integral_constant<std::size_t, 4u>());
+    case 5u:
+      return functor(std::integral_constant<std::size_t, 5u>());
+    case 6u:
+      return functor(std::integral_constant<std::size_t, 6u>());
+    case 7u:
+      return functor(std::integral_constant<std::size_t, 7u>());
+    case 8u:
+      return functor(std::integral_constant<std::size_t, 8u>());
+    default:
+      break;
+  }
+  LOG(FATAL) << "Not supported";
+}
+
+}  // namespace
+
+CompactKeySeparateChainingHashTable::CompactKeySeparateChainingHashTable(
+    const std::vector<const Type*> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_types_(key_types),
+      buckets_allocated_(0) {
+  DCHECK_GT(num_entries, 0u);
+  if (!handles.empty()) {
+    LOG(FATAL) << "CompactKeySeparateChainingHashTable do not "
+               << "support aggregate functions yet";
+  }
+
+  num_slots_ = get_next_prime_number(num_entries * kHashTableLoadFactor);
+  num_key_buckets_ = num_slots_ / kHashTableLoadFactor;
+
+  // TODO: use storage manager for allocating memory.
+  slots_.reset(num_slots_);
+  key_buckets_.reset(num_key_buckets_);
+
+  std::size_t total_key_size = 0;
+  for (const Type *key_type : key_types) {
+    CHECK(!key_type->isNullable()) << "Not supported";
+    CHECK(!key_type->isVariableLength()) << "Not supported";
+
+    const std::size_t key_size = key_type->maximumByteLength();
+    key_sizes_.emplace_back(key_size);
+    total_key_size += key_size;
+  }
+
+  if (total_key_size > sizeof(KeyCode)) {
+    LOG(FATAL) << "Total key size exceeded KeyCode size";
+  }
+
+  slots_init_splitter_ = std::make_unique<RangeSplitter>(
+      RangeSplitter::CreateWithPartitionLength(
+          0, num_slots_, kInitMinPartitionLength, FLAGS_num_workers));
+
+  key_buckets_init_splitter_ = std::make_unique<RangeSplitter>(
+      RangeSplitter::CreateWithNumPartitions(
+          0, num_key_buckets_, slots_init_splitter_->getNumPartitions()));
+}
+
+bool CompactKeySeparateChainingHashTable::upsertValueAccessorCompositeKey(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_attr_ids,
+    const ValueAccessorMultiplexer &accessor_mux) {
+  DCHECK(argument_ids.empty());
+  DCHECK_EQ(key_sizes_.size(), key_attr_ids.size());
+
+  const std::size_t num_tuples =
+      accessor_mux.getValueAccessorBySource(
+          key_attr_ids.front().source)->getNumTuplesVirtual();
+
+  ScopedArray<KeyCode> key_codes(num_tuples, true);
+
+  std::size_t offset = 0;
+  for (std::size_t i = 0; i < key_sizes_.size(); ++i) {
+    const MultiSourceAttributeId &key_attr = key_attr_ids[i];
+
+    InvokeOnAnyValueAccessor(
+        accessor_mux.getValueAccessorBySource(key_attr.source),
+        [&](auto *accessor) -> void {
+
+      InvokeOnKeySize(
+          key_sizes_[i],
+          [&](auto key_size) -> void {
+        this->constructCompactKeyCodeComponent(num_tuples,
+                                               offset,
+                                               decltype(key_size)::value,
+                                               accessor,
+                                               key_attr.attr_id,
+                                               key_codes.get());
+      });
+    });
+
+    offset += key_sizes_[i];
+  }
+
+  for (std::size_t i = 0; i < num_tuples; ++i) {
+    locateBucketInternal(key_codes[i]);
+  }
+
+  return true;
+}
+
+void CompactKeySeparateChainingHashTable::finalizeKeys(
+    const std::size_t partition_id,
+    ColumnVectorsValueAccessor *output) const {
+  DCHECK(final_splitter_ != nullptr);
+  const Range range = final_splitter_->getPartition(partition_id);
+
+  std::size_t offset = 0;
+  for (std::size_t i = 0; i < key_sizes_.size(); ++i) {
+    std::unique_ptr<NativeColumnVector> cv =
+        std::make_unique<NativeColumnVector>(*key_types_[i], range.size());
+
+    InvokeOnKeySize(
+        key_sizes_[i],
+        [&](auto key_size) -> void {
+      const char *key_ptr =
+          reinterpret_cast<const char*>(&key_buckets_[range.begin()].key_code) + offset;
+      for (std::size_t j = 0; j < range.size(); ++j) {
+        std::memcpy(cv->getPtrForDirectWrite(),
+                    key_ptr,
+                    decltype(key_size)::value);
+        key_ptr += kKeyBucketDataSize;
+      }
+    });
+
+    output->addColumn(ColumnVectorPtr(cv.release()));
+    offset += key_sizes_[i];
+  }
+}
+
+}  // namespace quickstep


Mime
View raw message