From: hbdeshmukh@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Tue, 06 Sep 2016 20:16:43 -0000
Message-Id: <7918a39ed49141ebaced2075dbe40a03@git.apache.org>
Subject: [66/73] [abbrv] incubator-quickstep git commit: Created method for partition aware finalize aggregate.

Created method for partition aware finalize aggregate.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/07afae28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/07afae28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/07afae28

Branch: refs/heads/partitioned-aggregation
Commit: 07afae2849cbeb1cfa271d52df4482e39740dcc8
Parents: b22323e
Author: Harshad Deshmukh
Authored: Thu Aug 18 16:54:38 2016 -0500
Committer: Harshad Deshmukh
Committed: Tue Sep 6 15:01:30 2016 -0500

----------------------------------------------------------------------
 storage/AggregationOperationState.cpp | 69 +++++++++++++++++++++++++++++-
 storage/AggregationOperationState.hpp | 33 +++++++++++++-
 2 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07afae28/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c39e98a..6b4a672 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -566,8 +566,14 @@ void AggregationOperationState::finalizeHashTable(
     }
     AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
     DCHECK(agg_hash_table != nullptr);
-    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *agg_hash_table, &group_by_keys, agg_idx);
+    // TODO(harshad) - Modify the finalizeHashTable() function called below such
+    // that group_by_keys is a single ColumnVectorValueAccessor in which there
+    // is one ColumnVector per group by key. If we do that, the code below
+    // for reorganizing group_by_keys can be removed.
+    ColumnVector* agg_result_col =
+        handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
+                                             &group_by_keys,
+                                             agg_idx);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
@@ -618,4 +624,63 @@ void AggregationOperationState::finalizeHashTable(
   output_destination->bulkInsertTuples(&complete_result);
 }
 
+void AggregationOperationState::finalizeAggregatePartitioned(
+    const std::size_t partition_id, InsertDestination *output_destination) {
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  // Collect per-aggregate finalized values.
+  std::vector<std::unique_ptr<ColumnVector>> final_values;
+  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
+    AggregationStateHashTableBase *hash_table =
+        partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
+    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+        *hash_table, &group_by_keys, agg_idx);
+    if (agg_result_col != nullptr) {
+      final_values.emplace_back(agg_result_col);
+    }
+  }
+
+  // Reorganize 'group_by_keys' in column-major order so that we can make a
+  // ColumnVectorsValueAccessor to bulk-insert results.
+  //
+  // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal
+  // if there is only one aggregate. The need to do this should hopefully go
+  // away when we work out storing composite structures for multiple aggregates
+  // in a single HashTable.
+  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
+  std::size_t group_by_element_idx = 0;
+  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
+    const Type &group_by_type = group_by_element->getType();
+    if (NativeColumnVector::UsableForType(group_by_type)) {
+      NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    } else {
+      IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+      group_by_cvs.emplace_back(element_cv);
+      for (std::vector<TypedValue> &group_key : group_by_keys) {
+        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+      }
+    }
+    ++group_by_element_idx;
+  }
+
+  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
+  // and the finalized aggregates.
+  ColumnVectorsValueAccessor complete_result;
+  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
+    complete_result.addColumn(group_by_cv.release());
+  }
+  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
+    complete_result.addColumn(final_value_cv.release());
+  }
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07afae28/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 7e8acb5..37d77e3 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -168,8 +168,37 @@ class AggregationOperationState {
    **/
   void finalizeAggregate(InsertDestination *output_destination);
 
-  static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
-                                     AggregationStateHashTableBase *dst);
+  /**
+   * @brief Generate the final results for the aggregates managed by this
+   *        AggregationOperationState, for the given partition and write them
+   *        out to StorageBlock(s).
+   *
+   * @param partition_id The Partition ID for which the finalize has to be
+   *        performed.
+   * @param output_destination An InsertDestination where the finalized output
+   *        tuple(s) from this aggregate are to be written.
+   **/
+  void finalizeAggregatePartitioned(const std::size_t partition_id,
+                                    InsertDestination *output_destination);
+
+  bool isAggregatePartitioned() const {
+    return is_aggregate_partitioned_;
+  }
+
+  /**
+   * @brief Get the number of partitions used for the aggregation.
+   *
+   * @note This is relevant only when is_aggregate_partitioned_ is true.
+   *
+   * @return The number of partitions used for the aggregation. Default is 1.
+   **/
+  std::size_t getNumPartitions() const {
+    if (is_aggregate_partitioned_) {
+      return partitioned_group_by_hashtable_pool_->getNumPartitions();
+    } else {
+      return 1;
+    }
+  }
 
   int dflag;
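
The column-major reorganization performed in finalizeAggregatePartitioned() above, and the idea of finalizing one partition at a time, can be sketched in isolation. This is a minimal illustration and not Quickstep code: Value, Row, Column, toColumnMajor() and finalizeAllPartitions() are hypothetical stand-ins for TypedValue, the per-group key vectors, ColumnVector, and a caller that loops over getNumPartitions(). Plain strings replace typed values, and no hash tables or InsertDestination are modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins (assumptions, not Quickstep types): a Value is a
// plain string, a Row holds the GROUP BY key values of one group, and a
// Column holds one key expression's values across all groups.
using Value = std::string;
using Row = std::vector<Value>;
using Column = std::vector<Value>;

// Transpose row-major group keys into column-major form. Values are moved
// out of 'rows', mirroring the std::move in the commit's inner loops.
std::vector<Column> toColumnMajor(std::vector<Row> *rows,
                                  const std::size_t num_keys) {
  std::vector<Column> columns;
  columns.reserve(num_keys);
  for (std::size_t key_idx = 0; key_idx < num_keys; ++key_idx) {
    Column column;
    column.reserve(rows->size());
    for (Row &row : *rows) {
      assert(row.size() == num_keys);  // Every group carries every key.
      column.push_back(std::move(row[key_idx]));
    }
    columns.push_back(std::move(column));
  }
  return columns;
}

// Hypothetical driver: handle each partition independently, the way a caller
// might invoke a per-partition finalize once per partition ID.
std::vector<std::vector<Column>> finalizeAllPartitions(
    std::vector<std::vector<Row>> *partitions, const std::size_t num_keys) {
  std::vector<std::vector<Column>> results;
  results.reserve(partitions->size());
  for (std::size_t partition_id = 0; partition_id < partitions->size();
       ++partition_id) {
    results.push_back(toColumnMajor(&(*partitions)[partition_id], num_keys));
  }
  return results;
}
```

Because each partition owns its own rows in this sketch, the per-partition calls share no mutable state. That is presumably the property that makes a partition-aware finalize attractive for parallel execution, though the commit itself does not state how the method is scheduled.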