quickstep-commits mailing list archives

From jianq...@apache.org
Subject incubator-quickstep git commit: Support for performing partitioned aggregation.
Date Tue, 25 Oct 2016 03:16:58 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/partagg-merged-experiment [created] e6738e823


Support for performing partitioned aggregation.

- Used for creating a pool of hash tables such that each hash table
  belongs to a unique partition.
- The partitioning is done on the group-by keys.
- Wrote a utility function to compute composite hash of a group of
  TypedValues.
- Added a check for whether the aggregation is partitioned or not.
- The conditions for whether the aggregation can be partitioned
  are as follows:
  1. The query has a GROUP BY clause.
  2. There are no aggregations with a DISTINCT clause.
  3. The estimated number of groups is greater than a pre-defined
  threshold.
  4. The query has at least one aggregation function.
- Method for partitioned aggregation with GROUP BY
- StorageBlock now provides a method for performing GROUP BY aggregation
  in a partitioned way.
- The Tuple class now supports a method to compute the hash of the entire
  tuple (i.e. hash key is the composite key made up of all the
  attributes in the tuple).
- AggregationOperationState calls appropriate method (i.e.
  aggregateGroupBy or aggregateGroupByPartitioned) based on the way in
  which aggregation is being performed.
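
The eligibility conditions and the key-based partitioning described above can be sketched roughly as follows. This is a minimal standalone illustration, not the code from this commit: the names CanPartitionAggregation, CombineHashes, CompositeKeyHash and PartitionForKey are hypothetical, std::string stands in for TypedValue, and the Boost-style hash mixing is an assumption about how a composite hash might be formed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for one component of a GROUP BY key.
using KeyPart = std::string;

// Returns true when all four conditions from the commit message hold.
// 'threshold' plays the role of the pre-defined group-count threshold.
inline bool CanPartitionAggregation(const std::size_t estimated_num_groups,
                                    const std::size_t num_group_by_keys,
                                    const std::vector<bool> &is_distinct,
                                    const std::size_t num_aggregates,
                                    const std::size_t threshold) {
  if (num_aggregates == 0 || num_group_by_keys == 0) {
    return false;
  }
  // Any DISTINCT aggregate rules out partitioning.
  if (std::find(is_distinct.begin(), is_distinct.end(), true) !=
      is_distinct.end()) {
    return false;
  }
  return estimated_num_groups > threshold;
}

// Boost-style hash mixing (assumed; the real utility may differ).
inline std::size_t CombineHashes(const std::size_t seed,
                                 const std::size_t value_hash) {
  return seed ^ (value_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2));
}

// Folds the hashes of all key components into one composite hash.
inline std::size_t CompositeKeyHash(const std::vector<KeyPart> &key) {
  std::size_t hash = 0;
  for (const KeyPart &part : key) {
    hash = CombineHashes(hash, std::hash<KeyPart>{}(part));
  }
  return hash;
}

// Maps a composite key to a partition in [0, num_partitions).
inline std::size_t PartitionForKey(const std::vector<KeyPart> &key,
                                   const std::size_t num_partitions) {
  assert(num_partitions > 0);
  return CompositeKeyHash(key) % num_partitions;
}
```

Because the partition is a pure function of the key, every row with the same GROUP BY values is routed to the same hash table, which is what lets each partition be treated independently later on.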


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e6738e82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e6738e82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e6738e82

Branch: refs/heads/partagg-merged-experiment
Commit: e6738e82371abce403313bd6c9743c4355b8ee3e
Parents: 393eba5
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Wed Sep 21 11:43:39 2016 -0500
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Mon Oct 24 21:50:03 2016 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             |  35 ++-
 .../FinalizeAggregationOperator.hpp             |   9 +-
 storage/AggregationOperationState.cpp           | 187 +++++++++++++---
 storage/AggregationOperationState.hpp           |  69 +++++-
 storage/CMakeLists.txt                          |  11 +
 storage/HashTablePool.hpp                       |   2 +-
 storage/PartitionedHashTablePool.hpp            | 224 +++++++++++++++++++
 storage/StorageBlock.cpp                        |  53 +++++
 storage/StorageBlock.hpp                        |  48 ++++
 types/containers/CMakeLists.txt                 |   1 +
 types/containers/Tuple.hpp                      |   8 +
 utility/CMakeLists.txt                          |   6 +
 utility/CompositeHash.hpp                       |  52 +++++
 13 files changed, 665 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 7e337de..55d1357 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
 
   if (blocking_dependencies_met_ && !started_) {
     started_ = true;
-    container->addNormalWorkOrder(
-        new FinalizeAggregationWorkOrder(
-            query_id_,
-            query_context->getAggregationState(aggr_state_index_),
-            query_context->getInsertDestination(output_destination_index_)),
-        op_index_);
+    DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr);
+    if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) {
+      // The same AggregationState is shared across all the WorkOrders.
+      for (std::size_t part_id = 0;
+           part_id < query_context->getAggregationState(aggr_state_index_)
+                         ->getNumPartitions();
+           ++part_id) {
+        container->addNormalWorkOrder(
+            new FinalizeAggregationWorkOrder(
+                query_id_,
+                query_context->getAggregationState(aggr_state_index_),
+                query_context->getInsertDestination(output_destination_index_),
+                static_cast<int>(part_id)),
+            op_index_);
+      }
+    } else {
+      container->addNormalWorkOrder(
+          new FinalizeAggregationWorkOrder(
+              query_id_,
+              query_context->getAggregationState(aggr_state_index_),
+              query_context->getInsertDestination(output_destination_index_)),
+          op_index_);
+    }
   }
   return started_;
 }
@@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
 
 
 void FinalizeAggregationWorkOrder::execute() {
-  state_->finalizeAggregate(output_destination_);
+  if (state_->isAggregatePartitioned()) {
+    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
+  } else {
+    state_->finalizeAggregate(output_destination_);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 0aeac2a..ae7127a 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -119,13 +119,17 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
    * @param state The AggregationState to use.
    * @param output_destination The InsertDestination to insert aggregation
    *        results.
+   * @param part_id The partition ID for which the Finalize aggregation work
+   *        order is issued. Ignore if aggregation is not partitioned.
    */
   FinalizeAggregationWorkOrder(const std::size_t query_id,
                                AggregationOperationState *state,
-                               InsertDestination *output_destination)
+                               InsertDestination *output_destination,
+                               const int part_id = -1)
       : WorkOrder(query_id),
         state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        part_id_(part_id) {}
 
   ~FinalizeAggregationWorkOrder() override {}
 
@@ -134,6 +138,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
  private:
   AggregationOperationState *state_;
   InsertDestination *output_destination_;
+  const int part_id_;
 
   DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index eb7ca79..0fbe4cb 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -72,6 +72,8 @@ AggregationOperationState::AggregationOperationState(
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
+      is_aggregate_partitioned_(checkAggregatePartitioned(
+          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
       predicate_(predicate),
       group_by_list_(std::move(group_by)),
       arguments_(std::move(arguments)),
@@ -175,11 +177,10 @@ AggregationOperationState::AggregationOperationState(
         key_types.insert(
             key_types.end(), argument_types.begin(), argument_types.end());
         // TODO(jianqiao): estimated_num_entries is quite inaccurate for
-        // estimating
-        // the number of entries in the distinctify hash table. We may estimate
-        // for each distinct aggregation an estimated_num_distinct_keys value
-        // during
-        // query optimization, if it worths.
+        // estimating the number of entries in the distinctify hash table.
+        // We may estimate for each distinct aggregation an
+        // estimated_num_distinct_keys value during query optimization, if it's
+        // worth.
         distinctify_hashtables_.emplace_back(
             AggregationStateFastHashTableFactory::CreateResizable(
                 *distinctify_hash_table_impl_types_it,
@@ -195,14 +196,24 @@ AggregationOperationState::AggregationOperationState(
     }
 
     if (!group_by_handles.empty()) {
-      // Aggregation with GROUP BY: create a HashTable pool for per-group
-      // states.
-      group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
-                                                       hash_table_impl_type,
-                                                       group_by_types,
-                                                       payload_sizes,
-                                                       group_by_handles,
-                                                       storage_manager));
+      // Aggregation with GROUP BY: create a HashTable pool.
+      if (!is_aggregate_partitioned_) {
+        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
+                                                         hash_table_impl_type,
+                                                         group_by_types,
+                                                         payload_sizes,
+                                                         group_by_handles,
+                                                         storage_manager));
+      } else {
+        partitioned_group_by_hashtable_pool_.reset(
+            new PartitionedHashTablePool(estimated_num_entries,
+                                         kNumPartitionsForAggregate,
+                                         hash_table_impl_type,
+                                         group_by_types,
+                                         payload_sizes,
+                                         group_by_handles,
+                                         storage_manager));
+      }
     }
   }
 }
@@ -450,19 +461,71 @@ void AggregationOperationState::aggregateBlockHashTable(
     }
   }
 
-  // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
-  // directly into the (threadsafe) shared global HashTable for this
-  // aggregate.
-  DCHECK(group_by_hashtable_pool_ != nullptr);
-  AggregationStateHashTableBase *agg_hash_table =
+  if (!is_aggregate_partitioned_) {
+    // Call StorageBlock::aggregateGroupBy() to aggregate this block's values
+    // directly into the (threadsafe) shared global HashTable for this
+    // aggregate.
+    DCHECK(group_by_hashtable_pool_ != nullptr);
+    AggregationStateHashTableBase *agg_hash_table =
       group_by_hashtable_pool_->getHashTableFast();
-  DCHECK(agg_hash_table != nullptr);
-  block->aggregateGroupBy(arguments_,
-                          group_by_list_,
-                          matches.get(),
-                          agg_hash_table,
-                          &reuse_group_by_vectors);
-  group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+    DCHECK(agg_hash_table != nullptr);
+    block->aggregateGroupBy(arguments_,
+                            group_by_list_,
+                            matches.get(),
+                            agg_hash_table,
+                            &reuse_group_by_vectors);
+    group_by_hashtable_pool_->returnHashTable(agg_hash_table);
+  } else {
+    ColumnVectorsValueAccessor temp_result;
+    // IDs of 'arguments' as attributes in the ValueAccessor we create below.
+    std::vector<attribute_id> argument_ids;
+
+    // IDs of GROUP BY key element(s) in the ValueAccessor we create below.
+    std::vector<attribute_id> key_ids;
+    const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
+    block->aggregateGroupByPartitioned(
+        arguments_,
+        group_by_list_,
+        matches.get(),
+        num_partitions,
+        &temp_result,
+        &argument_ids,
+        &key_ids,
+        &reuse_group_by_vectors);
+    // Compute the partitions for the tuple formed by group by values.
+    std::vector<std::unique_ptr<TupleIdSequence>> partition_membership;
+    partition_membership.resize(num_partitions);
+
+    // Create a tuple-id sequence for each partition.
+    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+      partition_membership[partition].reset(
+          new TupleIdSequence(temp_result.getEndPosition()));
+    }
+
+    // Iterate over ValueAccessor for each tuple,
+    // set a bit in the appropriate TupleIdSequence.
+    temp_result.beginIteration();
+    while (temp_result.next()) {
+      // We need a unique_ptr because getTupleWithAttributes() uses "new".
+      std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids));
+      const std::size_t curr_tuple_partition_id =
+          curr_tuple->getTupleHash() % num_partitions;
+      partition_membership[curr_tuple_partition_id]->set(
+          temp_result.getCurrentPosition(), true);
+    }
+    // For each partition, create an adapter around Value Accessor and
+    // TupleIdSequence.
+    std::vector<std::unique_ptr<
+        TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter;
+    adapter.resize(num_partitions);
+    for (std::size_t partition = 0; partition < num_partitions; ++partition) {
+      adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter(
+          *(partition_membership)[partition]));
+      partitioned_group_by_hashtable_pool_->getHashTable(partition)
+          ->upsertValueAccessorCompositeKeyFast(
+              argument_ids, adapter[partition].get(), key_ids, true);
+    }
+  }
 }
 
 void AggregationOperationState::finalizeSingleState(
@@ -606,13 +669,81 @@ void AggregationOperationState::finalizeHashTable(
 }
 
 void AggregationOperationState::destroyAggregationHashTablePayload() {
-  if (group_by_hashtable_pool_ != nullptr) {
-    auto all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
-    DCHECK(all_hash_tables != nullptr);
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables =
+      nullptr;
+  if (!is_aggregate_partitioned_) {
+    if (group_by_hashtable_pool_ != nullptr) {
+      all_hash_tables = group_by_hashtable_pool_->getAllHashTables();
+    }
+  } else {
+    if (partitioned_group_by_hashtable_pool_ != nullptr) {
+      all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables();
+    }
+  }
+  if (all_hash_tables != nullptr) {
     for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) {
       (*all_hash_tables)[ht_index]->destroyPayload();
     }
   }
 }
 
+void AggregationOperationState::finalizeAggregatePartitioned(
+    const std::size_t partition_id, InsertDestination *output_destination) {
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  // Collect per-aggregate finalized values.
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    AggregationStateHashTableBase *hash_table =
+        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *hash_table, &group_by_keys, agg_idx);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
+    }
+  }
+
+  // Reorganize 'group_by_keys' in column-major order so that we can make a
+  // ColumnVectorsValueAccessor to bulk-insert results.
+  //
+  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+  // if there is only one aggregate. The need to do this should hopefully go
+  // away when we work out storing composite structures for multiple aggregates
+  // in a single HashTable.
+  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+  std::size_t group_by_element_idx = 0;
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+    const Type &group_by_type = group_by_element->getType();
+    if (NativeColumnVector::UsableForType(group_by_type)) {
+      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    } else {
+      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    }
+    ++group_by_element_idx;
+  }
+
+  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+  // and the finalized aggregates.
+  ColumnVectorsValueAccessor complete_result;
+  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+    complete_result.addColumn(group_by_cv.release());
+  }
+  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+    complete_result.addColumn(final_value_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 }  // namespace quickstep
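
The partitioned branch of aggregateBlockHashTable() above hashes each tuple's GROUP BY key, takes the result modulo the number of partitions, and sets a bit in that partition's TupleIdSequence. A minimal standalone sketch of that membership step is shown below. It is an illustration under assumptions, not the commit's code: BuildPartitionMembership is a hypothetical name, std::vector<bool> stands in for TupleIdSequence, and the per-row key hashes are taken as precomputed input.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Returns one bitmap per partition. membership[p][row] is true exactly when
// 'row' belongs to partition 'p', i.e. key_hashes[row] % num_partitions == p.
inline std::vector<std::vector<bool>> BuildPartitionMembership(
    const std::vector<std::size_t> &key_hashes,
    const std::size_t num_partitions) {
  assert(num_partitions > 0);
  // Every bitmap spans all rows and starts out with no bits set.
  std::vector<std::vector<bool>> membership(
      num_partitions, std::vector<bool>(key_hashes.size(), false));
  for (std::size_t row = 0; row < key_hashes.size(); ++row) {
    membership[key_hashes[row] % num_partitions][row] = true;
  }
  return membership;
}
```

Each row sets exactly one bit across all bitmaps, so the bitmaps form a disjoint cover of the rows. Each bitmap can then act as a filter over the shared rows when upserting into the matching partition's hash table.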

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index c251983..ca9ae65 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -32,6 +32,7 @@
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTablePool.hpp"
+#include "storage/PartitionedHashTablePool.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
@@ -176,12 +177,44 @@ class AggregationOperationState {
    **/
   void destroyAggregationHashTablePayload();
 
+  /**
+   * @brief Generate the final results for the aggregates managed by this
+   *        AggregationOperationState and write them out to StorageBlock(s).
+   *        In this implementation, each thread picks a hash table belonging to
+   *        a partition and writes its values to StorageBlock(s). There is no
+   *        need to merge multiple hash tables in one, because there is no
+   *        overlap in the keys across two hash tables.
+   *
+   * @param partition_id The ID of the partition for which finalize is being
+   *        performed.
+   * @param output_destination An InsertDestination where the finalized output
+   *        tuple(s) from this aggregate are to be written.
+   **/
+  void finalizeAggregatePartitioned(
+      const std::size_t partition_id, InsertDestination *output_destination);
+
   static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
                                      AggregationStateHashTableBase *dst);
 
+  bool isAggregatePartitioned() const {
+    return is_aggregate_partitioned_;
+  }
+
+  /**
+   * @note This method is relevant only when the aggregate is partitioned.
+   **/
+  std::size_t getNumPartitions() const {
+    return is_aggregate_partitioned_
+               ? partitioned_group_by_hashtable_pool_->getNumPartitions()
+               : 1;
+  }
+
   int dflag;
 
  private:
+  static constexpr std::size_t kPartitionedAggregateThreshold = 500000;
+  static constexpr std::size_t kNumPartitionsForAggregate = 40;
+
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
   void mergeSingleState(
@@ -195,12 +228,39 @@ class AggregationOperationState {
   void finalizeSingleState(InsertDestination *output_destination);
   void finalizeHashTable(InsertDestination *output_destination);
 
-  // A vector of group by hash table pools.
-  std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
+  bool checkAggregatePartitioned(
+      const std::size_t estimated_num_groups,
+      const std::vector<bool> &is_distinct,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const std::vector<const AggregateFunction *> &aggregate_functions) const {
+    // If there's no aggregation, return false.
+    if (aggregate_functions.empty()) {
+      return false;
+    }
+    // Check if there's a distinct operation involved in any aggregate, if so
+    // the aggregate can't be partitioned.
+    for (auto distinct : is_distinct) {
+      if (distinct) {
+        return false;
+      }
+    }
+    // There's no distinct aggregation involved, Check if there's at least one
+    // GROUP BY operation.
+    if (group_by.empty()) {
+      return false;
+    }
+    // There are GROUP BYs without DISTINCT. Check if the estimated number of
+    // groups is large enough to warrant a partitioned aggregation.
+    return estimated_num_groups > kPartitionedAggregateThreshold;
+  }
 
   // Common state for all aggregates in this operation: the input relation, the
   // filter predicate (if any), and the list of GROUP BY expressions (if any).
   const CatalogRelationSchema &input_relation_;
+
+  // Whether the aggregation is partitioned or not.
+  const bool is_aggregate_partitioned_;
+
   std::unique_ptr<const Predicate> predicate_;
   std::vector<std::unique_ptr<const Scalar>> group_by_list_;
 
@@ -233,6 +293,11 @@ class AggregationOperationState {
   std::vector<std::unique_ptr<AggregationStateHashTableBase>>
       group_by_hashtables_;
 
+  // A vector of group by hash table pools.
+  std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
+
+  std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
+
   StorageManager *storage_manager_;
 
   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
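
The comment on finalizeAggregatePartitioned() above notes that no merge is needed because keys do not overlap across the per-partition hash tables. A toy end-to-end sketch of that property, using SUM over string keys, might look like the following. All names here (Row, PartitionTable, BuildPartitionedSums, FinalizePartition) are hypothetical, and std::unordered_map stands in for Quickstep's aggregation hash tables.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Row = std::pair<std::string, std::int64_t>;  // (group key, value)
using PartitionTable = std::unordered_map<std::string, std::int64_t>;

// Build phase: route each row to the table chosen by its key hash and
// accumulate a running SUM per group inside that table.
inline std::vector<PartitionTable> BuildPartitionedSums(
    const std::vector<Row> &rows, const std::size_t num_partitions) {
  assert(num_partitions > 0);
  std::vector<PartitionTable> tables(num_partitions);
  for (const Row &row : rows) {
    const std::size_t part =
        std::hash<std::string>{}(row.first) % num_partitions;
    tables[part][row.first] += row.second;
  }
  return tables;
}

// Finalize phase for one partition: emit its groups as-is. Since a key can
// only ever live in one partition, no cross-partition merge is required.
inline std::vector<Row> FinalizePartition(const PartitionTable &table) {
  return std::vector<Row>(table.begin(), table.end());
}
```

In this model each call to FinalizePartition() corresponds to one finalize work order, and the calls for different partitions could run on separate threads without coordinating with each other.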

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 0e32cc1..9348dda 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -235,6 +235,7 @@ add_library(quickstep_storage_PackedRowStoreTupleStorageSubBlock
 add_library(quickstep_storage_PackedRowStoreValueAccessor
             ../empty_src.cpp
             PackedRowStoreValueAccessor.hpp)
+add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp)
 add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp)
 add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp)
 add_library(quickstep_storage_SeparateChainingHashTable ../empty_src.cpp SeparateChainingHashTable.hpp)
@@ -289,6 +290,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
                       quickstep_storage_InsertDestination
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
@@ -852,6 +854,14 @@ target_link_libraries(quickstep_storage_PackedRowStoreValueAccessor
                       quickstep_types_TypedValue
                       quickstep_utility_BitVector
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_PartitionedHashTablePool
+                      glog
+                      quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_storage_FastHashTable
+                      quickstep_storage_FastHashTableFactory
+                      quickstep_storage_HashTableBase
+                      quickstep_utility_Macros
+                      quickstep_utility_StringUtil)                    
 target_link_libraries(quickstep_storage_PreloaderThread
                       glog
                       quickstep_catalog_CatalogDatabase
@@ -1169,6 +1179,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_LinearOpenAddressingHashTable
                       quickstep_storage_PackedRowStoreTupleStorageSubBlock
                       quickstep_storage_PackedRowStoreValueAccessor
+                      quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_PreloaderThread
                       quickstep_storage_SMAIndexSubBlock
                       quickstep_storage_SeparateChainingHashTable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 3cdfcb3..96cf849 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -173,7 +173,7 @@ class HashTablePool {
    * @param All the hash tables in the pool.
    *
    **/
-  const std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
       getAllHashTables() {
     return &hash_tables_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/PartitionedHashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp
new file mode 100644
index 0000000..cb220ee
--- /dev/null
+++ b/storage/PartitionedHashTablePool.hpp
@@ -0,0 +1,224 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_
+
+#include <algorithm>
+#include <chrono>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/FastHashTable.hpp"
+#include "storage/FastHashTableFactory.hpp"
+#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+class Type;
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+/**
+ * @brief A pool of HashTables used for a single aggregation handle. Each
+ *        HashTable represents values from a given partition, which is
+ *        determined by the keys in the group by clause.
+ **/
+class PartitionedHashTablePool {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param estimated_num_entries The maximum number of entries in a hash table.
+   * @param num_partitions The number of partitions (i.e. number of HashTables)
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointer of types which form the group by
+   *        key.
+   * @param agg_handle The aggregation handle.
+   * @param storage_manager A pointer to the storage manager.
+   *
+   * @note The estimate of number of entries is quite inaccurate at this time.
+   *       If we go by the current estimate, each hash table demands much
+   *       larger space than it actually needs, which causes the system to
+   *       either trigger evictions or worse - run out of memory. To fix this
+   *       issue, we divide the estimate by 100. The division will not affect
+   *       correctness, however it may allocate some hash tables smaller space
+   *       than their requirement, causing them to be resized during build
+   *       phase, which has a performance penalty.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           AggregationHandle *agg_handle,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            setHashTableSize(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        agg_handle_(DCHECK_NOTNULL(agg_handle)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Constructor.
+   *
+   * @note This constructor is relevant for the HashTable specialized for
+   *       aggregation.
+   *
+   * @param estimated_num_entries The maximum number of entries in a hash table.
+   * @param num_partitions The number of partitions (i.e. number of HashTables)
+   * @param hash_table_impl_type The type of hash table implementation.
+   * @param group_by_types A vector of pointer of types which form the group by
+   *        key.
+   * @param payload_sizes The sizes of the payload elements (i.e.
+   *        AggregationStates).
+   * @param handles The aggregation handles.
+   * @param storage_manager A pointer to the storage manager.
+   **/
+  PartitionedHashTablePool(const std::size_t estimated_num_entries,
+                           const std::size_t num_partitions,
+                           const HashTableImplType hash_table_impl_type,
+                           const std::vector<const Type *> &group_by_types,
+                           const std::vector<std::size_t> &payload_sizes,
+                           const std::vector<AggregationHandle *> &handles,
+                           StorageManager *storage_manager)
+      : estimated_num_entries_(
+            setHashTableSize(estimated_num_entries, num_partitions)),
+        num_partitions_(num_partitions),
+        hash_table_impl_type_(hash_table_impl_type),
+        group_by_types_(group_by_types),
+        payload_sizes_(payload_sizes),
+        handles_(handles),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    initializeAllHashTables();
+  }
+
+  /**
+   * @brief Check out a hash table for insertion.
+   *
+   * @param partition_id The ID of the partitioned HashTable.
+   *
+   * @return A hash table pointer for the given HashTable.
+   **/
+  AggregationStateHashTableBase* getHashTable(const std::size_t partition_id) {
+    DCHECK_LT(partition_id, num_partitions_);
+    DCHECK_LT(partition_id, hash_tables_.size());
+    return hash_tables_[partition_id].get();
+  }
+
+  /**
+   * @brief Check out a hash table for insertion.
+   *
+   * @param partition_id The ID of the partitioned HashTable.
+   *
+   * @return A pointer to the hash table for the given partition.
+   **/
+  AggregationStateHashTableBase* getHashTableFast(const std::size_t partition_id) {
+    DCHECK_LT(partition_id, num_partitions_);
+    DCHECK_LT(partition_id, hash_tables_.size());
+    return hash_tables_[partition_id].get();
+  }
+
+  /**
+   * @brief Get all the hash tables from the pool.
+   *
+   * @warning The caller should ensure that this call is made when no hash table
+   *          is being checked in or checked out from the pool. In other words
+   *          the hash table pool is in read-only state.
+   *
+   * @return A pointer to the vector of all the hash tables in the pool.
+   *
+   **/
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>>*
+      getAllHashTables() {
+    return &hash_tables_;
+  }
+
+  /**
+   * @brief Get the number of partitions used for the aggregation.
+   **/
+  inline std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+ private:
+  void initializeAllHashTables() {
+    for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) {
+      AggregationStateHashTableBase *part_hash_table = createNewHashTableFast();
+      hash_tables_.push_back(
+          std::unique_ptr<AggregationStateHashTableBase>(part_hash_table));
+    }
+  }
+
+  AggregationStateHashTableBase* createNewHashTable() {
+    return agg_handle_->createGroupByHashTable(hash_table_impl_type_,
+                                               group_by_types_,
+                                               estimated_num_entries_,
+                                               storage_manager_);
+  }
+
+  AggregationStateHashTableBase* createNewHashTableFast() {
+    return AggregationStateFastHashTableFactory::CreateResizable(
+                hash_table_impl_type_,
+                group_by_types_,
+                estimated_num_entries_,
+                payload_sizes_,
+                handles_,
+                storage_manager_);
+  }
+
+  inline std::size_t setHashTableSize(const std::size_t overall_estimate,
+                                      const std::size_t num_partitions) const {
+    CHECK_NE(num_partitions, 0Lu);
+    return std::max(static_cast<std::size_t>(overall_estimate/num_partitions), 26000Lu);
+  }
+
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_;
+
+  const std::size_t estimated_num_entries_;
+  const std::size_t num_partitions_;
+
+  const HashTableImplType hash_table_impl_type_;
+
+  const std::vector<const Type *> group_by_types_;
+
+  std::vector<std::size_t> payload_sizes_;
+
+  AggregationHandle *agg_handle_;
+  const std::vector<AggregationHandle *> handles_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(PartitionedHashTablePool);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_
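
The per-partition sizing done by setHashTableSize() above can be sketched in isolation as follows. This is a minimal illustration, not the committed code: the names PerPartitionCapacity and kMinEntriesPerTable are hypothetical, and the floor of 26000 entries is copied from the literal in the diff.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical name for the 26000 literal used in setHashTableSize().
constexpr std::size_t kMinEntriesPerTable = 26000u;

// Hypothetical stand-in for PartitionedHashTablePool::setHashTableSize():
// split the overall estimate evenly across the partitions, but never size
// a hash table below the floor.
inline std::size_t PerPartitionCapacity(const std::size_t overall_estimate,
                                        const std::size_t num_partitions) {
  // The caller must pass num_partitions > 0; the diff enforces this with
  // CHECK_NE before dividing.
  return std::max(overall_estimate / num_partitions, kMinEntriesPerTable);
}
```

With an overall estimate of 1000000 groups and 10 partitions, each table is sized for 100000 entries; with an estimate of 1000 groups and 4 partitions, the floor applies and each table is sized for 26000 entries.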

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/StorageBlock.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index dd3e19d..fdeecbd 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -1329,4 +1329,57 @@ const std::size_t StorageBlock::getNumTuples() const {
   return tuple_store_->numTuples();
 }
 
+void StorageBlock::aggregateGroupByPartitioned(
+    const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+    const std::vector<std::unique_ptr<const Scalar>> &group_by,
+    const TupleIdSequence *filter,
+    const std::size_t num_partitions,
+    ColumnVectorsValueAccessor *temp_result,
+    std::vector<attribute_id> *argument_ids,
+    std::vector<attribute_id> *key_ids,
+    std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const {
+  DCHECK(!group_by.empty())
+      << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions";
+
+  SubBlocksReference sub_blocks_ref(*tuple_store_,
+                                    indices_,
+                                    indices_consistent_);
+
+  std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
+  attribute_id attr_id = 0;
+
+  // First, put GROUP BY keys into 'temp_result'.
+  if (reuse_group_by_vectors->empty()) {
+    // Compute GROUP BY values from group_by Scalars, and store them in
+    // reuse_group_by_vectors for reuse by other aggregates on this same
+    // block.
+    reuse_group_by_vectors->reserve(group_by.size());
+    for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
+      reuse_group_by_vectors->emplace_back(
+          group_by_element->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result->addColumn(reuse_group_by_vectors->back().get(), false);
+      key_ids->push_back(attr_id++);
+    }
+  } else {
+    // Reuse precomputed GROUP BY values from reuse_group_by_vectors.
+    DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size())
+        << "Wrong number of reuse_group_by_vectors";
+    for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) {
+      temp_result->addColumn(reuse_cv.get(), false);
+      key_ids->push_back(attr_id++);
+    }
+  }
+
+  // Compute argument vectors and add them to 'temp_result'.
+  for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) {
+    for (const std::unique_ptr<const Scalar> &args : argument) {
+      temp_result->addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref));
+      argument_ids->push_back(attr_id++);
+    }
+    if (argument.empty()) {
+      argument_ids->push_back(kInvalidAttributeID);
+    }
+  }
+}
+
 }  // namespace quickstep
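
The column numbering that aggregateGroupByPartitioned() produces in 'temp_result' can be sketched without any Quickstep types. This is an illustration under assumptions: ColumnLayout, AssignColumnIds and kInvalidId are hypothetical names, attribute_id is assumed to be a signed integer, and -1 is assumed as the value of kInvalidAttributeID.

```cpp
#include <cstddef>
#include <vector>

using attribute_id = int;                // Assumption: a signed integer ID.
constexpr attribute_id kInvalidId = -1;  // Assumption: stand-in for kInvalidAttributeID.

// Hypothetical container for the two output vectors of the method.
struct ColumnLayout {
  std::vector<attribute_id> key_ids;
  std::vector<attribute_id> argument_ids;
};

// num_group_by: number of GROUP BY expressions.
// args_per_aggregate[i]: number of arguments of the i-th aggregate.
inline ColumnLayout AssignColumnIds(
    const std::size_t num_group_by,
    const std::vector<std::size_t> &args_per_aggregate) {
  ColumnLayout layout;
  attribute_id next_id = 0;
  // GROUP BY key columns come first and are numbered from zero.
  for (std::size_t i = 0; i < num_group_by; ++i) {
    layout.key_ids.push_back(next_id++);
  }
  // Argument columns follow. An aggregate without arguments adds no column
  // and is recorded with the invalid ID instead.
  for (const std::size_t num_args : args_per_aggregate) {
    for (std::size_t j = 0; j < num_args; ++j) {
      layout.argument_ids.push_back(next_id++);
    }
    if (num_args == 0) {
      layout.argument_ids.push_back(kInvalidId);
    }
  }
  return layout;
}
```

For two GROUP BY expressions and three aggregates taking 1, 0 and 1 arguments, the sketch yields key_ids {0, 1} and argument_ids {2, -1, 3}.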

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/storage/StorageBlock.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index 488efcc..10d13b6 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -43,6 +43,7 @@ class AggregationHandle;
 class AggregationState;
 class CatalogRelationSchema;
 class ColumnVector;
+class ColumnVectorsValueAccessor;
 class InsertDestinationInterface;
 class Predicate;
 class Scalar;
@@ -451,6 +452,53 @@ class StorageBlock : public StorageBlockBase {
       AggregationStateHashTableBase *hash_table,
       std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
 
+
+  /**
+   * @brief Perform the GROUP BY aggregation for the case when aggregation is
+   *        partitioned.
+   *
+   * @note The difference between this method and the aggregateGroupBy method
+   *       is that in this method, the tuples are routed to different HashTables
+   *       based on the partition to which they belong. The partition is
+   *       determined by the GROUP BY attributes. Currently, hash-based
+   *       partitioning is performed.
+   *
+   * @note This function only creates the ColumnVectorsValueAccessor needed for
+   *       the insertion in the hash table. The actual insertion in respective
+   *       hash tables should be handled by the caller. See
+   *       AggregationOperationState::aggregateHashTable() for one such
+   *       implementation.
+   *
+   * @param arguments The arguments to the aggregation function as Scalars.
+   * @param group_by The list of GROUP BY attributes/expressions. The tuples in
+   *        this storage block are grouped by these attributes before
+   *        aggregation.
+   * @param filter If non-NULL, then only tuple IDs which are set in the
+   *        filter will be checked (all others will be assumed to be false).
+   * @param num_partitions The number of partitions used for the aggregation.
+   * @param temp_result The ColumnVectorsValueAccessor used for collecting
+   *        the attribute values from this StorageBlock.
+   * @param argument_ids The attribute IDs used for the aggregation, which
+   *        come from the arguments vector. For each aggregate whose argument
+   *        list is empty, an invalid attribute ID is appended instead.
+   * @param key_ids The attribute IDs of the group by attributes.
+   * @param reuse_group_by_vectors This parameter is used to store and reuse
+   *        GROUP BY attribute vectors pre-computed in an earlier invocation
+   *        on the same block. \c reuse_group_by_vectors is never \c nullptr
+   *        for ease of use. The current invocation reuses the ColumnVectors
+   *        if non-empty; otherwise it computes ColumnVectors based on
+   *        \c group_by and stores them in \c reuse_group_by_vectors.
+   **/
+  void aggregateGroupByPartitioned(
+      const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments,
+      const std::vector<std::unique_ptr<const Scalar>> &group_by,
+      const TupleIdSequence *filter,
+      const std::size_t num_partitions,
+      ColumnVectorsValueAccessor *temp_result,
+      std::vector<attribute_id> *argument_ids,
+      std::vector<attribute_id> *key_ids,
+      std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const;
+
   /**
    * @brief Inserts the GROUP BY expressions and aggregation arguments together
    *        as keys into the distinctify hash table.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/types/containers/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index aacb63a..c2a6623 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple
                       quickstep_catalog_CatalogTypedefs
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple_proto
+                      quickstep_utility_CompositeHash
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_types_containers_Tuple_proto
                       quickstep_types_TypedValue_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/types/containers/Tuple.hpp
----------------------------------------------------------------------
diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp
index 60f832c..6237d54 100644
--- a/types/containers/Tuple.hpp
+++ b/types/containers/Tuple.hpp
@@ -28,6 +28,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.pb.h"
+#include "utility/CompositeHash.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -218,6 +219,13 @@ class Tuple {
     return attribute_values_.size();
   }
 
+  /**
+   * @brief Get the hash value of the tuple.
+   **/
+  std::size_t getTupleHash() const {
+    return HashCompositeKey(attribute_values_);
+  }
+
  private:
   /**
    * @brief Constructor which does not create any attributes, nor pre-reserve

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 395e264..e9be2ec 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -169,6 +169,7 @@ add_library(quickstep_utility_BloomFilter_proto
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
 add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp)
 add_library(quickstep_utility_DisjointTreeForest ../empty_src.cpp DisjointTreeForest.hpp)
 add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp)
@@ -230,6 +231,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
                       glog)
+target_link_libraries(quickstep_utility_CompositeHash
+                      quickstep_types_TypedValue
+                      quickstep_utility_HashPair
+                      glog)
 target_link_libraries(quickstep_utility_DAG
                       glog
                       quickstep_utility_Macros)
@@ -325,6 +330,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
+                      quickstep_utility_CompositeHash
                       quickstep_utility_DAG
                       quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e6738e82/utility/CompositeHash.hpp
----------------------------------------------------------------------
diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp
new file mode 100644
index 0000000..517bc96
--- /dev/null
+++ b/utility/CompositeHash.hpp
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
+
+#include <cstddef>
+#include <vector>
+
+#include "types/TypedValue.hpp"
+#include "utility/HashPair.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief Compute the hash value of a composite key.
+ *
+ * @param key A vector of TypedValues which together form the composite key.
+ * @return The hash value.
+ **/
+static std::size_t HashCompositeKey(const std::vector<TypedValue> &key) {
+  DCHECK(!key.empty());
+  std::size_t hash = key.front().getHash();
+  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
+       key_it != key.end();
+       ++key_it) {
+    hash = CombineHashes(hash, key_it->getHash());
+  }
+  return hash;
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_
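
How a composite hash can select a partition is sketched below without Quickstep dependencies. Several parts are assumptions: std::string with std::hash stands in for TypedValue::getHash(), CombineHashesSketch is a boost-style mixer standing in for CombineHashes() from utility/HashPair.hpp (whose definition is not shown in this message), and the modulo step in PartitionFor is one plausible way to route a key, not necessarily what the caller in this commit does.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Assumption: a boost-style mixer standing in for Quickstep's CombineHashes().
inline std::size_t CombineHashesSketch(const std::size_t first,
                                       const std::size_t second) {
  return first ^ (second + 0x9e3779b9u + (first << 6) + (first >> 2));
}

// Same folding structure as HashCompositeKey() in the diff: start from the
// hash of the first component and fold in the rest, left to right.
// The key must be non-empty.
inline std::size_t HashCompositeKeySketch(const std::vector<std::string> &key) {
  const std::hash<std::string> hasher;
  std::size_t hash = hasher(key.front());
  for (std::size_t i = 1; i < key.size(); ++i) {
    hash = CombineHashesSketch(hash, hasher(key[i]));
  }
  return hash;
}

// Hypothetical routing step: equal keys always map to the same partition.
// num_partitions must be greater than zero.
inline std::size_t PartitionFor(const std::vector<std::string> &key,
                                const std::size_t num_partitions) {
  return HashCompositeKeySketch(key) % num_partitions;
}
```

Because the partition depends only on the GROUP BY key, every tuple of a given group lands in the same hash table, so the per-partition tables hold disjoint sets of groups.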


