quickstep-commits mailing list archives

From: jianq...@apache.org
Subject: [05/13] incubator-quickstep git commit: Vector implementation of aggregation.
Date: Fri, 04 Nov 2016 17:48:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 8faf099..aef364c 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -39,6 +39,7 @@
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
+#include "storage/HashTableFactory.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
@@ -48,6 +49,7 @@
 #include "storage/SubBlocksReference.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
@@ -80,11 +82,20 @@ AggregationOperationState::AggregationOperationState(
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
-      is_aggregate_partitioned_(checkAggregatePartitioned(
-          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
+      is_aggregate_collision_free_(false),
+      is_aggregate_partitioned_(false),
       predicate_(predicate),
       is_distinct_(std::move(is_distinct)),
       storage_manager_(storage_manager) {
+  if (!group_by.empty()) {
+    if (hash_table_impl_type == HashTableImplType::kCollisionFreeColumnwise) {
+      is_aggregate_collision_free_ = true;
+    } else {
+      is_aggregate_partitioned_ = checkAggregatePartitioned(
+          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+    }
+  }
+
   // Sanity checks: each aggregate has a corresponding list of arguments.
   DCHECK(aggregate_functions.size() == arguments.size());
 
@@ -107,8 +118,6 @@ AggregationOperationState::AggregationOperationState(
     }
   }
 
-  std::vector<AggregationHandle *> group_by_handles;
-
   if (aggregate_functions.size() == 0) {
     // If there is no aggregation function, then it is a distinctify operation
     // on the group-by expressions.
@@ -119,9 +128,11 @@ AggregationOperationState::AggregationOperationState(
     group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
                                                      hash_table_impl_type,
                                                      group_by_types_,
-                                                     handles_,
+                                                     {handles_.front().get()},
                                                      storage_manager));
   } else {
+    std::vector<AggregationHandle *> group_by_handles;
+
     // Set up each individual aggregate in this operation.
     std::vector<const AggregateFunction *>::const_iterator agg_func_it =
         aggregate_functions.begin();
@@ -168,7 +179,7 @@ AggregationOperationState::AggregationOperationState(
         if (*is_distinct_it) {
           LOG(FATAL) << "Distinct aggregation not supported";
         }
-        group_by_handles.emplace_back(handles_.back());
+        group_by_handles.emplace_back(handles_.back().get());
       } else {
         // Aggregation without GROUP BY: create a single global state.
         single_states_.emplace_back(handles_.back()->createInitialState());
@@ -177,13 +188,15 @@ AggregationOperationState::AggregationOperationState(
 
     // Aggregation with GROUP BY: create a HashTable pool.
     if (!group_by_key_ids_.empty()) {
-      if (!is_aggregate_partitioned_) {
-        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                         hash_table_impl_type,
-                                                         group_by_types_,
-                                                         group_by_handles,
-                                                         storage_manager));
-      } else {
+      if (is_aggregate_collision_free_) {
+        collision_free_hashtable_.reset(
+            AggregationStateHashTableFactory::CreateResizable(
+                hash_table_impl_type,
+                group_by_types_,
+                estimated_num_entries,
+                group_by_handles,
+                storage_manager));
+      } else if (is_aggregate_partitioned_) {
         partitioned_group_by_hashtable_pool_.reset(
             new PartitionedHashTablePool(estimated_num_entries,
                                          FLAGS_num_aggregation_partitions,
@@ -191,6 +204,12 @@ AggregationOperationState::AggregationOperationState(
                                          group_by_types_,
                                          group_by_handles,
                                          storage_manager));
+      } else {
+        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types_,
+                                                         group_by_handles,
+                                                         storage_manager));
       }
     }
   }
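
The constructor above now chooses among three aggregation strategies rather than a single partitioned/non-partitioned switch. A minimal sketch of the decision, with invented names standing in for the member flags set above:

    // Sketch only: the enum and parameters are illustrative; the real state
    // is the pair of booleans is_aggregate_collision_free_ /
    // is_aggregate_partitioned_ set in the constructor.
    enum class Strategy { kCollisionFree, kPartitioned, kThreadPrivate };

    Strategy ChooseStrategy(const bool has_group_by,
                            const bool impl_is_collision_free,
                            const bool passes_partition_check) {
      if (has_group_by) {
        if (impl_is_collision_free) {
          return Strategy::kCollisionFree;  // one direct-addressed table
        }
        if (passes_partition_check) {
          return Strategy::kPartitioned;    // one table per hash partition
        }
      }
      return Strategy::kThreadPrivate;      // per-thread tables, merged later
    }

Collision-free aggregation takes precedence because its state arrays are updated with atomics (see the new hash table below), so all workers can upsert into the one shared table without partitioning or per-thread copies.
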
@@ -323,33 +342,72 @@ bool AggregationOperationState::ProtoIsValid(
   return true;
 }
 
-void AggregationOperationState::aggregateBlock(const block_id input_block,
-                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
-  if (group_by_key_ids_.empty()) {
-    aggregateBlockSingleState(input_block);
+std::size_t AggregationOperationState::getNumPartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeAggregationStateHashTable *>(
+        collision_free_hashtable_.get())->getNumFinalizePartitions();
+  } else if (is_aggregate_partitioned_) {
+    return partitioned_group_by_hashtable_pool_->getNumPartitions();
+  } else {
+    return 1u;
+  }
+}
+
+std::size_t AggregationOperationState::getNumInitializePartitions() const {
+  if (is_aggregate_collision_free_) {
+    return static_cast<CollisionFreeAggregationStateHashTable *>(
+        collision_free_hashtable_.get())->getNumInitializePartitions();
   } else {
-    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+    return 0u;
   }
 }
 
-void AggregationOperationState::finalizeAggregate(
-    InsertDestination *output_destination) {
-  if (group_by_key_ids_.empty()) {
-    finalizeSingleState(output_destination);
+void AggregationOperationState::initializeState(const std::size_t partition_id) {
+  if (is_aggregate_collision_free_) {
+    static_cast<CollisionFreeAggregationStateHashTable *>(
+        collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    finalizeHashTable(output_destination);
+    LOG(FATAL) << "AggregationOperationState::initializeState() "
+               << "is not supported by this aggregation";
   }
 }
 
-void AggregationOperationState::mergeSingleState(
-    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
-  DEBUG_ASSERT(local_state.size() == single_states_.size());
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    if (!is_distinct_[agg_idx]) {
-      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
-                                     single_states_[agg_idx].get());
+bool AggregationOperationState::checkAggregatePartitioned(
+    const std::size_t estimated_num_groups,
+    const std::vector<bool> &is_distinct,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const std::vector<const AggregateFunction *> &aggregate_functions) const {
+  // If there's no aggregation, return false.
+  if (aggregate_functions.empty()) {
+    return false;
+  }
+  // Check if there's a distinct operation involved in any aggregate; if so,
+  // the aggregate can't be partitioned.
+  for (auto distinct : is_distinct) {
+    if (distinct) {
+      return false;
     }
   }
+  // There's no distinct aggregation involved. Check if there's at least one
+  // GROUP BY operation.
+  if (group_by.empty()) {
+    return false;
+  }
+  // There are GROUP BYs without DISTINCT. Check if the estimated number of
+  // groups is large enough to warrant a partitioned aggregation.
+  return estimated_num_groups >
+         static_cast<std::size_t>(
+             FLAGS_partition_aggregation_num_groups_threshold);
+}
+
+void AggregationOperationState::aggregateBlock(const block_id input_block,
+                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
+  if (group_by_key_ids_.empty()) {
+    aggregateBlockSingleState(input_block);
+  } else {
+    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
+  }
 }
 
 void AggregationOperationState::aggregateBlockSingleState(
@@ -382,7 +440,7 @@ void AggregationOperationState::aggregateBlockSingleState(
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     const auto &argument_ids = argument_ids_[agg_idx];
-    const auto *handle = handles_[agg_idx];
+    const auto &handle = handles_[agg_idx];
 
     AggregationState *state;
     if (argument_ids.empty()) {
@@ -400,6 +458,24 @@ void AggregationOperationState::aggregateBlockSingleState(
   mergeSingleState(local_state);
 }
 
+void AggregationOperationState::mergeSingleState(
+    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
+  DEBUG_ASSERT(local_state.size() == single_states_.size());
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    if (!is_distinct_[agg_idx]) {
+      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
+                                     single_states_[agg_idx].get());
+    }
+  }
+}
+
+void AggregationOperationState::mergeGroupByHashTables(
+    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
+  HashTableMergerFast merger(dst);
+  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
+      ->forEach(&merger);
+}
+
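
mergeGroupByHashTables() drives the merge through forEach(): the source table invokes a merger functor once per entry, and the functor upserts that entry into the destination. The same visitor-style merge reduced to standard containers (HashTableMergerFast plays the role of Merger in the real code; the types here are stand-ins):

    #include <unordered_map>

    // Toy model: fold every (key, state) of src into dst via a visitor.
    struct Merger {
      std::unordered_map<int, long> *dst;
      void operator()(const int key, const long state) const {
        (*dst)[key] += state;  // upsert: insert a new key or merge states
      }
    };

    void MergeTables(const std::unordered_map<int, long> &src,
                     std::unordered_map<int, long> *dst) {
      Merger merger{dst};
      for (const auto &entry : src) {
        merger(entry.first, entry.second);
      }
    }
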
 void AggregationOperationState::aggregateBlockHashTable(
     const block_id input_block,
     LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
@@ -438,60 +514,101 @@ void AggregationOperationState::aggregateBlockHashTable(
     }
   }
 
-  if (!is_aggregate_partitioned_) {
-    DCHECK(group_by_hashtable_pool_ != nullptr);
+  accessor->beginIterationVirtual();
+
+  // TODO
+  if (is_aggregate_collision_free_) {
+    aggregateBlockHashTableImplCollisionFree(
+        accessor, non_trivial_results.get());
+  } else if (is_aggregate_partitioned_) {
+    aggregateBlockHashTableImplPartitioned(
+        accessor, non_trivial_results.get());
+  } else {
+    aggregateBlockHashTableImplThreadPrivate(
+        accessor, non_trivial_results.get());
+  }
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK(collision_free_hashtable_ != nullptr);
+
+  collision_free_hashtable_->upsertValueAccessor(argument_ids_,
+                                                 group_by_key_ids_,
+                                                 accessor,
+                                                 aux_accessor);
+}
+
+void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
+
+  InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    // TODO(jianqiao): handle the situation when keys are in non_trivial_results
+    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
+
+    // Compute the partitions for the tuple formed by group by values.
+    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+    partition_membership.resize(num_partitions);
+
+    // Create a tuple-id sequence for each partition.
+    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+      partition_membership[partition].reset(
+          new TupleIdSequence(accessor->getEndPosition()));
+    }
+
+    // Iterate over ValueAccessor for each tuple,
+    // set a bit in the appropriate TupleIdSequence.
+    while (accessor->next()) {
+      // We need a unique_ptr because getTupleWithAttributes() uses "new".
+      std::unique_ptr<Tuple> curr_tuple(
+          accessor->getTupleWithAttributes(group_by_key_ids_));
+      const std::size_t curr_tuple_partition_id =
+          curr_tuple->getTupleHash() % num_partitions;
+      partition_membership[curr_tuple_partition_id]->set(
+          accessor->getCurrentPosition(), true);
+    }
+    // Aggregate each partition.
+    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+      std::unique_ptr<ValueAccessor> adapter(
+          accessor->createSharedTupleIdSequenceAdapter(
+              *(partition_membership)[partition]));
+      partitioned_group_by_hashtable_pool_->getHashTable(partition)
+          ->upsertValueAccessor(argument_ids_,
+                                group_by_key_ids_,
+                                adapter.get(),
+                                aux_accessor);
+    }
+  });
+}
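
The loop above assigns each tuple to a partition by hashing its GROUP BY key, records the assignment in per-partition TupleIdSequence bitmaps, and then upserts one filtered accessor per partition. The id-bucketing step in isolation, using std::vector in place of the bitmaps (all names here are illustrative):

    #include <cstddef>
    #include <functional>
    #include <vector>

    // membership[p] lists the rows whose key hashes to partition p.
    std::vector<std::vector<std::size_t>> PartitionRowIds(
        const std::vector<long> &keys, const std::size_t num_partitions) {
      std::vector<std::vector<std::size_t>> membership(num_partitions);
      for (std::size_t row = 0; row < keys.size(); ++row) {
        const std::size_t p = std::hash<long>()(keys[row]) % num_partitions;
        membership[p].push_back(row);
      }
      return membership;
    }

Since a key hashes to exactly one partition, the per-partition hash tables never overlap in keys, which is why the partitioned path can later be finalized per partition with no cross-table merge.
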
 
-    AggregationStateHashTableBase *agg_hash_table =
-        group_by_hashtable_pool_->getHashTable();
-    DCHECK(agg_hash_table != nullptr);
+void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
+    ValueAccessor *accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK(group_by_hashtable_pool_ != nullptr);
+
+  AggregationStateHashTableBase *agg_hash_table =
+      group_by_hashtable_pool_->getHashTable();
+
+  agg_hash_table->upsertValueAccessor(argument_ids_,
+                                      group_by_key_ids_,
+                                      accessor,
+                                      aux_accessor);
+  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+}
 
-    std::cout << "Upsert VA start\n";
-    std::flush(std::cout);
-    accessor->beginIterationVirtual();
-    agg_hash_table->upsertValueAccessor(argument_ids_,
-                                        group_by_key_ids_,
-                                        accessor,
-                                        non_trivial_results.get());
-    std::cout << "Aggregate done\n";
-    std::flush(std::cout);
-    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+void AggregationOperationState::finalizeAggregate(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  if (group_by_key_ids_.empty()) {
+    DCHECK_EQ(0u, partition_id);
+    finalizeSingleState(output_destination);
   } else {
-    LOG(FATAL) << "Partitioned aggregation not supported";
-//    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
-//
-//    // Compute the partitions for the tuple formed by group by values.
-//    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
-//    partition_membership.resize(num_partitions);
-//
-//    // Create a tuple-id sequence for each partition.
-//    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-//      partition_membership[partition].reset(
-//          new TupleIdSequence(temp_result.getEndPosition()));
-//    }
-//
-//    // Iterate over ValueAccessor for each tuple,
-//    // set a bit in the appropriate TupleIdSequence.
-//    temp_result.beginIteration();
-//    while (temp_result.next()) {
-//      // We need a unique_ptr because getTupleWithAttributes() uses "new".
-//      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
-//      const std::size_t curr_tuple_partition_id =
-//          curr_tuple->getTupleHash() % num_partitions;
-//      partition_membership[curr_tuple_partition_id]->set(
-//          temp_result.getCurrentPosition(), true);
-//    }
-//    // For each partition, create an adapter around Value Accessor and
-//    // TupleIdSequence.
-//    std::vector<std::unique_ptr<
-//        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
-//    adapter.resize(num_partitions);
-//    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
-//      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
-//          *(partition_membership)[partition]));
-//      partitioned_group_by_hashtable_pool_->getHashTable(partition)
-//          ->upsertValueAccessorCompositeKeyFast(
-//              argument_ids, adapter[partition].get(), key_ids, true);
-//    }
+    finalizeHashTable(partition_id, output_destination);
   }
 }
 
@@ -509,51 +626,79 @@ void AggregationOperationState::finalizeSingleState(
   output_destination->insertTuple(Tuple(std::move(attribute_values)));
 }
 
-void AggregationOperationState::mergeGroupByHashTables(
-    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
-  HashTableMergerFast merger(dst);
-  static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
-      ->forEach(&merger);
+void AggregationOperationState::finalizeHashTable(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  if (is_aggregate_collision_free_) {
+    finalizeHashTableImplCollisionFree(partition_id, output_destination);
+  } else if (is_aggregate_partitioned_) {
+    finalizeHashTableImplPartitioned(partition_id, output_destination);
+  } else {
+    DCHECK_EQ(0u, partition_id);
+    finalizeHashTableImplThreadPrivate(output_destination);
+  }
 }
 
-void AggregationOperationState::finalizeHashTable(
+void AggregationOperationState::finalizeHashTableImplCollisionFree(
+    const std::size_t partition_id,
     InsertDestination *output_destination) {
-  // Each element of 'group_by_keys' is a vector of values for a particular
-  // group (which is also the prefix of the finalized Tuple for that group).
-  std::vector<std::vector<TypedValue>> group_by_keys;
+  CollisionFreeAggregationStateHashTable *hash_table =
+      static_cast<CollisionFreeAggregationStateHashTable *>(
+          collision_free_hashtable_.get());
 
-  // TODO(harshad) - The merge phase may be slower when each hash table contains
-  // large number of entries. We should find ways in which we can perform a
-  // parallel merge.
+  // TODO
+  const std::size_t max_length =
+      hash_table->getNumTuplesInPartition(partition_id);
+  ColumnVectorsValueAccessor complete_result;
 
-  // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
-  // e.g. Keep merging entries from smaller hash tables to larger.
+  DCHECK_EQ(1u, group_by_types_.size());
+  const Type *key_type = group_by_types_.front();
+  DCHECK(NativeColumnVector::UsableForType(*key_type));
 
-  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
-  DCHECK(hash_tables != nullptr);
-  if (hash_tables->empty()) {
-    return;
-  }
+  std::unique_ptr<NativeColumnVector> key_cv(
+      new NativeColumnVector(*key_type, max_length));
+  hash_table->finalizeKey(partition_id, key_cv.get());
+  complete_result.addColumn(key_cv.release());
 
-  std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
-      hash_tables->back().release());
-  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
-    std::unique_ptr<AggregationStateHashTableBase> hash_table(
-        hash_tables->at(i).release());
-    mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
-    hash_table->destroyPayload();
+  for (std::size_t i = 0; i < handles_.size(); ++i) {
+    if (handles_[i]->getAggregationID() == AggregationID::kDistinct) {
+      DCHECK_EQ(1u, handles_.size());
+      break;
+    }
+
+    const Type *result_type = handles_[i]->getResultType();
+    DCHECK(NativeColumnVector::UsableForType(*result_type));
+
+    std::unique_ptr<NativeColumnVector> result_cv(
+        new NativeColumnVector(*result_type, max_length));
+    hash_table->finalizeState(partition_id, i, result_cv.get());
+    complete_result.addColumn(result_cv.release());
   }
 
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
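
One property worth spelling out: in the collision-free table the group-by key is the slot index itself, so finalizing a partition is a linear scan over the occupied slots rather than a hash-table walk. A toy rendering of that scan, assuming a plain bool vector for the existence map and a single SUM handle:

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Toy finalize: slot index == key value; states are dense arrays.
    void FinalizeToy(const std::vector<bool> &exists,
                     const std::vector<std::int64_t> &sum_states,
                     std::vector<std::int64_t> *keys_out,
                     std::vector<std::int64_t> *sums_out) {
      for (std::size_t loc = 0; loc < exists.size(); ++loc) {
        if (exists[loc]) {
          keys_out->push_back(static_cast<std::int64_t>(loc));
          sums_out->push_back(sum_states[loc]);
        }
      }
    }
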
+void AggregationOperationState::finalizeHashTableImplPartitioned(
+    const std::size_t partition_id,
+    InsertDestination *output_destination) {
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
+  AggregationStateHashTableBase *hash_table =
+      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *final_hash_table, &group_by_keys, agg_idx);
+        *hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
-  final_hash_table->destroyPayload();
+  hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -570,16 +715,14 @@ void AggregationOperationState::finalizeHashTable(
           new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     } else {
       IndirectColumnVector *element_cv =
           new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(
-            std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;
@@ -599,24 +742,44 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
-void AggregationOperationState::finalizeAggregatePartitioned(
-    const std::size_t partition_id, InsertDestination *output_destination) {
+void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+    InsertDestination *output_destination) {
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
+  // TODO(harshad) - The merge phase may be slower when each hash table contains
+  // large number of entries. We should find ways in which we can perform a
+  // parallel merge.
+
+  // TODO(harshad) - Find heuristics for faster merge, even in a single thread.
+  // e.g. Keep merging entries from smaller hash tables to larger.
+
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
+  }
+
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+      hash_tables->back().release());
+  for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
+    std::unique_ptr<AggregationStateHashTableBase> hash_table(
+        hash_tables->at(i).release());
+    mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+    hash_table->destroyPayload();
+  }
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
-  AggregationStateHashTableBase *hash_table =
-      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *hash_table, &group_by_keys, agg_idx);
+        *final_hash_table, &group_by_keys, agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
   }
-  hash_table->destroyPayload();
+  final_hash_table->destroyPayload();
 
   // Reorganize 'group_by_keys' in column-major order so that we can make a
   // ColumnVectorsValueAccessor to bulk-insert results.
@@ -633,14 +796,16 @@ void AggregationOperationState::finalizeAggregatePartitioned(
           new NativeColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     } else {
       IndirectColumnVector *element_cv =
           new IndirectColumnVector(*group_by_type, group_by_keys.size());
       group_by_cvs.emplace_back(element_cv);
       for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+        element_cv->appendTypedValue(
+            std::move(group_key[group_by_element_idx]));
       }
     }
     ++group_by_element_idx;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0f3807e..233256c 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -44,6 +44,7 @@ namespace quickstep {
 class AggregateFunction;
 class CatalogDatabaseLite;
 class CatalogRelationSchema;
+class ColumnVectorsValueAccessor;
 class InsertDestination;
 class LIPFilterAdaptiveProber;
 class StorageManager;
@@ -174,101 +175,77 @@ class AggregationOperationState {
    * @brief Generate the final results for the aggregates managed by this
    *        AggregationOperationState and write them out to StorageBlock(s).
    *
+   * @param partition_id The partition id of this finalize operation.
    * @param output_destination An InsertDestination where the finalized output
    *        tuple(s) from this aggregate are to be written.
    **/
-  void finalizeAggregate(InsertDestination *output_destination);
-
-  /**
-   * @brief Generate the final results for the aggregates managed by this
-   *        AggregationOperationState and write them out to StorageBlock(s).
-   *        In this implementation, each thread picks a hash table belonging to
-   *        a partition and writes its values to StorageBlock(s). There is no
-   *        need to merge multiple hash tables in one, because there is no
-   *        overlap in the keys across two hash tables.
-   *
-   * @param partition_id The ID of the partition for which finalize is being
-   *        performed.
-   * @param output_destination An InsertDestination where the finalized output
-   *        tuple(s) from this aggregate are to be written.
-   **/
-  void finalizeAggregatePartitioned(
-      const std::size_t partition_id, InsertDestination *output_destination);
-
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
-
-  bool isAggregatePartitioned() const {
-    return is_aggregate_partitioned_;
-  }
+  void finalizeAggregate(const std::size_t partition_id,
+                         InsertDestination *output_destination);
 
   /**
    * @brief Get the number of partitions to be used for the aggregation.
    *        For non-partitioned aggregations, we return 1.
    **/
-  std::size_t getNumPartitions() const {
-    return is_aggregate_partitioned_
-               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
-               : 1;
-  }
+  std::size_t getNumPartitions() const;
 
-  int dflag;
+  std::size_t getNumInitializePartitions() const;
+
+  void initializeState(const std::size_t partition_id);
 
  private:
-  // Merge locally (per storage block) aggregated states with global aggregation
-  // states.
-  void mergeSingleState(
-      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const;
 
   // Aggregate on input block.
   void aggregateBlockSingleState(const block_id input_block);
   void aggregateBlockHashTable(const block_id input_block,
                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober);
 
-  void finalizeSingleState(InsertDestination *output_destination);
-  void finalizeHashTable(InsertDestination *output_destination);
+  // Merge locally (per storage block) aggregated states with global aggregation
+  // states.
+  void mergeSingleState(
+      const std::vector<std::unique_ptr<AggregationState>> &local_state);
+  void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+                              AggregationStateHashTableBase *dst) const;
 
-  bool checkAggregatePartitioned(
-      const std::size_t estimated_num_groups,
-      const std::vector<bool> &is_distinct,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const std::vector<const AggregateFunction *> &aggregate_functions) const {
-//    // If there's no aggregation, return false.
-//    if (aggregate_functions.empty()) {
-//      return false;
-//    }
-//    // Check if there's a distinct operation involved in any aggregate, if so
-//    // the aggregate can't be partitioned.
-//    for (auto distinct : is_distinct) {
-//      if (distinct) {
-//        return false;
-//      }
-//    }
-//    // There's no distinct aggregation involved, Check if there's at least one
-//    // GROUP BY operation.
-//    if (group_by.empty()) {
-//      return false;
-//    }
-//    // There are GROUP BYs without DISTINCT. Check if the estimated number of
-//    // groups is large enough to warrant a partitioned aggregation.
-//    return estimated_num_groups >
-//           static_cast<std::size_t>(
-//               FLAGS_partition_aggregation_num_groups_threshold);
-    return false;
-  }
+  // Finalize the aggregation results into output_destination.
+  void finalizeSingleState(InsertDestination *output_destination);
+  void finalizeHashTable(const std::size_t partition_id,
+                         InsertDestination *output_destination);
+
+  // Specialized implementations for aggregateBlockHashTable.
+  void aggregateBlockHashTableImplCollisionFree(ValueAccessor *accessor,
+                                                ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplPartitioned(ValueAccessor *accessor,
+                                              ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplThreadPrivate(ValueAccessor *accessor,
+                                                ColumnVectorsValueAccessor *aux_accessor);
+
+  // Specialized implementations for finalizeHashTable.
+  void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
+                                          InsertDestination *output_destination);
+  void finalizeHashTableImplPartitioned(const std::size_t partition_id,
+                                        InsertDestination *output_destination);
+  void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
 
+  // Whether the aggregation is collision free or not.
+  bool is_aggregate_collision_free_;
+
   // Whether the aggregation is partitioned or not.
-  const bool is_aggregate_partitioned_;
+  bool is_aggregate_partitioned_;
 
   std::unique_ptr<const Predicate> predicate_;
 
   // Each individual aggregate in this operation has an AggregationHandle and
   // zero (indicated by -1) or one argument.
-  std::vector<AggregationHandle *> handles_;
+  std::vector<std::unique_ptr<AggregationHandle>> handles_;
 
   // For each aggregate, whether DISTINCT should be applied to the aggregate's
   // arguments.
@@ -301,6 +278,8 @@ class AggregationOperationState {
 
   std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
 
+  std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
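
Taken together, the reworked interface implies a three-phase driver protocol: zero each initialization partition, aggregate every input block, and finalize each output partition. A sketch of that flow, single-threaded for clarity (in the actual system these calls would presumably be issued as separate work orders):

    // Illustrative driver; error handling and threading omitted.
    void RunAggregation(AggregationOperationState *state,
                        const std::vector<block_id> &input_blocks,
                        InsertDestination *output) {
      // Zero collision-free state segments; a no-op otherwise, since
      // getNumInitializePartitions() returns 0 for the other strategies.
      for (std::size_t p = 0; p < state->getNumInitializePartitions(); ++p) {
        state->initializeState(p);
      }
      for (const block_id block : input_blocks) {
        state->aggregateBlock(block, nullptr /* no LIP filter */);
      }
      for (std::size_t p = 0; p < state->getNumPartitions(); ++p) {
        state->finalizeAggregate(p, output);
      }
    }
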

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 2709d9d..10ac2af 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -169,6 +169,9 @@ if(QUICKSTEP_HAVE_BITWEAVING)
               bitweaving/BitWeavingVIndexSubBlock.hpp)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+add_library(quickstep_storage_CollisionFreeAggregationStateHashTable
+            CollisionFreeAggregationStateHashTable.cpp
+            CollisionFreeAggregationStateHashTable.hpp)
 add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp)
 add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp)
 add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -287,6 +290,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_scalar_Scalar
                       quickstep_storage_AggregationOperationState_proto
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
                       quickstep_storage_PackedPayloadAggregationStateHashTable
@@ -297,6 +301,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
@@ -439,6 +444,24 @@ if(QUICKSTEP_HAVE_BITWEAVING)
                         quickstep_utility_Macros)
 endif()
 # CMAKE_VALIDATE_IGNORE_END
+target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_HashTableBase
+                      quickstep_storage_StorageBlob
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_StorageManager
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_ConcurrentBitVector
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ColumnStoreUtil
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -697,6 +720,7 @@ target_link_libraries(quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
 target_link_libraries(quickstep_storage_HashTableFactory
                       glog
+                      quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_HashTable
                       quickstep_storage_HashTable_proto
                       quickstep_storage_HashTableBase
@@ -1125,6 +1149,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
                       quickstep_storage_CSBTreeIndexSubBlock
+                      quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_ColumnStoreUtil
                       quickstep_storage_CompressedBlockBuilder
                       quickstep_storage_CompressedColumnStoreTupleStorageSubBlock
@@ -1134,6 +1159,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_CompressedStoreUtil
                       quickstep_storage_CompressedTupleStorageSubBlock
                       quickstep_storage_CountedReference
+                      quickstep_storage_DirectAddressingAggregationStateHashTable
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
                       quickstep_storage_FileManagerLocal

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
new file mode 100644
index 0000000..8c9be7b
--- /dev/null
+++ b/storage/CollisionFreeAggregationStateHashTable.cpp
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/CollisionFreeAggregationStateHashTable.hpp"
+
+#include <algorithm>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+namespace quickstep {
+
+CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable(
+    const std::vector<const Type *> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle *> &handles,
+    StorageManager *storage_manager)
+    : key_type_(key_types.front()),
+      num_entries_(num_entries),
+      num_handles_(handles.size()),
+      handles_(handles),
+      num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)),
+      storage_manager_(storage_manager) {
+  CHECK_EQ(1u, key_types.size());
+  DCHECK_GT(num_entries, 0u);
+
+  std::map<std::string, std::size_t> memory_offsets;
+  std::size_t required_memory = 0;
+
+  memory_offsets.emplace("existence_map", required_memory);
+  required_memory +=
+      CacheLineAlignedBytes(ConcurrentBitVector::BytesNeeded(num_entries));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::atomic<std::size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        CHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(std::atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "Not implemented";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Not implemented";
+    }
+
+    memory_offsets.emplace(std::string("state") + std::to_string(i),
+                           required_memory);
+    required_memory += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(required_memory);
+
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  void *memory_start = blob_->getMemoryMutable();
+  existence_map_.reset(new ConcurrentBitVector(
+      reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"),
+      num_entries));
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    vec_tables_.emplace_back(
+        reinterpret_cast<char *>(memory_start) +
+            memory_offsets.at(std::string("state") + std::to_string(i)));
+  }
+
+  memory_size_ = required_memory;
+  num_init_partitions_ = std::min(memory_size_ / (4uL * 1024 * 1024), 80uL);
+}
+
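
The constructor carves one StorageBlob into an existence bitmap followed by a cache-line-aligned state array per handle. The offset arithmetic, condensed into a standalone sketch (the 64-byte line size and helper names are assumptions mirroring kCacheLineBytes and the memory_offsets map above):

    #include <cstddef>
    #include <map>
    #include <string>
    #include <vector>

    constexpr std::size_t kLine = 64;  // assumed cache-line size

    constexpr std::size_t AlignUp(const std::size_t n) {
      return (n + kLine - 1) / kLine * kLine;
    }

    // Byte offsets of each blob region, each padded to whole cache lines.
    std::map<std::string, std::size_t> LayoutBlob(
        const std::size_t bitmap_bytes,
        const std::vector<std::size_t> &per_handle_state_bytes) {
      std::map<std::string, std::size_t> offsets;
      std::size_t total = 0;
      offsets.emplace("existence_map", total);
      total += AlignUp(bitmap_bytes);
      for (std::size_t i = 0; i < per_handle_state_bytes.size(); ++i) {
        offsets.emplace("state" + std::to_string(i), total);
        total += AlignUp(per_handle_state_bytes[i]);
      }
      offsets.emplace("total_bytes", total);  // size to request as slots
      return offsets;
    }

Padding each region to cache-line boundaries keeps one handle's atomic state updates from false-sharing cache lines with the neighboring regions.
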
+CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {
+  const block_id blob_id = blob_->getID();
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(blob_id);
+}
+
+void CollisionFreeAggregationStateHashTable::destroyPayload() {
+}
+
+bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
+    const std::vector<std::vector<attribute_id>> &argument_ids,
+    const std::vector<attribute_id> &key_attr_ids,
+    ValueAccessor *base_accessor,
+    ColumnVectorsValueAccessor *aux_accessor) {
+  DCHECK_EQ(1u, key_attr_ids.size());
+
+  const attribute_id key_attr_id = key_attr_ids.front();
+  const bool is_key_nullable = key_type_->isNullable();
+
+  // TODO: aux_accessor
+  CHECK_GE(key_attr_id, 0);
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    DCHECK_LE(argument_ids[i].size(), 1u);
+
+    const attribute_id argument_id =
+        argument_ids[i].empty() ? kInvalidAttributeID : argument_ids[i].front();
+
+    // TODO: aux_accessor
+    CHECK_GE(argument_id, 0);
+
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+
+    const Type *argument_type;
+    bool is_argument_nullable;
+    if (argument_types.empty()) {
+      argument_type = nullptr;
+      is_argument_nullable = false;
+    } else {
+      argument_type = argument_types.front();
+      is_argument_nullable = argument_type->isNullable();
+    }
+
+    // TODO: aux_accessor
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        base_accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      upsertValueAccessorDispatchHelper(is_key_nullable,
+                                        is_argument_nullable,
+                                        key_type_,
+                                        argument_type,
+                                        handle->getAggregationID(),
+                                        key_attr_id,
+                                        argument_id,
+                                        vec_tables_[i],
+                                        accessor);
+    });
+  }
+  return true;
+}
+
+void CollisionFreeAggregationStateHashTable::finalizeKey(
+    const std::size_t partition_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  switch (key_type_->getTypeID()) {
+    case TypeID::kInt:
+      finalizeKeyInternal<int>(start_position, end_position, output_cv);
+      return;
+    case TypeID::kLong:
+      finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+void CollisionFreeAggregationStateHashTable::finalizeState(
+    const std::size_t partition_id,
+    const std::size_t handle_id,
+    NativeColumnVector *output_cv) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  const AggregationHandle *handle = handles_[handle_id];
+  const auto &argument_types = handle->getArgumentTypes();
+  const Type *argument_type =
+      argument_types.empty() ? nullptr : argument_types.front();
+
+  finalizeStateDispatchHelper(handle->getAggregationID(),
+                              argument_type,
+                              vec_tables_[handle_id],
+                              start_position,
+                              end_position,
+                              output_cv);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/CollisionFreeAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp
new file mode 100644
index 0000000..0fa539b
--- /dev/null
+++ b/storage/CollisionFreeAggregationStateHashTable.hpp
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/ConcurrentBitVector.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+class CollisionFreeAggregationStateHashTable
+    : public AggregationStateHashTableBase {
+ public:
+  CollisionFreeAggregationStateHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager);
+
+  ~CollisionFreeAggregationStateHashTable() override;
+
+  void destroyPayload() override;
+
+  inline std::size_t getNumInitializePartitions() const {
+    return num_init_partitions_;
+  }
+
+  inline std::size_t getNumFinalizePartitions() const {
+    return num_finalize_partitions_;
+  }
+
+  inline std::size_t getNumTuplesInPartition(
+      const std::size_t partition_id) const {
+    const std::size_t start_position =
+        calculatePartitionStartPosition(partition_id);
+    const std::size_t end_position =
+        calculatePartitionEndPosition(partition_id);
+    return existence_map_->onesCount(start_position, end_position);
+  }
+
+  inline void initialize(const std::size_t partition_id) {
+    const std::size_t memory_segment_size =
+        (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_;
+    const std::size_t memory_start = memory_segment_size * partition_id;
+    std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start,
+                0,
+                std::min(memory_segment_size, memory_size_ - memory_start));
+  }
+
+  bool upsertValueAccessor(
+      const std::vector<std::vector<attribute_id>> &argument_ids,
+      const std::vector<attribute_id> &key_attr_ids,
+      ValueAccessor *base_accessor,
+      ColumnVectorsValueAccessor *aux_accessor = nullptr) override;
+
+  void finalizeKey(const std::size_t partition_id,
+                   NativeColumnVector *output_cv) const;
+
+  void finalizeState(const std::size_t partition_id,
+                     const std::size_t handle_id,
+                     NativeColumnVector *output_cv) const;
+
+ private:
+  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+  }
+
+  inline std::size_t calculatePartitionLength() const {
+    const std::size_t partition_length =
+        (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
+    DCHECK_GE(partition_length, 1u);
+    return partition_length;
+  }
+
+  inline std::size_t calculatePartitionStartPosition(
+      const std::size_t partition_id) const {
+    return calculatePartitionLength() * partition_id;
+  }
+
+  inline std::size_t calculatePartitionEndPosition(
+      const std::size_t partition_id) const {
+    return std::min(calculatePartitionLength() * (partition_id + 1),
+                    num_entries_);
+  }
+
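
Both the initialization segments (in bytes) and the finalize partitions (in entries) use the same ceil-divide-and-clip arithmetic. A worked instance: with num_entries_ = 100 and num_finalize_partitions_ = 3, the partition length is ceil(100 / 3) = 34, giving ranges [0, 34), [34, 68), and [68, 100). As a standalone helper:

    #include <algorithm>
    #include <cstddef>
    #include <utility>

    // [start, end) of one partition over `total` positions: equal
    // ceil-divided lengths, with the last range clipped to the total.
    std::pair<std::size_t, std::size_t> PartitionRange(
        const std::size_t total,
        const std::size_t num_partitions,
        const std::size_t partition_id) {
      const std::size_t length =
          (total + num_partitions - 1) / num_partitions;
      const std::size_t start = length * partition_id;
      return {start, std::min(start + length, total)};
    }
    // PartitionRange(100, 3, 2) == {68, 100}
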
+  template <typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const bool is_key_nullable,
+      const bool is_argument_nullable,
+      ArgTypes &&...args);
+
+  template <bool is_key_nullable, bool is_value_nullable, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const Type *key_type,
+      ArgTypes &&...args);
+
+  template <bool is_key_nullable, bool is_value_nullable,
+            typename KeyT, typename ...ArgTypes>
+  inline void upsertValueAccessorDispatchHelper(
+      const Type *argument_type,
+      const AggregationID agg_id,
+      ArgTypes &&...args);
+
+  template <bool is_key_nullable, bool is_value_nullable,
+            typename KeyT, typename ValueAccessorT>
+  inline void upsertValueAccessorCountHelper(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      void *vec_table,
+      ValueAccessorT *accessor);
+
+  template <bool is_key_nullable, bool is_value_nullable,
+            typename KeyT, typename ValueAccessorT>
+  inline void upsertValueAccessorSumHelper(
+      const Type *argument_type,
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      void *vec_table,
+      ValueAccessorT *accessor);
+
+  template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+  inline void upsertValueAccessorCountNullary(
+      const attribute_id key_attr_id,
+      std::atomic<std::size_t> *vec_table,
+      ValueAccessorT *accessor);
+
+  template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+  inline void upsertValueAccessorCountUnary(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<std::size_t> *vec_table,
+      ValueAccessorT *accessor);
+
+  template <bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ArgumentT, typename StateT,
+            typename ValueAccessorT>
+  inline void upsertValueAccessorIntegerSum(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<StateT> *vec_table,
+      ValueAccessorT *accessor);
+
+  template <bool is_key_nullable, bool is_argument_nullable,
+            typename KeyT, typename ArgumentT, typename StateT,
+            typename ValueAccessorT>
+  inline void upsertValueAccessorGenericSum(
+      const attribute_id key_attr_id,
+      const attribute_id argument_id,
+      std::atomic<StateT> *vec_table,
+      ValueAccessorT *accessor);
+
+  template <typename KeyT>
+  inline void finalizeKeyInternal(const std::size_t start_position,
+                                  const std::size_t end_position,
+                                  NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+    }
+  }
+
+  template <typename ...ArgTypes>
+  inline void finalizeStateDispatchHelper(
+      const AggregationID agg_id,
+      const Type *argument_type,
+      const void *vec_table,
+      ArgTypes &&...args) const {
+    switch (agg_id) {
+       case AggregationID::kCount:
+         finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
+                            std::forward<ArgTypes>(args)...);
+         return;
+       case AggregationID::kSum:
+         finalizeStateSumHelper(argument_type,
+                                vec_table,
+                                std::forward<ArgTypes>(args)...);
+         return;
+       default:
+         LOG(FATAL) << "Not supported";
+    }
+  }
+
+  template <typename ...ArgTypes>
+  inline void finalizeStateSumHelper(
+      const Type *argument_type,
+      const void *vec_table,
+      ArgTypes &&...args) const {
+    DCHECK(argument_type != nullptr);
+
+    switch (argument_type->getTypeID()) {
+      case TypeID::kInt:    // Fall through
+      case TypeID::kLong:
+        finalizeStateSum<std::int64_t>(
+            static_cast<const std::atomic<std::int64_t> *>(vec_table),
+            std::forward<ArgTypes>(args)...);
+        return;
+      case TypeID::kFloat:  // Fall through
+      case TypeID::kDouble:
+        finalizeStateSum<double>(
+            static_cast<const std::atomic<double> *>(vec_table),
+            std::forward<ArgTypes>(args)...);
+        return;
+      default:
+        LOG(FATAL) << "Not supported";
+    }
+  }
+
+  inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table,
+                                 const std::size_t start_position,
+                                 const std::size_t end_position,
+                                 NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
+          vec_table[loc].load(std::memory_order_relaxed);
+    }
+  }
+
+  template <typename ResultT, typename StateT>
+  inline void finalizeStateSum(const std::atomic<StateT> *vec_table,
+                               const std::size_t start_position,
+                               const std::size_t end_position,
+                               NativeColumnVector *output_cv) const {
+    std::size_t loc = start_position - 1;
+    while ((loc = existence_map_->nextOne(loc)) < end_position) {
+      *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
+          vec_table[loc].load(std::memory_order_relaxed);
+    }
+  }
+
+  const Type *key_type_;
+  const std::size_t num_entries_;
+
+  const std::size_t num_handles_;
+  const std::vector<AggregationHandle *> handles_;
+
+  std::unique_ptr<ConcurrentBitVector> existence_map_;
+  std::vector<void *> vec_tables_;
+
+  const std::size_t num_finalize_partitions_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  std::size_t memory_size_;
+  std::size_t num_init_partitions_;
+
+  DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable);
+};
+
+// ----------------------------------------------------------------------------
+// Implementations of template methods follow.
+
+template <typename ...ArgTypes>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorDispatchHelper(
+        const bool is_key_nullable,
+        const bool is_argument_nullable,
+        ArgTypes &&...args) {
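+  // Lift the runtime nullability flags into compile-time template parameters
+  // so that the per-tuple upsert loops below are instantiated as branch-free
+  // specializations.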
+  if (is_key_nullable) {
+    if (is_argument_nullable) {
+      upsertValueAccessorDispatchHelper<true, true>(
+          std::forward<ArgTypes>(args)...);
+    } else {
+      upsertValueAccessorDispatchHelper<true, false>(
+          std::forward<ArgTypes>(args)...);
+    }
+  } else {
+    if (is_argument_nullable) {
+      upsertValueAccessorDispatchHelper<false, true>(
+          std::forward<ArgTypes>(args)...);
+    } else {
+      upsertValueAccessorDispatchHelper<false, false>(
+          std::forward<ArgTypes>(args)...);
+    }
+  }
+}
+
+template <bool is_key_nullable, bool is_value_nullable, typename ...ArgTypes>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorDispatchHelper(
+        const Type *key_type,
+        ArgTypes &&...args) {
+  switch (key_type->getTypeID()) {
+    case TypeID::kInt:
+      upsertValueAccessorDispatchHelper<
+          is_key_nullable, is_value_nullable, int>(
+              std::forward<ArgTypes>(args)...);
+      return;
+    case TypeID::kLong:
+      upsertValueAccessorDispatchHelper<
+          is_key_nullable, is_value_nullable, std::int64_t>(
+              std::forward<ArgTypes>(args)...);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool is_key_nullable, bool is_value_nullable,
+          typename KeyT, typename ...ArgTypes>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorDispatchHelper(
+        const Type *argument_type,
+        const AggregationID agg_id,
+        ArgTypes &&...args) {
+  switch (agg_id) {
+    case AggregationID::kCount:
+      upsertValueAccessorCountHelper<
+          is_key_nullable, is_value_nullable, KeyT>(
+              std::forward<ArgTypes>(args)...);
+      return;
+    case AggregationID::kSum:
+      upsertValueAccessorSumHelper<
+          is_key_nullable, is_value_nullable, KeyT>(
+              argument_type, std::forward<ArgTypes>(args)...);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool is_key_nullable, bool is_value_nullable,
+          typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorCountHelper(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        void *vec_table,
+        ValueAccessorT *accessor) {
+  DCHECK_GE(key_attr_id, 0u);
+
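+  // A non-nullable argument never contributes NULLs, so COUNT(argument)
+  // can take the cheaper nullary COUNT(*) path unless NULLs are possible.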
+  if (is_value_nullable && argument_id != kInvalidAttributeID) {
+    upsertValueAccessorCountUnary<is_key_nullable, KeyT>(
+        key_attr_id,
+        argument_id,
+        static_cast<std::atomic<std::size_t> *>(vec_table),
+        accessor);
+  } else {
+    upsertValueAccessorCountNullary<is_key_nullable, KeyT>(
+        key_attr_id,
+        static_cast<std::atomic<std::size_t> *>(vec_table),
+        accessor);
+  }
+}
+
+template <bool is_key_nullable, bool is_value_nullable,
+          typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorSumHelper(
+        const Type *argument_type,
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        void *vec_table,
+        ValueAccessorT *accessor) {
+  DCHECK_GE(key_attr_id, 0u);
+  DCHECK_GE(argument_id, 0u);
+  DCHECK(argument_type != nullptr);
+
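+  // Sum states are widened: int and long arguments accumulate into
+  // std::int64_t, while float and double accumulate into double.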
+  switch (argument_type->getTypeID()) {
+    case TypeID::kInt:
+      upsertValueAccessorIntegerSum<
+          is_key_nullable, is_value_nullable, KeyT, int>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<std::int64_t> *>(vec_table),
+              accessor);
+      return;
+    case TypeID::kLong:
+      upsertValueAccessorIntegerSum<
+          is_key_nullable, is_value_nullable, KeyT, std::int64_t>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<std::int64_t> *>(vec_table),
+              accessor);
+      return;
+    case TypeID::kFloat:
+      upsertValueAccessorGenericSum<
+          is_key_nullable, is_value_nullable, KeyT, float>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<double> *>(vec_table),
+              accessor);
+      return;
+    case TypeID::kDouble:
+      upsertValueAccessorGenericSum<
+          is_key_nullable, is_value_nullable, KeyT, double>(
+              key_attr_id,
+              argument_id,
+              static_cast<std::atomic<double> *>(vec_table),
+              accessor);
+      return;
+    default:
+      LOG(FATAL) << "Not supported";
+  }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorCountNullary(
+        const attribute_id key_attr_id,
+        std::atomic<std::size_t> *vec_table,
+        ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+    existence_map_->setBit(loc);
+  }
+}
+
+template <bool is_key_nullable, typename KeyT, typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorCountUnary(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        std::atomic<std::size_t> *vec_table,
+        ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    if (accessor->getUntypedValue(argument_id) == nullptr) {
+      continue;
+    }
+    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
+  }
+}
+
+template <bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ArgumentT, typename StateT,
+          typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorIntegerSum(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        std::atomic<StateT> *vec_table,
+        ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    const ArgumentT *argument = static_cast<const ArgumentT *>(
+        accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+    if (is_argument_nullable && argument == nullptr) {
+      continue;
+    }
+    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+  }
+}
+
+template <bool is_key_nullable, bool is_argument_nullable,
+          typename KeyT, typename ArgumentT, typename StateT,
+          typename ValueAccessorT>
+inline void CollisionFreeAggregationStateHashTable
+    ::upsertValueAccessorGenericSum(
+        const attribute_id key_attr_id,
+        const attribute_id argument_id,
+        std::atomic<StateT> *vec_table,
+        ValueAccessorT *accessor) {
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const KeyT *key = static_cast<const KeyT *>(
+        accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
+    if (is_key_nullable && key == nullptr) {
+      continue;
+    }
+    const std::size_t loc = *key;
+    existence_map_->setBit(loc);
+    const ArgumentT *argument = static_cast<const ArgumentT *>(
+        accessor->template getUntypedValue<is_argument_nullable>(argument_id));
+    if (is_argument_nullable && argument == nullptr) {
+      continue;
+    }
+    const ArgumentT arg_val = *argument;
+    std::atomic<StateT> &state = vec_table[loc];
+    StateT state_val = state.load(std::memory_order_relaxed);
+    while (!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+  }
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_
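
Two implementation notes on the header above, each with a small standalone
sketch (illustrative code under stated assumptions, not part of the patch):

First, upsertValueAccessorDispatchHelper lifts runtime nullability flags and
type IDs into template parameters, so every per-tuple loop is instantiated as
a branch-free specialization. The same idiom in miniature:

    #include <cstddef>

    // The nullability check compiles away entirely when kNullable == false.
    template <bool kNullable>
    std::size_t countNonNull(const int *const *values, const std::size_t n) {
      std::size_t count = 0;
      for (std::size_t i = 0; i < n; ++i) {
        if (kNullable && values[i] == nullptr) {
          continue;
        }
        ++count;
      }
      return count;
    }

    // The runtime flag is resolved exactly once, outside the hot loop.
    std::size_t countDispatch(const bool nullable,
                              const int *const *values,
                              const std::size_t n) {
      return nullable ? countNonNull<true>(values, n)
                      : countNonNull<false>(values, n);
    }

Second, upsertValueAccessorGenericSum updates its state with a
compare-exchange loop rather than fetch_add because std::atomic<double> has
no fetch_add before C++20. The idiom in isolation (the explicit relaxed
memory orders are an assumption, chosen to match the relaxed loads used
elsewhere in the header):

    #include <atomic>

    // Atomically add delta to *target. On failure, compare_exchange_weak
    // reloads the current value into expected, so the loop retries until no
    // other thread has raced the update.
    inline void atomicAddDouble(std::atomic<double> *target, const double delta) {
      double expected = target->load(std::memory_order_relaxed);
      while (!target->compare_exchange_weak(expected, expected + delta,
                                            std::memory_order_relaxed,
                                            std::memory_order_relaxed)) {
      }
    }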

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 1d4ccb0..87d21b2 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -22,9 +22,10 @@ package quickstep.serialization;
 import "types/Type.proto";
 
 enum HashTableImplType {
-  LINEAR_OPEN_ADDRESSING = 0;
-  SEPARATE_CHAINING = 1;
-  SIMPLE_SCALAR_SEPARATE_CHAINING = 2;
+  COLLISION_FREE_COLUMNWISE = 0;
+  LINEAR_OPEN_ADDRESSING = 1;
+  SEPARATE_CHAINING = 2;
+  SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
 }
 
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable
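
A caveat on the enum change above: protobuf enum values are serialized by
number, so renumbering the existing entries (LINEAR_OPEN_ADDRESSING 0 -> 1,
and so on) silently changes the meaning of any HashTableImplType serialized
before this commit. That is harmless if these protos never persist across
versions; if they can, appending the new value is the wire-compatible
alternative (a sketch, not part of the patch):

    enum HashTableImplType {
      LINEAR_OPEN_ADDRESSING = 0;
      SEPARATE_CHAINING = 1;
      SIMPLE_SCALAR_SEPARATE_CHAINING = 2;
      COLLISION_FREE_COLUMNWISE = 3;  // Appended; existing encodings unchanged.
    }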

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index edea63b..d418105 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -40,6 +40,7 @@ class ValueAccessor;
  *        HashTableFactory to create a HashTable.
  **/
 enum class HashTableImplType {
+  kCollisionFreeColumnwise,
   kLinearOpenAddressing,
   kSeparateChaining,
   kSimpleScalarSeparateChaining

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 16d13c8..3dc14ca 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "storage/CollisionFreeAggregationStateHashTable.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTable.pb.h"
@@ -114,6 +115,8 @@ serialization::HashTableImplType SimplifyHashTableImplTypeProto(
 inline HashTableImplType HashTableImplTypeFromProto(
     const serialization::HashTableImplType proto_type) {
   switch (proto_type) {
+    case serialization::HashTableImplType::COLLISION_FREE_COLUMNWISE:
+      return HashTableImplType::kCollisionFreeColumnwise;
     case serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING:
       return HashTableImplType::kLinearOpenAddressing;
     case serialization::HashTableImplType::SEPARATE_CHAINING:
@@ -345,6 +348,9 @@ class AggregationStateHashTableFactory {
       case HashTableImplType::kSeparateChaining:
         return new PackedPayloadSeparateChainingAggregationStateHashTable(
             key_types, num_entries, handles, storage_manager);
+      case HashTableImplType::kCollisionFreeColumnwise:
+        return new CollisionFreeAggregationStateHashTable(
+            key_types, num_entries, handles, storage_manager);
       default: {
         LOG(FATAL) << "Unrecognized HashTableImplType in HashTableFactory::createResizable()\n";
       }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/storage/TupleIdSequence.hpp
----------------------------------------------------------------------
diff --git a/storage/TupleIdSequence.hpp b/storage/TupleIdSequence.hpp
index 5047270..6097715 100644
--- a/storage/TupleIdSequence.hpp
+++ b/storage/TupleIdSequence.hpp
@@ -272,6 +272,16 @@ class TupleIdSequence {
   }
 
   /**
+   * @brief Get an iterator positioned one before the given tuple ID.
+   * @warning The returned iterator must be incremented once before it is
+   *          dereferenced.
+   *
+   * @param position The tuple ID to position the iterator just before.
+   *
+   * @return An iterator positioned one before the given tuple ID in this
+   *         sequence.
+   **/
+  inline const_iterator before_position(const tuple_id position) const {
+    return const_iterator(&internal_bitvector_, position - 1);
+  }
+
+  /**
    * @brief Get an iterator one-past-the-end of tuple IDs actually in this
    *        sequence.
    *
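
A sketch of the intended call pattern for before_position() (hypothetical
usage; it assumes const_iterator's equality against end() is by position):
the returned iterator sits one slot early, so it is advanced once before the
first dereference, after which iteration covers the tuple IDs at or after
the given position.

    // Visit every tuple ID in sequence that is >= start_tid.
    TupleIdSequence::const_iterator it = sequence.before_position(start_tid);
    const TupleIdSequence::const_iterator it_end = sequence.end();
    while (++it != it_end) {
      processTuple(*it);  // processTuple is a hypothetical callback.
    }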

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 872225e..1c382db 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -170,9 +170,11 @@ add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
 add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
+add_library(quickstep_utility_ConcurrentBitVector ../empty_src.cpp ConcurrentBitVector.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
+add_library(quickstep_utility_EventProfiler EventProfiler.cpp EventProfiler.hpp)
 add_library(quickstep_utility_ExecutionDAGVisualizer
             ExecutionDAGVisualizer.cpp
             ExecutionDAGVisualizer.hpp)
@@ -235,11 +237,17 @@ target_link_libraries(quickstep_utility_CompositeHash
                       quickstep_types_TypedValue
                       quickstep_utility_HashPair
                       glog)
+target_link_libraries(quickstep_utility_ConcurrentBitVector
+                      glog
+                      quickstep_utility_BitManipulation
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_DisjointTreeForest
                       glog)
+target_link_libraries(quickstep_utility_EventProfiler
+                      quickstep_threading_Mutex)
 target_link_libraries(quickstep_utility_ExecutionDAGVisualizer
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -332,9 +340,11 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
                       quickstep_utility_CompositeHash
+                      quickstep_utility_ConcurrentBitVector
                       quickstep_utility_DAG
                       quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant
+                      quickstep_utility_EventProfiler
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Glob
                       quickstep_utility_HashPair

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/utility/ConcurrentBitVector.hpp
----------------------------------------------------------------------
diff --git a/utility/ConcurrentBitVector.hpp b/utility/ConcurrentBitVector.hpp
new file mode 100644
index 0000000..8a0f5ba
--- /dev/null
+++ b/utility/ConcurrentBitVector.hpp
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_CONCURRENT_BIT_VECTOR_HPP_
+#define QUICKSTEP_UTILITY_CONCURRENT_BIT_VECTOR_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+
+#include "utility/BitManipulation.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class ConcurrentBitVector {
+ public:
+  ConcurrentBitVector(void *memory_location, const std::size_t num_bits)
+      : owned_(false),
+        num_bits_(num_bits),
+        data_array_(static_cast<DataType *>(memory_location)),
+        data_array_size_((num_bits >> kHigherOrderShift) + ((num_bits & kLowerOrderMask) ? 1 : 0)) {
+    DCHECK_GT(num_bits, 0);
+    DCHECK(data_array_ != nullptr);
+  }
+
+  explicit ConcurrentBitVector(const std::size_t num_bits)
+      : owned_(true),
+        num_bits_(num_bits),
+        data_array_(static_cast<DataType *>(std::malloc(BytesNeeded(num_bits)))),
+        data_array_size_((num_bits >> kHigherOrderShift) + ((num_bits & kLowerOrderMask) ? 1 : 0)) {
+    DCHECK_GT(num_bits, 0);
+    clear();
+  }
+
+  ~ConcurrentBitVector() {
+    if (owned_ && (num_bits_ != 0)) {
+      std::free(data_array_);
+    }
+  }
+
+  inline static std::size_t BytesNeeded(const std::size_t num_bits) {
+    if (num_bits & kLowerOrderMask) {
+      return ((num_bits >> kHigherOrderShift) + 1) * kDataSize;
+    } else {
+      return (num_bits >> kHigherOrderShift) * kDataSize;
+    }
+  }
+
+  inline std::size_t size() const {
+    return num_bits_;
+  }
+
+  inline void clear() {
+    std::memset(data_array_, 0, BytesNeeded(num_bits_));
+  }
+
+  inline bool getBit(const std::size_t bit_num) const {
+    const std::size_t data_value =
+        data_array_[bit_num >> kHigherOrderShift].load(std::memory_order_relaxed);
+    return (data_value << (bit_num & kLowerOrderMask)) & kTopBit;
+  }
+
+  inline void setBit(const std::size_t bit_num) const {
+    data_array_[bit_num >> kHigherOrderShift].fetch_or(
+        kTopBit >> (bit_num & kLowerOrderMask), std::memory_order_relaxed);
+  }
+
+  inline std::size_t firstOne(std::size_t position = 0) const {
+    DCHECK_LT(position, num_bits_);
+
+    const std::size_t position_index = position >> kHigherOrderShift;
+    const std::size_t data_value =
+        data_array_[position_index].load(std::memory_order_relaxed)
+            & (std::numeric_limits<std::size_t>::max() >> (position & kLowerOrderMask));
+    if (data_value) {
+      return (position & ~kLowerOrderMask) | leading_zero_count<std::size_t>(data_value);
+    }
+
+    for (std::size_t array_idx = position_index + 1;
+         array_idx < data_array_size_;
+         ++array_idx) {
+      const std::size_t data_value =
+          data_array_[array_idx].load(std::memory_order_relaxed);
+      if (data_value) {
+        return (array_idx << kHigherOrderShift) | leading_zero_count<std::size_t>(data_value);
+      }
+    }
+
+    return num_bits_;
+  }
+
+  inline std::size_t nextOne(const std::size_t position) const {
+    const std::size_t search_pos = position + 1;
+    return search_pos >= num_bits_ ? num_bits_ : firstOne(search_pos);
+  }
+
+  inline std::size_t onesCount() const {
+    std::size_t count = 0;
+    for (std::size_t array_idx = 0;
+         array_idx < data_array_size_;
+         ++array_idx) {
+      count += population_count<std::size_t>(
+          data_array_[array_idx].load(std::memory_order_relaxed));
+    }
+    return count;
+  }
+
+  inline std::size_t onesCount(const std::size_t start_position,
+                               const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LT(start_position, num_bits_);
+    DCHECK_LE(end_position, num_bits_);
+
+    const std::size_t start_index = start_position >> kHigherOrderShift;
+    const std::size_t end_index = end_position >> kHigherOrderShift;
+    if (start_index == end_index) {
+      const std::size_t data_value =
+          data_array_[start_index].load(std::memory_order_relaxed)
+              & (std::numeric_limits<std::size_t>::max() >> (start_position & kLowerOrderMask))
+              & ~(std::numeric_limits<std::size_t>::max() >> (end_position & kLowerOrderMask));
+      return population_count<std::size_t>(data_value);
+    } else {
+      const std::size_t first_data =
+          data_array_[start_index].load(std::memory_order_relaxed)
+              & (std::numeric_limits<std::size_t>::max() >> (start_position & kLowerOrderMask));
+      std::size_t count = population_count<std::size_t>(first_data);
+
+      for (std::size_t array_idx = start_index + 1;
+           array_idx < end_index;
+           ++array_idx) {
+        count += population_count<std::size_t>(
+            data_array_[array_idx].load(std::memory_order_relaxed));
+      }
+
+      const std::size_t last_offset = end_position & kLowerOrderMask;
+      if (last_offset != 0) {
+        const std::size_t last_data =
+            data_array_[end_index].load(std::memory_order_relaxed)
+                & ~(std::numeric_limits<std::size_t>::max() >> last_offset);
+        count += population_count<std::size_t>(last_data);
+      }
+
+      return count;
+    }
+  }
+
+ private:
+  typedef std::atomic<std::size_t> DataType;
+  static constexpr std::size_t kDataSize = sizeof(DataType);
+
+  // This works as long as the bit-width of size_t is a power of 2:
+  static constexpr std::size_t kLowerOrderMask = (sizeof(std::size_t) << 3) - 1;
+  // This works for 32-bit or 64-bit size_t:
+  static constexpr std::size_t kHigherOrderShift = sizeof(std::size_t) == 4 ? 5 : 6;
+
+  static constexpr std::size_t kOne = static_cast<std::size_t>(1);
+  static constexpr std::size_t kTopBit = kOne << kLowerOrderMask;
+
+  const bool owned_;
+  const std::size_t num_bits_;
+  DataType *data_array_;
+  const std::size_t data_array_size_;
+
+  DISALLOW_COPY_AND_ASSIGN(ConcurrentBitVector);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_CONCURRENT_BIT_VECTOR_HPP_
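
A usage sketch for ConcurrentBitVector (illustrative, not part of the
patch): writers may set bits concurrently, since setBit() is a relaxed
fetch_or and cannot lose updates, and a single reader can then walk the set
bits with firstOne()/nextOne():

    #include <cstddef>
    #include <iostream>
    #include <thread>

    #include "utility/ConcurrentBitVector.hpp"

    int main() {
      quickstep::ConcurrentBitVector bits(1024);  // Owns its own zeroed storage.

      // Two concurrent writers touching overlapping words.
      std::thread t1([&bits] {
        for (std::size_t i = 0; i < 1024; i += 2) bits.setBit(i);
      });
      std::thread t2([&bits] {
        for (std::size_t i = 1; i < 1024; i += 3) bits.setBit(i);
      });
      t1.join();
      t2.join();

      // Single-threaded scan of the set bits.
      for (std::size_t pos = bits.firstOne(); pos < bits.size();
           pos = bits.nextOne(pos)) {
        std::cout << pos << '\n';
      }
      std::cout << "total: " << bits.onesCount() << '\n';
      return 0;
    }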

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/utility/EventProfiler.cpp
----------------------------------------------------------------------
diff --git a/utility/EventProfiler.cpp b/utility/EventProfiler.cpp
new file mode 100644
index 0000000..482c1e0
--- /dev/null
+++ b/utility/EventProfiler.cpp
@@ -0,0 +1,28 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "utility/EventProfiler.hpp"
+
+#include <cstddef>
+#include <string>
+#include <vector>
+
+namespace quickstep {
+
+EventProfiler<std::string, std::size_t, std::size_t> simple_profiler;
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/utility/EventProfiler.hpp
----------------------------------------------------------------------
diff --git a/utility/EventProfiler.hpp b/utility/EventProfiler.hpp
new file mode 100644
index 0000000..fb4eb35
--- /dev/null
+++ b/utility/EventProfiler.hpp
@@ -0,0 +1,187 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_
+#define QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_
+
+#include <chrono>
+#include <cstddef>
+#include <cstring>
+#include <ctime>
+#include <iomanip>
+#include <map>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <tuple>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "threading/Mutex.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+using clock = std::chrono::steady_clock;
+
+template <typename TagT, typename ...PayloadT>
+class EventProfiler {
+ public:
+  EventProfiler()
+      : zero_time_(clock::now()) {
+  }
+
+  struct EventInfo {
+    clock::time_point start_time;
+    clock::time_point end_time;
+    bool is_finished;
+    std::tuple<PayloadT...> payload;
+
+    explicit EventInfo(const clock::time_point &start_time_in)
+        : start_time(start_time_in),
+          is_finished(false) {
+    }
+
+    EventInfo()
+        : start_time(clock::now()),
+          is_finished(false) {
+    }
+
+    inline void setPayload(PayloadT &&...in_payload) {
+      payload = std::make_tuple(std::forward<PayloadT>(in_payload)...);
+    }
+
+    inline void endEvent() {
+      end_time = clock::now();
+      is_finished = true;
+    }
+  };
+
+  struct EventContainer {
+    EventContainer()
+        : context(0) {}
+
+    inline void startEvent(const TagT &tag) {
+      events[tag].emplace_back(clock::now());
+    }
+
+    inline void endEvent(const TagT &tag) {
+      auto &event_info = events.at(tag).back();
+      event_info.is_finished = true;
+      event_info.end_time = clock::now();
+    }
+
+    inline std::vector<EventInfo> *getEventLine(const TagT &tag) {
+      return &events[tag];
+    }
+
+    inline void setContext(int context_in) {
+      context = context_in;
+    }
+
+    inline int getContext() const {
+      return context;
+    }
+
+    std::map<TagT, std::vector<EventInfo>> events;
+    int context;
+  };
+
+  EventContainer *getContainer() {
+    MutexLock lock(mutex_);
+    return &thread_map_[std::this_thread::get_id()];
+  }
+
+  void writeToStream(std::ostream &os) const {
+    time_t rawtime;
+    time(&rawtime);
+    char event_id[32];
+    strftime(event_id, sizeof event_id, "%Y-%m-%d %H:%M:%S", localtime(&rawtime));
+
+    int thread_id = 0;
+    for (const auto &thread_ctx : thread_map_) {
+      for (const auto &event_group : thread_ctx.second.events) {
+        for (const auto &event_info : event_group.second) {
+          CHECK(event_info.is_finished) << "Unfinished profiling event";
+
+          os << std::setprecision(12)
+             << event_id << ","
+             << thread_id << "," << event_group.first << ",";
+
+          PrintTuple(os, event_info.payload, ",");
+
+          os << std::chrono::duration<double>(event_info.start_time - zero_time_).count()
+             << ","
+             << std::chrono::duration<double>(event_info.end_time - zero_time_).count()
+             << "\n";
+        }
+      }
+      ++thread_id;
+    }
+  }
+
+  void clear() {
+    zero_time_ = clock::now();
+    thread_map_.clear();
+  }
+
+  const std::map<std::thread::id, EventContainer> &containers() {
+    return thread_map_;
+  }
+
+  const clock::time_point &zero_time() {
+    return zero_time_;
+  }
+
+ private:
+  template<class Tuple, std::size_t N>
+  struct TuplePrinter {
+    static void Print(std::ostream &os, const Tuple &t, const std::string &sep) {
+      TuplePrinter<Tuple, N-1>::Print(os, t, sep);
+      os << std::get<N-1>(t) << sep;
+    }
+  };
+
+  template<class Tuple>
+  struct TuplePrinter<Tuple, 1> {
+    static void Print(std::ostream &os, const Tuple &t, const std::string &sep) {
+      os << std::get<0>(t) << sep;
+    }
+  };
+
+  template<class... Args>
+  static void PrintTuple(std::ostream &os, const std::tuple<Args...>& t, const std::string &sep) {
+    TuplePrinter<decltype(t), sizeof...(Args)>::Print(os, t, sep);
+  }
+
+  clock::time_point zero_time_;
+  std::map<std::thread::id, EventContainer> thread_map_;
+  Mutex mutex_;
+};
+
+extern EventProfiler<std::string, std::size_t, std::size_t> simple_profiler;
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_EVENT_PROFILER_HPP_
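
A usage sketch for the profiler (illustrative; the tag and payload values
are made up): each thread fetches its own EventContainer once, brackets the
measured work with startEvent()/endEvent(), and a coordinator dumps all
threads' events at the end. writeToStream() CHECKs that every event was
finished, so endEvent() must be called before dumping.

    #include <cstddef>
    #include <iostream>

    #include "utility/EventProfiler.hpp"

    void doMeasuredWork() {
      auto *container = quickstep::simple_profiler.getContainer();
      container->startEvent("scan");
      // ... the work being measured ...
      container->getEventLine("scan")->back().setPayload(
          std::size_t(42), std::size_t(7));  // e.g. operator id, partition id.
      container->endEvent("scan");
    }

    int main() {
      doMeasuredWork();
      quickstep::simple_profiler.writeToStream(std::cout);
      return 0;
    }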

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/utility/ExecutionDAGVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/ExecutionDAGVisualizer.cpp b/utility/ExecutionDAGVisualizer.cpp
index 2938808..488cb2f 100644
--- a/utility/ExecutionDAGVisualizer.cpp
+++ b/utility/ExecutionDAGVisualizer.cpp
@@ -147,6 +147,9 @@ void ExecutionDAGVisualizer::bindProfilingStats(
   for (std::size_t node_index = 0; node_index < num_nodes_; ++node_index) {
     if (nodes_.find(node_index) != nodes_.end()) {
       const std::size_t relop_start_time = time_start[node_index];
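+      // std::numeric_limits<std::size_t>::max() appears to be the "no
+      // profiling events recorded for this operator" sentinel; skip such
+      // nodes rather than binding garbage stats.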
+      if (relop_start_time == std::numeric_limits<std::size_t>::max()) {
+        continue;
+      }
       const std::size_t relop_end_time = time_end[node_index];
       const std::size_t relop_elapsed_time = time_elapsed[node_index];
       NodeInfo &node_info = nodes_[node_index];

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3b0f4e05/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
index 15c8f0b..d8443fe 100644
--- a/utility/lip_filter/BitVectorExactFilter.hpp
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -83,7 +83,7 @@ class BitVectorExactFilter : public LIPFilter {
     DCHECK(batch != nullptr);
     DCHECK_LE(batch_size, batch->size());
 
-    return InvokeOnAnyValueAccessor(
+    return InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
         accessor,
         [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
       if (is_attr_nullable) {


