From: tarasbob@apache.org
To: commits@impala.incubator.apache.org
Date: Thu, 16 Mar 2017 19:50:47 -0000
Subject: [1/2] incubator-impala git commit: IMPALA-4787: Optimize APPX_MEDIAN() memory usage

Repository: incubator-impala
Updated Branches:
  refs/heads/master 62894e323 -> 0ff1e6e8d


IMPALA-4787: Optimize APPX_MEDIAN() memory usage

Before this change, ReservoirSample functions (such as APPX_MEDIAN())
allocated memory for 20,000 elements up front per grouping key. This
caused inefficient memory usage for aggregations with many grouping
keys.

This patch fixes this by initially allocating memory for 16 elements.
Once the buffer becomes full, we reallocate a new buffer with double
capacity and copy the original buffer into the new one. We continue
doubling the buffer size until the buffer has room for 20,000 elements
as before.

Testing:
Added some EE APPX_MEDIAN() tests on larger datasets that exercise the
resize code path.

Perf Benchmark (about 35,000 elements per bucket):

SELECT MAX(a) from (
  SELECT c1, appx_median(c2) as a FROM benchmark GROUP BY c1) t

BEFORE: 11s067ms

Operator     #Hosts   Avg Time   Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------
06:AGGREGATE      1  124.726us  124.726us       1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE       1   29.544us   29.544us       3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE      3   86.406us  120.372us       3           1  44.00 KB       10.00 MB
04:AGGREGATE      3    1s840ms    2s824ms   2.00K          -1   1.02 GB      128.00 MB  FINALIZE
03:EXCHANGE       3    1s163ms    1s989ms   6.00K          -1         0              0  HASH(c1)
01:AGGREGATE      3    3s356ms    3s416ms   6.00K          -1   1.95 GB      128.00 MB  STREAMING
00:SCAN HDFS      3   64.962ms   65.490ms  65.54M          -1  25.97 MB       64.00 MB  tpcds_10_parquet.benchmark

AFTER: 9s465ms

Operator     #Hosts   Avg Time   Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------
06:AGGREGATE      1   73.961us   73.961us       1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE       1   18.101us   18.101us       3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE      3   75.795us   83.969us       3           1  44.00 KB       10.00 MB
04:AGGREGATE      3    1s608ms    2s683ms   2.00K          -1   1.02 GB      128.00 MB  FINALIZE
03:EXCHANGE       3  826.683ms    1s322ms   6.00K          -1         0              0  HASH(c1)
01:AGGREGATE      3    2s457ms    2s672ms   6.00K          -1   3.14 GB      128.00 MB  STREAMING
00:SCAN HDFS      3   81.514ms   89.056ms  65.54M          -1  25.94 MB       64.00 MB  tpcds_10_parquet.benchmark

Memory Benchmark (about 12 elements per bucket):

SELECT MAX(a) FROM (
  SELECT ss_customer_sk, APPX_MEDIAN(ss_sold_date_sk) as a
  FROM tpcds_parquet.store_sales
  GROUP BY ss_customer_sk) t

BEFORE: 7s477ms

Operator     #Hosts   Avg Time   Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------
06:AGGREGATE      1  114.686us  114.686us       1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE       1   18.214us   18.214us       3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE      3  147.055us  165.464us       3           1  28.00 KB       10.00 MB
04:AGGREGATE      3    2s043ms    2s147ms  14.82K          -1   4.94 GB      128.00 MB  FINALIZE
03:EXCHANGE       3  840.528ms  943.254ms  15.61K          -1         0              0  HASH(ss_customer_sk)
01:AGGREGATE      3    1s769ms    1s869ms  15.61K          -1   5.32 GB      128.00 MB  STREAMING
00:SCAN HDFS      3   17.941ms   37.109ms 183.59K          -1   1.94 MB       16.00 MB  tpcds_parquet.store_sales

AFTER: 434ms

Operator     #Hosts   Avg Time   Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------
06:AGGREGATE      1  125.915us  125.915us       1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE       1   72.179us   72.179us       3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE      3   79.054us   83.385us       3           1  28.00 KB       10.00 MB
04:AGGREGATE      3    6.559ms    7.669ms  14.82K          -1  17.32 MB      128.00 MB  FINALIZE
03:EXCHANGE       3   67.370us   85.068us  15.60K          -1         0              0  HASH(ss_customer_sk)
01:AGGREGATE      3   19.245ms   24.472ms  15.60K          -1   9.48 MB      128.00 MB  STREAMING
00:SCAN HDFS      3   53.173ms   55.844ms 183.59K          -1   1.18 MB       16.00 MB  tpcds_parquet.store_sales

Change-Id: I99adaad574d4fb0a3cf38c6cbad8b2a23df12968
Reviewed-on: http://gerrit.cloudera.org:8080/6025
Reviewed-by: Taras Bobrovytsky
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/529a5f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/529a5f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/529a5f99

Branch: refs/heads/master
Commit: 529a5f99b959079faead34a977fba1125d01840e
Parents: 62894e3
Author: Taras Bobrovytsky
Authored: Mon Feb 13 18:14:56 2017 -0800
Committer: Impala Public Jenkins
Committed: Thu Mar 16 05:59:40 2017 +0000
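The growth policy described in the commit message can be sketched in a few lines of C++. This is a hypothetical, simplified illustration, not the patch's code. It uses std::vector and std::mt19937_64 in place of FunctionContext::Reallocate() and boost::ranlux64_3. The names GrowableReservoir, kInitCapacity and kMaxCapacity are invented for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Hypothetical sketch (not the Impala implementation) of a reservoir whose buffer
// starts small and doubles until it reaches kMaxCapacity.
constexpr int kInitCapacity = 16;    // initial number of sample slots, as in the patch
constexpr int kMaxCapacity = 20000;  // NUM_BUCKETS (100) * NUM_SAMPLES_PER_BUCKET (200)

class GrowableReservoir {
 public:
  explicit GrowableReservoir(uint64_t seed) : rng_(seed) { samples_.reserve(capacity_); }

  void Add(int64_t value) {
    if (samples_.size() < static_cast<std::size_t>(kMaxCapacity)) {
      if (samples_.size() == static_cast<std::size_t>(capacity_)) {
        // Buffer is full: double it, capped at kMaxCapacity. reserve() reallocates and
        // copies the existing samples, similar to the Reallocate() call in the patch.
        int doubled = capacity_ * 2;
        capacity_ = doubled < kMaxCapacity ? doubled : kMaxCapacity;
        samples_.reserve(capacity_);
      }
      samples_.push_back(value);
    } else {
      // Reservoir is full: keep the new value with probability
      // kMaxCapacity / (source_size_ + 1) by overwriting a uniformly chosen slot.
      std::uniform_int_distribution<int64_t> dist(0, source_size_);
      int64_t idx = dist(rng_);
      if (idx < kMaxCapacity) samples_[idx] = value;
    }
    ++source_size_;
    assert(samples_.size() <= static_cast<std::size_t>(capacity_));
  }

  int capacity() const { return capacity_; }
  std::size_t num_samples() const { return samples_.size(); }
  int64_t source_size() const { return source_size_; }

 private:
  int capacity_ = kInitCapacity;  // logical capacity, tracked separately from the vector
  int64_t source_size_ = 0;       // number of values offered so far
  std::vector<int64_t> samples_;
  std::mt19937_64 rng_;
};
```

With about 12 values per group, as in the memory benchmark, the buffer never grows past its initial 16 slots. With about 35,000 values, as in the perf benchmark, it grows through 16, 32, ..., 16384 and is then capped at 20,000.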
----------------------------------------------------------------------
 be/src/exprs/aggregate-functions-ir.cc     | 441 +++++++++++++------
 .../queries/QueryTest/aggregation.test     |  17 +-
 .../queries/QueryTest/alloc-fail-init.test |   4 +-
 3 files changed, 316 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 10a775d..fc4715a 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -40,10 +40,11 @@
 using boost::uniform_int;
 using boost::ranlux64_3;
-using std::push_heap;
-using std::pop_heap;
-using std::map;
 using std::make_pair;
+using std::map;
+using std::nth_element;
+using std::pop_heap;
+using std::push_heap;
 
 namespace {
 // Threshold for each precision where it's better to use linear counting instead
@@ -122,9 +123,6 @@ int64_t HllEstimateBias(int64_t estimate) {
 
 }
 
-// TODO: this file should be cross compiled and then all of the builtin
-// aggregate functions will have a codegen enabled path. Then we can remove
-// the custom code in aggregation node.
 namespace impala {
 
 // This function initializes StringVal 'dst' with a newly allocated buffer of
@@ -902,8 +900,6 @@ BigIntVal AggregateFunctions::PcsaFinalize(FunctionContext* c, const StringVal&
 // TODO: Expose as constant argument parameters to the UDA.
 const static int NUM_BUCKETS = 100;
 const static int NUM_SAMPLES_PER_BUCKET = 200;
-const static int NUM_SAMPLES = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;
-const static int MAX_STRING_SAMPLE_LEN = 10;
 
 template <typename T>
 struct ReservoirSample {
@@ -919,6 +915,9 @@ struct ReservoirSample {
   T GetValue(FunctionContext* ctx) { return val; }
 };
 
+// Maximum length of a string sample.
+const static int MAX_STRING_SAMPLE_LEN = 10;
+
 // Template specialization for StringVal because we do not store the StringVal itself.
 // Instead, we keep fixed size arrays and truncate longer strings if necessary.
 template <>
@@ -941,22 +940,267 @@ struct ReservoirSample {
 };
 
 template <typename T>
-struct ReservoirSampleState {
-  ReservoirSample<T> samples[NUM_SAMPLES];
+bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
+  return i.val.val < j.val.val;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<StringVal>& i,
+    const ReservoirSample<StringVal>& j) {
+  int n = min(i.len, j.len);
+  int result = memcmp(&i.val[0], &j.val[0], n);
+  if (result == 0) return i.len < j.len;
+  return result < 0;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<DecimalVal>& i,
+    const ReservoirSample<DecimalVal>& j) {
+  // Also handles val4 and val8 - the DecimalVal memory layout ensures the least
+  // significant bits overlap in memory.
+  return i.val.val16 < j.val.val16;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<TimestampVal>& i,
+    const ReservoirSample<TimestampVal>& j) {
+  if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
+  else return i.val.date < j.val.date;
+}
+
+template <typename T>
+bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
+  return i.key > j.key;
+}
+
+// Keeps track of the current state of the reservoir sampling algorithm. The samples are
+// stored in a dynamically sized array. Initially, the samples array is stored in a
+// separate memory allocation. This class is responsible for managing the memory of the
+// array and reallocating when the array is full. When this object is serialized into an
+// output buffer, the samples array is inlined into the output buffer as well.
+template <typename T>
+class ReservoirSampleState {
+ public:
+  ReservoirSampleState(FunctionContext* ctx)
+    : num_samples_(0),
+      capacity_(INIT_CAPACITY),
+      source_size_(0),
+      sample_array_inline_(false),
+      samples_(NULL) {
+    // Allocate some initial memory for the samples array.
+    size_t buffer_len = sizeof(ReservoirSample<T>) * capacity_;
+    uint8_t* ptr = ctx->Allocate(buffer_len);
+    if (ptr == NULL) {
+      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+      return;
+    }
+    samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
+  }
+
+  // Returns a pointer to a ReservoirSample at idx.
+  ReservoirSample<T>* GetSample(int64_t idx) {
+    DCHECK(samples_ != NULL);
+    DCHECK_LT(idx, num_samples_);
+    DCHECK_LE(num_samples_, capacity_);
+    DCHECK_GE(idx, 0);
+    return &samples_[idx];
+  }
+
+  // Adds a sample and increments the source size. Doubles the capacity of the sample
+  // array if necessary. If max capacity is reached, randomly evicts a sample (as
+  // required by the algorithm). Returns false if the attempt to double the capacity
+  // fails, true otherwise.
+  bool AddSample(FunctionContext* ctx, const ReservoirSample<T>& s) {
+    DCHECK(samples_ != NULL);
+    DCHECK_LE(num_samples_, MAX_CAPACITY);
+    if (num_samples_ < MAX_CAPACITY) {
+      if (num_samples_ == capacity_) {
+        bool result = IncreaseCapacity(ctx, capacity_ * 2);
+        if (!result) return false;
+      }
+      DCHECK_LT(num_samples_, capacity_);
+      samples_[num_samples_++] = s;
+    } else {
+      DCHECK_EQ(num_samples_, MAX_CAPACITY);
+      DCHECK(!sample_array_inline_);
+      int64_t idx = GetNext64(source_size_);
+      if (idx < MAX_CAPACITY) samples_[idx] = s;
+    }
+    ++source_size_;
+    return true;
+  }
+
+  // Same as above.
+  bool AddSample(FunctionContext* ctx, const T& s) {
+    return AddSample(ctx, ReservoirSample<T>(s));
+  }
+
+  // Returns a buffer with a serialized ReservoirSampleState and the array of samples it
+  // contains. The samples array must not be inlined; i.e. it must be in a separate memory
+  // allocation. Returns a buffer containing this object and inlined samples array. The
+  // memory containing this object and the samples array is freed. The serialized object
+  // in the output buffer requires a call to Deserialize() before use.
+  StringVal Serialize(FunctionContext* ctx) {
+    DCHECK(samples_ != NULL);
+    DCHECK(!sample_array_inline_);
+    // Assign keys to the samples that haven't been set (i.e. if serializing after
+    // Update()). In weighted reservoir sampling the keys are typically assigned as the
+    // sources are being sampled, but this requires maintaining the samples in sorted
+    // order (by key) and it accomplishes the same thing at this point because all data
+    // points coming into Update() get the same weight. When the samples are later merged,
+    // they do have different weights (set here) that are proportional to the source_size,
+    // i.e. samples selected from a larger stream are more likely to end up in the final
+    // sample set. In order to avoid the extra overhead in Update(), we approximate the
+    // keys by picking random numbers in the range
+    // [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1]. This weights the keys by
+    // SOURCE_SIZE and implies that the samples picked had the highest keys, because
+    // values not sampled would have keys between 0 and
+    // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
+    for (int i = 0; i < num_samples_; ++i) {
+      if (samples_[i].key >= 0) continue;
+      int r = rand() % num_samples_;
+      samples_[i].key = ((double) source_size_ - r) / source_size_;
+    }
+    capacity_ = num_samples_;
+    sample_array_inline_ = true;
+
+    size_t buffer_len = sizeof(ReservoirSampleState<T>) +
+        sizeof(ReservoirSample<T>) * num_samples_;
+    StringVal dst = StringVal::CopyFrom(
+        ctx, reinterpret_cast<uint8_t*>(this), buffer_len);
+    memcpy(dst.ptr + sizeof(ReservoirSampleState<T>),
+        reinterpret_cast<uint8_t*>(samples_), sizeof(ReservoirSample<T>) * num_samples_);
+    ctx->Free(reinterpret_cast<uint8_t*>(samples_));
+    ctx->Free(reinterpret_cast<uint8_t*>(this));
+    return dst;
+  }
+
+  // Updates the pointer to the samples array. Must be called before using this object in
+  // Merge().
+  void Deserialize() {
+    DCHECK(sample_array_inline_);
+    samples_ = reinterpret_cast<ReservoirSample<T>*>(this + 1);
+  }
+
+  // Merges the samples in "other_state" into the current state by following the
+  // reservoir sampling algorithm. If necessary, increases the capacity to fit the
+  // samples from "other_state". In the case of failure to increase the size of the
+  // array, returns.
+  void Merge(FunctionContext* ctx, ReservoirSampleState<T>* other_state) {
+    DCHECK(samples_ != NULL);
+    DCHECK_GT(capacity_, 0);
+    other_state->Deserialize();
+    int src_idx = 0;
+    // We can increase the capacity significantly here and skip several doublings because
+    // we know the number of elements in the other state up front.
+    if (capacity_ < MAX_CAPACITY) {
+      int necessary_capacity = num_samples_ + other_state->num_samples();
+      if (capacity_ < necessary_capacity) {
+        bool result = IncreaseCapacity(ctx, necessary_capacity);
+        if (!result) return;
+      }
+    }
+
+    // First, fill up the dst samples if they don't already exist. The samples are now
+    // ordered as a min-heap on the key.
+    while (num_samples_ < MAX_CAPACITY && src_idx < other_state->num_samples()) {
+      DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
+      bool result = AddSample(ctx, *other_state->GetSample(src_idx++));
+      if (!result) return;
+      push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+    }
+
+    // Then for every sample from source, take the sample if the key is greater than
+    // the minimum key in the min-heap.
+    while (src_idx < other_state->num_samples()) {
+      DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
+      if (other_state->GetSample(src_idx)->key > samples_[0].key) {
+        pop_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+        samples_[MAX_CAPACITY - 1] = *other_state->GetSample(src_idx);
+        push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+      }
+      ++src_idx;
+    }
+
+    source_size_ += other_state->source_size();
+  }
+
+  // Returns the median element.
+  T GetMedian(FunctionContext* ctx) {
+    if (num_samples_ == 0) return T::null();
+    ReservoirSample<T>* mid_point = GetSample(num_samples_ / 2);
+    nth_element(&samples_[0], mid_point, &samples_[num_samples_], SampleValLess<T>);
+    return mid_point->GetValue(ctx);
+  }
+
+  // Sorts the samples.
+  void SortSamples() {
+    sort(&samples_[0], &samples_[num_samples_], SampleValLess<T>);
+  }
+
+  // Deletes this object by freeing the memory that contains the array of samples (if not
+  // inlined) and itself.
+  void Delete(FunctionContext* ctx) {
+    if (!sample_array_inline_) ctx->Free(reinterpret_cast<uint8_t*>(samples_));
+    ctx->Free(reinterpret_cast<uint8_t*>(this));
+  }
+
+  int num_samples() { return num_samples_; }
+  int64_t source_size() { return source_size_; }
+
+ private:
+  // The initial capacity of the samples array.
+  const static int INIT_CAPACITY = 16;
+
+  // Maximum capacity of the samples array.
+  const static int MAX_CAPACITY = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;
 
   // Number of collected samples.
-  int num_samples;
+  int num_samples_;
+
+  // Size of the "samples_" array.
+  int capacity_;
 
   // Number of values over which the samples were collected.
-  int64_t source_size;
+  int64_t source_size_;
 
   // Random number generator for generating 64-bit integers
   // TODO: Replace with mt19937_64 when upgrading boost
-  ranlux64_3 rng;
+  ranlux64_3 rng_;
+
+  // True if the array of samples is in the same memory allocation as this object. If
+  // false, this object is responsible for freeing the memory.
+  bool sample_array_inline_;
+
+  // Points to the array of ReservoirSamples. The array may be located inline (right after
+  // this object), or in a separate memory allocation.
+  ReservoirSample<T>* samples_;
+
+  // Increases the capacity of the "samples_" array to "new_capacity" rounded up to a
+  // power of two by reallocating. Should only be called if the samples array is not
+  // inline. Returns false if the operation fails.
+  bool IncreaseCapacity(FunctionContext* ctx, int new_capacity) {
+    DCHECK(samples_ != NULL);
+    DCHECK(!sample_array_inline_);
+    DCHECK_LT(capacity_, MAX_CAPACITY);
+    DCHECK_GT(new_capacity, capacity_);
+    new_capacity = BitUtil::RoundUpToPowerOfTwo(new_capacity);
+    if (new_capacity > MAX_CAPACITY) new_capacity = MAX_CAPACITY;
+    size_t buffer_len = sizeof(ReservoirSample<T>) * new_capacity;
+    uint8_t* ptr = ctx->Reallocate(reinterpret_cast<uint8_t*>(samples_), buffer_len);
+    if (ptr == NULL) {
+      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+      return false;
+    }
+    samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
+    capacity_ = new_capacity;
+    return true;
+  }
+
   // Returns a random integer in the range [0, max].
   int64_t GetNext64(int64_t max) {
     uniform_int<int64_t> dist(0, max);
-    return dist(rng);
+    return dist(rng_);
   }
 };
 
@@ -967,7 +1211,9 @@ void AggregateFunctions::ReservoirSampleInit(FunctionContext* ctx, StringVal* ds
     DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
     return;
   }
-  *reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr) = ReservoirSampleState<T>();
+  ReservoirSampleState<T>* dst_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+  *dst_state = ReservoirSampleState<T>(ctx);
 }
 
 template <typename T>
@@ -975,113 +1221,32 @@ void AggregateFunctions::ReservoirSampleUpdate(FunctionContext* ctx, const T& sr
     StringVal* dst) {
   if (src.is_null) return;
   DCHECK(!dst->is_null);
-  DCHECK_EQ(dst->len, sizeof(ReservoirSampleState<T>));
-  ReservoirSampleState<T>* state = reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
-
-  if (state->num_samples < NUM_SAMPLES) {
-    state->samples[state->num_samples++] = ReservoirSample<T>(src);
-  } else {
-    int64_t r = state->GetNext64(state->source_size);
-    if (r < NUM_SAMPLES) state->samples[r] = ReservoirSample<T>(src);
-  }
-  ++state->source_size;
+  ReservoirSampleState<T>* dst_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+  dst_state->AddSample(ctx, src);
 }
 
 template <typename T>
 StringVal AggregateFunctions::ReservoirSampleSerialize(FunctionContext* ctx,
     const StringVal& src) {
   if (UNLIKELY(src.is_null)) return src;
-  StringVal result = StringVal::CopyFrom(ctx, src.ptr, src.len);
-  ctx->Free(src.ptr);
-  if (UNLIKELY(result.is_null)) return result;
-
-  ReservoirSampleState<T>* state = reinterpret_cast<ReservoirSampleState<T>*>(result.ptr);
-  // Assign keys to the samples that haven't been set (i.e. if serializing after
-  // Update()). In weighted reservoir sampling the keys are typically assigned as the
-  // sources are being sampled, but this requires maintaining the samples in sorted order
-  // (by key) and it accomplishes the same thing at this point because all data points
-  // coming into Update() get the same weight. When the samples are later merged, they do
-  // have different weights (set here) that are proportional to the source_size, i.e.
-  // samples selected from a larger stream are more likely to end up in the final sample
-  // set. In order to avoid the extra overhead in Update(), we approximate the keys by
-  // picking random numbers in the range [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1].
-  // This weights the keys by SOURCE_SIZE and implies that the samples picked had the
-  // highest keys, because values not sampled would have keys between 0 and
-  // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
-  for (int i = 0; i < state->num_samples; ++i) {
-    if (state->samples[i].key >= 0) continue;
-    int r = rand() % state->num_samples;
-    state->samples[i].key = ((double) state->source_size - r) / state->source_size;
-  }
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  StringVal result = src_state->Serialize(ctx);
   return result;
 }
 
 template <typename T>
-bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
-  return i.val.val < j.val.val;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<StringVal>& i,
-    const ReservoirSample<StringVal>& j) {
-  int n = min(i.len, j.len);
-  int result = memcmp(&i.val[0], &j.val[0], n);
-  if (result == 0) return i.len < j.len;
-  return result < 0;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<DecimalVal>& i,
-    const ReservoirSample<DecimalVal>& j) {
-  // Also handles val4 and val8 - the DecimalVal memory layout ensures the least
-  // significant bits overlap in memory.
-  return i.val.val16 < j.val.val16;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<TimestampVal>& i,
-    const ReservoirSample<TimestampVal>& j) {
-  if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
-  else return i.val.date < j.val.date;
-}
-
-template <typename T>
-bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
-  return i.key > j.key;
-}
-
-template <typename T>
 void AggregateFunctions::ReservoirSampleMerge(FunctionContext* ctx,
-    const StringVal& src_val, StringVal* dst_val) {
-  if (src_val.is_null) return;
-  DCHECK(!dst_val->is_null);
-  DCHECK(!src_val.is_null);
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-  DCHECK_EQ(dst_val->len, sizeof(ReservoirSampleState<T>));
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
-  ReservoirSampleState<T>* dst = reinterpret_cast<ReservoirSampleState<T>*>(dst_val->ptr);
-
-  int src_idx = 0;
-  int src_max = src->num_samples;
-  // First, fill up the dst samples if they don't already exist. The samples are now
-  // ordered as a min-heap on the key.
-  while (dst->num_samples < NUM_SAMPLES && src_idx < src_max) {
-    DCHECK_GE(src->samples[src_idx].key, 0);
-    dst->samples[dst->num_samples++] = src->samples[src_idx++];
-    push_heap(dst->samples, dst->samples + dst->num_samples, SampleKeyGreater<T>);
-  }
-  // Then for every sample from source, take the sample if the key is greater than
-  // the minimum key in the min-heap.
-  while (src_idx < src_max) {
-    DCHECK_GE(src->samples[src_idx].key, 0);
-    if (src->samples[src_idx].key > dst->samples[0].key) {
-      pop_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
-      dst->samples[NUM_SAMPLES - 1] = src->samples[src_idx];
-      push_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
-    }
-    ++src_idx;
-  }
-  dst->source_size += src->source_size;
+    const StringVal& src, StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK(!src.is_null);
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  ReservoirSampleState<T>* dst_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+  dst_state->Merge(ctx, src_state);
 }
 
 template <typename T>
@@ -1112,64 +1277,56 @@ void PrintSample(const ReservoirSample& v, ostream* os) {
 
 template <typename T>
 StringVal AggregateFunctions::ReservoirSampleFinalize(FunctionContext* ctx,
-    const StringVal& src_val) {
-  if (UNLIKELY(src_val.is_null)) return src_val;
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
+    const StringVal& src) {
+  if (UNLIKELY(src.is_null)) return src;
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
   stringstream out;
-  for (int i = 0; i < src->num_samples; ++i) {
-    PrintSample(src->samples[i], &out);
-    if (i < (src->num_samples - 1)) out << ", ";
+  for (int i = 0; i < src_state->num_samples(); ++i) {
+    PrintSample(*src_state->GetSample(i), &out);
+    if (i < (src_state->num_samples() - 1)) out << ", ";
   }
   const string& out_str = out.str();
   StringVal result_str(ctx, out_str.size());
   if (LIKELY(!result_str.is_null)) {
     memcpy(result_str.ptr, out_str.c_str(), result_str.len);
   }
-  ctx->Free(src_val.ptr);
+  src_state->Delete(ctx);
   return result_str;
 }
 
 template <typename T>
 StringVal AggregateFunctions::HistogramFinalize(FunctionContext* ctx,
-    const StringVal& src_val) {
-  if (UNLIKELY(src_val.is_null)) return src_val;
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
+    const StringVal& src) {
+  if (UNLIKELY(src.is_null)) return src;
 
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
-  sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  src_state->SortSamples();
 
   stringstream out;
-  int num_buckets = min(src->num_samples, NUM_BUCKETS);
-  int samples_per_bucket = max(src->num_samples / NUM_BUCKETS, 1);
+  int num_buckets = min(src_state->num_samples(), NUM_BUCKETS);
+  int samples_per_bucket = max(src_state->num_samples() / NUM_BUCKETS, 1);
   for (int bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx) {
     int sample_idx = (bucket_idx + 1) * samples_per_bucket - 1;
-    PrintSample(src->samples[sample_idx], &out);
+    PrintSample(*(src_state->GetSample(sample_idx)), &out);
     if (bucket_idx < (num_buckets - 1)) out << ", ";
   }
   const string& out_str = out.str();
   StringVal result_str = StringVal::CopyFrom(ctx,
       reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size());
-  ctx->Free(src_val.ptr);
+  src_state->Delete(ctx);
   return result_str;
 }
 
 template <typename T>
-T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx,
-    const StringVal& src_val) {
-  if (UNLIKELY(src_val.is_null)) return T::null();
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
-  if (src->num_samples == 0) {
-    ctx->Free(src_val.ptr);
-    return T::null();
-  }
-  sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
-
-  T result = src->samples[src->num_samples / 2].GetValue(ctx);
-  ctx->Free(src_val.ptr);
+T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx, const StringVal& src) {
+  if (UNLIKELY(src.is_null)) return T::null();
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  T result = src_state->GetMedian(ctx);
+  src_state->Delete(ctx);
   return result;
 }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index 675c75f..524c63b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1160,6 +1160,8 @@ select histogram(timestamp_col) from functional.alltypestiny;
 STRING
 ====
 ---- QUERY
+# IMPALA-4787: appx_median() on a medium sized dataset. This should exercise merge() with
+# differently sized inputs in the Reservoir Sampling algorithm.
 select
 appx_median(bool_col),
 appx_median(tinyint_col),
@@ -1169,13 +1171,24 @@ appx_median(float_col),
 appx_median(double_col),
 appx_median(string_col),
 appx_median(timestamp_col)
-from alltypestiny
+from alltypes
 ---- RESULTS
-true,1,1,1,1.100000023841858,10.1,'1',2009-03-01 00:00:00
+true,5,5,5,5.5,50.5,'5',2010-01-01 00:00:00
 ---- TYPES
 BOOLEAN, TINYINT, SMALLINT, INT, FLOAT, DOUBLE, STRING, TIMESTAMP
 ====
 ---- QUERY
+# IMPALA-4787: appx_median on a large dataset. This requires several buffer resizes in the
+# Reservoir Sampling algorithm.
+select appx_median(l_returnflag)
+from tpch.lineitem
+where l_returnflag = "N"
+---- RESULTS
+'N'
+---- TYPES
+STRING
+====
+---- QUERY
 # IMPALA-1419: Agg fn containing arithmetic expr on NULL fails
 select count(null * 1) from functional.alltypes
 ---- RESULTS


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
index 338f8bd..eb4646c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
@@ -33,7 +33,7 @@ FunctionContext::Allocate() failed to allocate 16 bytes.
 ---- QUERY
 select sample(timestamp_col) from functional.alltypes
 ---- CATCH
-FunctionContext::Allocate() failed to allocate 480232 bytes.
+FunctionContext::Allocate() failed to allocate 248 bytes.
 ====
 ---- QUERY
 select distinctpc(int_col) from functional.alltypes
@@ -93,5 +93,5 @@ FunctionContextImpl::AllocateLocal() failed to allocate 120 bytes.
 ---- QUERY
 select appx_median(int_col) from functional.alltypes
 ---- CATCH
-FunctionContext::Allocate() failed to allocate 320232 bytes.
+FunctionContext::Allocate() failed to allocate 248 bytes.
 ====
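The diff contains two algorithmic ideas that can be shown outside Impala's FunctionContext machinery:

- the key-based merge in ReservoirSampleState::Merge();
- the nth_element()-based GetMedian().

The following is a hypothetical, simplified C++ sketch. KeyedSample, KeyGreater, MergeByKey and Median are invented names, and values are plain int64_t. Keys are assumed to have been assigned already, as Serialize() does in the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical sketch (not the Impala implementation).
struct KeyedSample {
  int64_t val;  // sampled value
  double key;   // weight-derived key assigned before merging
};

// Heap comparator: ordering by "greater key" makes the std heap algorithms keep the
// sample with the smallest key at front(), i.e. a min-heap on key (cf. SampleKeyGreater).
bool KeyGreater(const KeyedSample& a, const KeyedSample& b) { return a.key > b.key; }

// Merges 'src' into 'dst', keeping at most 'max_samples' samples with the highest keys.
// Assumes dst->size() <= max_samples on entry.
void MergeByKey(std::vector<KeyedSample>* dst, const std::vector<KeyedSample>& src,
                std::size_t max_samples) {
  std::make_heap(dst->begin(), dst->end(), KeyGreater);
  for (const KeyedSample& s : src) {
    if (dst->size() < max_samples) {
      // Still filling up: append and restore the heap property.
      dst->push_back(s);
      std::push_heap(dst->begin(), dst->end(), KeyGreater);
    } else if (!dst->empty() && s.key > dst->front().key) {
      // Full: evict the sample with the smallest key and insert the new one.
      std::pop_heap(dst->begin(), dst->end(), KeyGreater);
      dst->back() = s;
      std::push_heap(dst->begin(), dst->end(), KeyGreater);
    }
  }
}

// Returns the element at position size/2 in value order, as GetMedian() does. It uses
// std::nth_element (linear time on average) instead of sorting all samples.
int64_t Median(std::vector<KeyedSample> samples) {
  assert(!samples.empty());
  auto mid = samples.begin() + samples.size() / 2;
  std::nth_element(samples.begin(), mid, samples.end(),
                   [](const KeyedSample& a, const KeyedSample& b) { return a.val < b.val; });
  return mid->val;
}
```

nth_element only partially orders the samples. That is enough here because only the middle element is returned, which is why the patch can replace the full sort() in AppxMedianFinalize() with it.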