impala-commits mailing list archives

From taras...@apache.org
Subject [1/2] incubator-impala git commit: IMPALA-4787: Optimize APPX_MEDIAN() memory usage
Date Thu, 16 Mar 2017 19:50:47 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 62894e323 -> 0ff1e6e8d


IMPALA-4787: Optimize APPX_MEDIAN() memory usage

Before this change, ReservoirSample functions (such as APPX_MEDIAN())
allocated memory for 20,000 elements up front per grouping key. This
caused inefficient memory usage for aggregations with many grouping
keys.

This patch fixes this by initially allocating memory for 16 elements.
Once the buffer becomes full, we reallocate a new buffer with double
capacity and copy the original buffer into the new one. We continue
doubling the buffer size until the buffer has room for 20,000 elements
as before.
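
As an illustration only (not code from this patch), the growth schedule
described above can be sketched as follows. The names kInitCapacity,
kMaxCapacity, NextCapacity and GrowthSchedule are hypothetical; only the
numbers 16 and 20,000 come from this commit message.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical constants mirroring the numbers quoted in the commit message.
constexpr int kInitCapacity = 16;
constexpr int kMaxCapacity = 20000;

// Returns the capacity after one growth step: double it, but never exceed the cap.
int NextCapacity(int capacity) {
  assert(capacity > 0);
  return std::min(capacity * 2, kMaxCapacity);
}

// Lists every capacity the buffer passes through, from the initial size to the cap.
std::vector<int> GrowthSchedule() {
  std::vector<int> schedule{kInitCapacity};
  while (schedule.back() < kMaxCapacity) {
    schedule.push_back(NextCapacity(schedule.back()));
  }
  return schedule;
}
```

Under these assumptions a group that receives enough values goes through
16, 32, ..., 16384 and finally 20,000 elements, while a group with only a
handful of values never grows past the first 16.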

Testing:
Added some EE APPX_MEDIAN() tests on larger datasets that exercise the
resize code path.

Perf Benchmark (about 35,000 elements per bucket):

SELECT MAX(a) from (
  SELECT c1, appx_median(c2) as a FROM benchmark GROUP BY c1) t

BEFORE: 11s067ms
Operator       #Hosts   Avg Time   Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
-------------------------------------------------------------------------------------------------------------------------
06:AGGREGATE        1  124.726us  124.726us       1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE         1   29.544us   29.544us       3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE        3   86.406us  120.372us       3           1  44.00 KB       10.00 MB
04:AGGREGATE        3    1s840ms    2s824ms   2.00K          -1   1.02 GB      128.00 MB  FINALIZE
03:EXCHANGE         3    1s163ms    1s989ms   6.00K          -1         0              0  HASH(c1)
01:AGGREGATE        3    3s356ms    3s416ms   6.00K          -1   1.95 GB      128.00 MB  STREAMING
00:SCAN HDFS        3   64.962ms   65.490ms  65.54M          -1  25.97 MB       64.00 MB  tpcds_10_parquet.benchmark

AFTER: 9s465ms
Operator       #Hosts   Avg Time  Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
------------------------------------------------------------------------------------------------------------------------
06:AGGREGATE        1   73.961us  73.961us       1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE         1   18.101us  18.101us       3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE        3   75.795us  83.969us       3           1  44.00 KB       10.00 MB
04:AGGREGATE        3    1s608ms   2s683ms   2.00K          -1   1.02 GB      128.00 MB  FINALIZE
03:EXCHANGE         3  826.683ms   1s322ms   6.00K          -1         0              0  HASH(c1)
01:AGGREGATE        3    2s457ms   2s672ms   6.00K          -1   3.14 GB      128.00 MB  STREAMING
00:SCAN HDFS        3   81.514ms  89.056ms  65.54M          -1  25.94 MB       64.00 MB  tpcds_10_parquet.benchmark

Memory Benchmark (about 12 elements per bucket):

SELECT MAX(a) FROM (
  SELECT ss_customer_sk, APPX_MEDIAN(ss_sold_date_sk) as a
  FROM tpcds_parquet.store_sales
  GROUP BY ss_customer_sk) t

BEFORE: 7s477ms
Operator       #Hosts   Avg Time   Max Time    #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
---------------------------------------------------------------------------------------------------------------------
06:AGGREGATE        1  114.686us  114.686us        1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE         1   18.214us   18.214us        3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE        3  147.055us  165.464us        3           1  28.00 KB       10.00 MB
04:AGGREGATE        3    2s043ms    2s147ms   14.82K          -1   4.94 GB      128.00 MB  FINALIZE
03:EXCHANGE         3  840.528ms  943.254ms   15.61K          -1         0              0  HASH(ss_customer_sk)
01:AGGREGATE        3    1s769ms    1s869ms   15.61K          -1   5.32 GB      128.00 MB  STREAMING
00:SCAN HDFS        3   17.941ms   37.109ms  183.59K          -1   1.94 MB       16.00 MB  tpcds_parquet.store_sales

AFTER: 434ms
Operator       #Hosts   Avg Time   Max Time    #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail
---------------------------------------------------------------------------------------------------------------------
06:AGGREGATE        1  125.915us  125.915us        1           1  28.00 KB        -1.00 B  FINALIZE
05:EXCHANGE         1   72.179us   72.179us        3           1         0        -1.00 B  UNPARTITIONED
02:AGGREGATE        3   79.054us   83.385us        3           1  28.00 KB       10.00 MB
04:AGGREGATE        3    6.559ms    7.669ms   14.82K          -1  17.32 MB      128.00 MB  FINALIZE
03:EXCHANGE         3   67.370us   85.068us   15.60K          -1         0              0  HASH(ss_customer_sk)
01:AGGREGATE        3   19.245ms   24.472ms   15.60K          -1   9.48 MB      128.00 MB  STREAMING
00:SCAN HDFS        3   53.173ms   55.844ms  183.59K          -1   1.18 MB       16.00 MB  tpcds_parquet.store_sales

Change-Id: I99adaad574d4fb0a3cf38c6cbad8b2a23df12968
Reviewed-on: http://gerrit.cloudera.org:8080/6025
Reviewed-by: Taras Bobrovytsky <tbobrovytsky@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/529a5f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/529a5f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/529a5f99

Branch: refs/heads/master
Commit: 529a5f99b959079faead34a977fba1125d01840e
Parents: 62894e3
Author: Taras Bobrovytsky <tbobrovytsky@cloudera.com>
Authored: Mon Feb 13 18:14:56 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Mar 16 05:59:40 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/aggregate-functions-ir.cc          | 441 +++++++++++++------
 .../queries/QueryTest/aggregation.test          |  17 +-
 .../queries/QueryTest/alloc-fail-init.test      |   4 +-
 3 files changed, 316 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 10a775d..fc4715a 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -40,10 +40,11 @@
 
 using boost::uniform_int;
 using boost::ranlux64_3;
-using std::push_heap;
-using std::pop_heap;
-using std::map;
 using std::make_pair;
+using std::map;
+using std::nth_element;
+using std::pop_heap;
+using std::push_heap;
 
 namespace {
 // Threshold for each precision where it's better to use linear counting instead
@@ -122,9 +123,6 @@ int64_t HllEstimateBias(int64_t estimate) {
 
 }
 
-// TODO: this file should be cross compiled and then all of the builtin
-// aggregate functions will have a codegen enabled path. Then we can remove
-// the custom code in aggregation node.
 namespace impala {
 
 // This function initializes StringVal 'dst' with a newly allocated buffer of
@@ -902,8 +900,6 @@ BigIntVal AggregateFunctions::PcsaFinalize(FunctionContext* c, const StringVal&
 // TODO: Expose as constant argument parameters to the UDA.
 const static int NUM_BUCKETS = 100;
 const static int NUM_SAMPLES_PER_BUCKET = 200;
-const static int NUM_SAMPLES = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;
-const static int MAX_STRING_SAMPLE_LEN = 10;
 
 template <typename T>
 struct ReservoirSample {
@@ -919,6 +915,9 @@ struct ReservoirSample {
   T GetValue(FunctionContext* ctx) { return val; }
 };
 
+// Maximum length of a string sample.
+const static int MAX_STRING_SAMPLE_LEN = 10;
+
 // Template specialization for StringVal because we do not store the StringVal itself.
 // Instead, we keep fixed size arrays and truncate longer strings if necessary.
 template <>
@@ -941,22 +940,267 @@ struct ReservoirSample<StringVal> {
 };
 
 template <typename T>
-struct ReservoirSampleState {
-  ReservoirSample<T> samples[NUM_SAMPLES];
+bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
+  return i.val.val < j.val.val;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<StringVal>& i,
+    const ReservoirSample<StringVal>& j) {
+  int n = min(i.len, j.len);
+  int result = memcmp(&i.val[0], &j.val[0], n);
+  if (result == 0) return i.len < j.len;
+  return result < 0;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<DecimalVal>& i,
+    const ReservoirSample<DecimalVal>& j) {
+  // Also handles val4 and val8 - the DecimalVal memory layout ensures the least
+  // significant bits overlap in memory.
+  return i.val.val16 < j.val.val16;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<TimestampVal>& i,
+    const ReservoirSample<TimestampVal>& j) {
+  if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
+  else return i.val.date < j.val.date;
+}
+
+template <typename T>
+bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
+  return i.key > j.key;
+}
+
+// Keeps track of the current state of the reservoir sampling algorithm. The samples are
+// stored in a dynamically sized array. Initially, the samples array is stored in a
+// separate memory allocation. This class is responsible for managing the memory of the
+// array and reallocating when the array is full. When this object is serialized into an
+// output buffer, the samples array is inlined into the output buffer as well.
+template <typename T>
+class ReservoirSampleState {
+ public:
+  ReservoirSampleState(FunctionContext* ctx)
+    : num_samples_(0),
+      capacity_(INIT_CAPACITY),
+      source_size_(0),
+      sample_array_inline_(false),
+      samples_(NULL) {
+    // Allocate some initial memory for the samples array.
+    size_t buffer_len = sizeof(ReservoirSample<T>) * capacity_;
+    uint8_t* ptr = ctx->Allocate(buffer_len);
+    if (ptr == NULL) {
+      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+      return;
+    }
+    samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
+  }
+
+  // Returns a pointer to a ReservoirSample at idx.
+  ReservoirSample<T>* GetSample(int64_t idx) {
+    DCHECK(samples_ != NULL);
+    DCHECK_LT(idx, num_samples_);
+    DCHECK_LE(num_samples_, capacity_);
+    DCHECK_GE(idx, 0);
+    return &samples_[idx];
+  }
+
+  // Adds a sample and increments the source size. Doubles the capacity of the sample
+  // array if necessary. If max capacity is reached, randomly evicts a sample (as
+  // required by the algorithm). Returns false if the attempt to double the capacity
+  // fails, true otherwise.
+  bool AddSample(FunctionContext* ctx, const ReservoirSample<T>& s) {
+    DCHECK(samples_ != NULL);
+    DCHECK_LE(num_samples_, MAX_CAPACITY);
+    if (num_samples_ < MAX_CAPACITY) {
+      if (num_samples_ == capacity_) {
+        bool result = IncreaseCapacity(ctx, capacity_ * 2);
+        if (!result) return false;
+      }
+      DCHECK_LT(num_samples_, capacity_);
+      samples_[num_samples_++] = s;
+    } else {
+      DCHECK_EQ(num_samples_, MAX_CAPACITY);
+      DCHECK(!sample_array_inline_);
+      int64_t idx = GetNext64(source_size_);
+      if (idx < MAX_CAPACITY) samples_[idx] = s;
+    }
+    ++source_size_;
+    return true;
+  }
+
+  // Same as above.
+  bool AddSample(FunctionContext* ctx, const T& s) {
+    return AddSample(ctx, ReservoirSample<T>(s));
+  }
+
+  // Returns a buffer with a serialized ReservoirSampleState and the array of samples it
+  // contains. The samples array must not be inlined; i.e. it must be in a separate memory
+  // allocation. Returns a buffer containing this object and inlined samples array. The
+  // memory containing this object and the samples array is freed. The serialized object
+  // in the output buffer requires a call to Deserialize() before use.
+  StringVal Serialize(FunctionContext* ctx) {
+    DCHECK(samples_ != NULL);
+    DCHECK(!sample_array_inline_);
+    // Assign keys to the samples that haven't been set (i.e. if serializing after
+    // Update()). In weighted reservoir sampling the keys are typically assigned as the
+    // sources are being sampled, but this requires maintaining the samples in sorted
+    // order (by key) and it accomplishes the same thing at this point because all data
+    // points coming into Update() get the same weight. When the samples are later merged,
+    // they do have different weights (set here) that are proportional to the source_size,
+    // i.e. samples selected from a larger stream are more likely to end up in the final
+    // sample set. In order to avoid the extra overhead in Update(), we approximate the
+    // keys by picking random numbers in the range
+    // [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1]. This weights the keys by
+    // SOURCE_SIZE and implies that the samples picked had the highest keys, because
+    // values not sampled would have keys between 0 and
+    // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
+    for (int i = 0; i < num_samples_; ++i) {
+      if (samples_[i].key >= 0) continue;
+      int r = rand() % num_samples_;
+      samples_[i].key = ((double) source_size_ - r) / source_size_;
+    }
+    capacity_ = num_samples_;
+    sample_array_inline_ = true;
+
+    size_t buffer_len = sizeof(ReservoirSampleState<T>) +
+        sizeof(ReservoirSample<T>) * num_samples_;
+    StringVal dst = StringVal::CopyFrom(
+        ctx, reinterpret_cast<uint8_t*>(this), buffer_len);
+    memcpy(dst.ptr + sizeof(ReservoirSampleState<T>),
+        reinterpret_cast<uint8_t*>(samples_), sizeof(ReservoirSample<T>) * num_samples_);
+    ctx->Free(reinterpret_cast<uint8_t*>(samples_));
+    ctx->Free(reinterpret_cast<uint8_t*>(this));
+    return dst;
+  }
+
+  // Updates the pointer to the samples array. Must be called before using this object in
+  // Merge().
+  void Deserialize() {
+    DCHECK(sample_array_inline_);
+    samples_ = reinterpret_cast<ReservoirSample<T>*>(this + 1);
+  }
+
+  // Merges the samples in "other_state" into the current state by following the
+  // reservoir sampling algorithm. If necessary, increases the capacity to fit the
+  // samples from "other_state". In the case of failure to increase the size of the
+  // array, returns.
+  void Merge(FunctionContext* ctx, ReservoirSampleState<T>* other_state) {
+    DCHECK(samples_ != NULL);
+    DCHECK_GT(capacity_, 0);
+    other_state->Deserialize();
+    int src_idx = 0;
+    // We can increase the capacity significantly here and skip several doublings because
+    // we know the number of elements in the other state up front.
+    if (capacity_ < MAX_CAPACITY) {
+      int necessary_capacity = num_samples_ + other_state->num_samples();
+      if (capacity_ < necessary_capacity) {
+        bool result = IncreaseCapacity(ctx, necessary_capacity);
+        if (!result) return;
+      }
+    }
+
+    // First, fill up the dst samples if they don't already exist. The samples are now
+    // ordered as a min-heap on the key.
+    while (num_samples_ < MAX_CAPACITY && src_idx < other_state->num_samples()) {
+      DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
+      bool result = AddSample(ctx, *other_state->GetSample(src_idx++));
+      if (!result) return;
+      push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+    }
+
+    // Then for every sample from source, take the sample if the key is greater than
+    // the minimum key in the min-heap.
+    while (src_idx < other_state->num_samples()) {
+      DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
+      if (other_state->GetSample(src_idx)->key > samples_[0].key) {
+        pop_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+        samples_[MAX_CAPACITY - 1] = *other_state->GetSample(src_idx);
+        push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+      }
+      ++src_idx;
+    }
+
+    source_size_ += other_state->source_size();
+  }
+
+  // Returns the median element.
+  T GetMedian(FunctionContext* ctx) {
+    if (num_samples_ == 0) return T::null();
+    ReservoirSample<T>* mid_point = GetSample(num_samples_ / 2);
+    nth_element(&samples_[0], mid_point, &samples_[num_samples_], SampleValLess<T>);
+    return mid_point->GetValue(ctx);
+  }
+
+  // Sorts the samples.
+  void SortSamples() {
+    sort(&samples_[0], &samples_[num_samples_], SampleValLess<T>);
+  }
+
+  // Deletes this object by freeing the memory that contains the array of samples (if not
+  // inlined) and itself.
+  void Delete(FunctionContext* ctx) {
+    if (!sample_array_inline_) ctx->Free(reinterpret_cast<uint8_t*>(samples_));
+    ctx->Free(reinterpret_cast<uint8_t*>(this));
+  }
+
+  int num_samples() { return num_samples_; }
+  int64_t source_size() { return source_size_; }
+
+ private:
+  // The initial capacity of the samples array.
+  const static int INIT_CAPACITY = 16;
+
+  // Maximum capacity of the samples array.
+  const static int MAX_CAPACITY = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;
 
   // Number of collected samples.
-  int num_samples;
+  int num_samples_;
+
+  // Size of the "samples_" array.
+  int capacity_;
 
   // Number of values over which the samples were collected.
-  int64_t source_size;
+  int64_t source_size_;
 
   // Random number generator for generating 64-bit integers
   // TODO: Replace with mt19937_64 when upgrading boost
-  ranlux64_3 rng;
+  ranlux64_3 rng_;
+
+  // True if the array of samples is in the same memory allocation as this object. If
+  // false, this object is responsible for freeing the memory.
+  bool sample_array_inline_;
+
+  // Points to the array of ReservoirSamples. The array may be located inline (right after
+  // this object), or in a separate memory allocation.
+  ReservoirSample<T>* samples_;
+
+  // Increases the capacity of the "samples_" array to "new_capacity" rounded up to a
+  // power of two by reallocating. Should only be called if the samples array is not
+  // inline. Returns false if the operation fails.
+  bool IncreaseCapacity(FunctionContext* ctx, int new_capacity) {
+    DCHECK(samples_ != NULL);
+    DCHECK(!sample_array_inline_);
+    DCHECK_LT(capacity_, MAX_CAPACITY);
+    DCHECK_GT(new_capacity, capacity_);
+    new_capacity = BitUtil::RoundUpToPowerOfTwo(new_capacity);
+    if (new_capacity > MAX_CAPACITY) new_capacity = MAX_CAPACITY;
+    size_t buffer_len = sizeof(ReservoirSample<T>) * new_capacity;
+    uint8_t* ptr = ctx->Reallocate(reinterpret_cast<uint8_t*>(samples_), buffer_len);
+    if (ptr == NULL) {
+      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+      return false;
+    }
+    samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
+    capacity_ = new_capacity;
+    return true;
+  }
 
+  // Returns a random integer in the range [0, max].
   int64_t GetNext64(int64_t max) {
     uniform_int<int64_t> dist(0, max);
-    return dist(rng);
+    return dist(rng_);
   }
 };
 
@@ -967,7 +1211,9 @@ void AggregateFunctions::ReservoirSampleInit(FunctionContext* ctx, StringVal* ds
     DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
     return;
   }
-  *reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr) = ReservoirSampleState<T>();
+  ReservoirSampleState<T>* dst_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+  *dst_state = ReservoirSampleState<T>(ctx);
 }
 
 template <typename T>
@@ -975,113 +1221,32 @@ void AggregateFunctions::ReservoirSampleUpdate(FunctionContext* ctx, const T& sr
     StringVal* dst) {
   if (src.is_null) return;
   DCHECK(!dst->is_null);
-  DCHECK_EQ(dst->len, sizeof(ReservoirSampleState<T>));
-  ReservoirSampleState<T>* state = reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
-
-  if (state->num_samples < NUM_SAMPLES) {
-    state->samples[state->num_samples++] = ReservoirSample<T>(src);
-  } else {
-    int64_t r = state->GetNext64(state->source_size);
-    if (r < NUM_SAMPLES) state->samples[r] = ReservoirSample<T>(src);
-  }
-  ++state->source_size;
+  ReservoirSampleState<T>* dst_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+  dst_state->AddSample(ctx, src);
 }
 
 template <typename T>
 StringVal AggregateFunctions::ReservoirSampleSerialize(FunctionContext* ctx,
     const StringVal& src) {
   if (UNLIKELY(src.is_null)) return src;
-  StringVal result = StringVal::CopyFrom(ctx, src.ptr, src.len);
-  ctx->Free(src.ptr);
-  if (UNLIKELY(result.is_null)) return result;
-
-  ReservoirSampleState<T>* state = reinterpret_cast<ReservoirSampleState<T>*>(result.ptr);
-  // Assign keys to the samples that haven't been set (i.e. if serializing after
-  // Update()). In weighted reservoir sampling the keys are typically assigned as the
-  // sources are being sampled, but this requires maintaining the samples in sorted order
-  // (by key) and it accomplishes the same thing at this point because all data points
-  // coming into Update() get the same weight. When the samples are later merged, they do
-  // have different weights (set here) that are proportional to the source_size, i.e.
-  // samples selected from a larger stream are more likely to end up in the final sample
-  // set. In order to avoid the extra overhead in Update(), we approximate the keys by
-  // picking random numbers in the range [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1].
-  // This weights the keys by SOURCE_SIZE and implies that the samples picked had the
-  // highest keys, because values not sampled would have keys between 0 and
-  // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
-  for (int i = 0; i < state->num_samples; ++i) {
-    if (state->samples[i].key >= 0) continue;
-    int r = rand() % state->num_samples;
-    state->samples[i].key = ((double) state->source_size - r) / state->source_size;
-  }
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  StringVal result = src_state->Serialize(ctx);
   return result;
 }
 
 template <typename T>
-bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
-  return i.val.val < j.val.val;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<StringVal>& i,
-    const ReservoirSample<StringVal>& j) {
-  int n = min(i.len, j.len);
-  int result = memcmp(&i.val[0], &j.val[0], n);
-  if (result == 0) return i.len < j.len;
-  return result < 0;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<DecimalVal>& i,
-    const ReservoirSample<DecimalVal>& j) {
-  // Also handles val4 and val8 - the DecimalVal memory layout ensures the least
-  // significant bits overlap in memory.
-  return i.val.val16 < j.val.val16;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<TimestampVal>& i,
-    const ReservoirSample<TimestampVal>& j) {
-  if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
-  else return i.val.date < j.val.date;
-}
-
-template <typename T>
-bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
-  return i.key > j.key;
-}
-
-template <typename T>
 void AggregateFunctions::ReservoirSampleMerge(FunctionContext* ctx,
-    const StringVal& src_val, StringVal* dst_val) {
-  if (src_val.is_null) return;
-  DCHECK(!dst_val->is_null);
-  DCHECK(!src_val.is_null);
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-  DCHECK_EQ(dst_val->len, sizeof(ReservoirSampleState<T>));
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
-  ReservoirSampleState<T>* dst = reinterpret_cast<ReservoirSampleState<T>*>(dst_val->ptr);
-
-  int src_idx = 0;
-  int src_max = src->num_samples;
-  // First, fill up the dst samples if they don't already exist. The samples are now
-  // ordered as a min-heap on the key.
-  while (dst->num_samples < NUM_SAMPLES && src_idx < src_max) {
-    DCHECK_GE(src->samples[src_idx].key, 0);
-    dst->samples[dst->num_samples++] = src->samples[src_idx++];
-    push_heap(dst->samples, dst->samples + dst->num_samples, SampleKeyGreater<T>);
-  }
-  // Then for every sample from source, take the sample if the key is greater than
-  // the minimum key in the min-heap.
-  while (src_idx < src_max) {
-    DCHECK_GE(src->samples[src_idx].key, 0);
-    if (src->samples[src_idx].key > dst->samples[0].key) {
-      pop_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
-      dst->samples[NUM_SAMPLES - 1] = src->samples[src_idx];
-      push_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
-    }
-    ++src_idx;
-  }
-  dst->source_size += src->source_size;
+    const StringVal& src, StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK(!src.is_null);
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  ReservoirSampleState<T>* dst_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+  dst_state->Merge(ctx, src_state);
 }
 
 template <typename T>
@@ -1112,64 +1277,56 @@ void PrintSample(const ReservoirSample<TimestampVal>& v, ostream* os) {
 
 template <typename T>
 StringVal AggregateFunctions::ReservoirSampleFinalize(FunctionContext* ctx,
-    const StringVal& src_val) {
-  if (UNLIKELY(src_val.is_null)) return src_val;
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
+    const StringVal& src) {
+  if (UNLIKELY(src.is_null)) return src;
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
 
   stringstream out;
-  for (int i = 0; i < src->num_samples; ++i) {
-    PrintSample<T>(src->samples[i], &out);
-    if (i < (src->num_samples - 1)) out << ", ";
+  for (int i = 0; i < src_state->num_samples(); ++i) {
+    PrintSample<T>(*src_state->GetSample(i), &out);
+    if (i < (src_state->num_samples() - 1)) out << ", ";
   }
   const string& out_str = out.str();
   StringVal result_str(ctx, out_str.size());
   if (LIKELY(!result_str.is_null)) {
     memcpy(result_str.ptr, out_str.c_str(), result_str.len);
   }
-  ctx->Free(src_val.ptr);
+  src_state->Delete(ctx);
   return result_str;
 }
 
 template <typename T>
 StringVal AggregateFunctions::HistogramFinalize(FunctionContext* ctx,
-    const StringVal& src_val) {
-  if (UNLIKELY(src_val.is_null)) return src_val;
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
+    const StringVal& src) {
+  if (UNLIKELY(src.is_null)) return src;
 
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
-  sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  src_state->SortSamples();
 
   stringstream out;
-  int num_buckets = min(src->num_samples, NUM_BUCKETS);
-  int samples_per_bucket = max(src->num_samples / NUM_BUCKETS, 1);
+  int num_buckets = min(src_state->num_samples(), NUM_BUCKETS);
+  int samples_per_bucket = max(src_state->num_samples() / NUM_BUCKETS, 1);
   for (int bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx) {
     int sample_idx = (bucket_idx + 1) * samples_per_bucket - 1;
-    PrintSample<T>(src->samples[sample_idx], &out);
+    PrintSample<T>(*(src_state->GetSample(sample_idx)), &out);
     if (bucket_idx < (num_buckets - 1)) out << ", ";
   }
   const string& out_str = out.str();
   StringVal result_str = StringVal::CopyFrom(ctx,
       reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size());
-  ctx->Free(src_val.ptr);
+  src_state->Delete(ctx);
   return result_str;
 }
 
 template <typename T>
-T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx,
-    const StringVal& src_val) {
-  if (UNLIKELY(src_val.is_null)) return T::null();
-  DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-
-  ReservoirSampleState<T>* src = reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
-  if (src->num_samples == 0) {
-    ctx->Free(src_val.ptr);
-    return T::null();
-  }
-  sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
-
-  T result = src->samples[src->num_samples / 2].GetValue(ctx);
-  ctx->Free(src_val.ptr);
+T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx, const StringVal& src) {
+  if (UNLIKELY(src.is_null)) return T::null();
+  ReservoirSampleState<T>* src_state =
+      reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+  T result = src_state->GetMedian(ctx);
+  src_state->Delete(ctx);
   return result;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index 675c75f..524c63b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1160,6 +1160,8 @@ select histogram(timestamp_col) from functional.alltypestiny;
 STRING
 ====
 ---- QUERY
+# IMPALA-4787: appx_median() on a medium sized dataset. This should exercise merge() with
+# differently sized inputs in the Reservoir Sampling algorithm.
 select
 appx_median(bool_col),
 appx_median(tinyint_col),
@@ -1169,13 +1171,24 @@ appx_median(float_col),
 appx_median(double_col),
 appx_median(string_col),
 appx_median(timestamp_col)
-from alltypestiny
+from alltypes
 ---- RESULTS
-true,1,1,1,1.100000023841858,10.1,'1',2009-03-01 00:00:00
+true,5,5,5,5.5,50.5,'5',2010-01-01 00:00:00
 ---- TYPES
 BOOLEAN, TINYINT, SMALLINT, INT, FLOAT, DOUBLE, STRING, TIMESTAMP
 ====
 ---- QUERY
+# IMPALA-4787: appx_median on a large dataset. This requires several buffer resizes in the
+# Reservoir Sampling algorithm.
+select appx_median(l_returnflag)
+from tpch.lineitem
+where l_returnflag = "N"
+---- RESULTS
+'N'
+---- TYPES
+STRING
+====
+---- QUERY
 # IMPALA-1419: Agg fn containing arithmetic expr on NULL fails
 select count(null * 1) from functional.alltypes
 ---- RESULTS

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
index 338f8bd..eb4646c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
@@ -33,7 +33,7 @@ FunctionContext::Allocate() failed to allocate 16 bytes.
 ---- QUERY
 select sample(timestamp_col) from functional.alltypes
 ---- CATCH
-FunctionContext::Allocate() failed to allocate 480232 bytes.
+FunctionContext::Allocate() failed to allocate 248 bytes.
 ====
 ---- QUERY
 select distinctpc(int_col) from functional.alltypes
@@ -93,5 +93,5 @@ FunctionContextImpl::AllocateLocal() failed to allocate 120 bytes.
 ---- QUERY
 select appx_median(int_col) from functional.alltypes
 ---- CATCH
-FunctionContext::Allocate() failed to allocate 320232 bytes.
+FunctionContext::Allocate() failed to allocate 248 bytes.
 ====
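
For readers following AddSample() in the diff above, here is a simplified,
self-contained sketch of the same idea: append while the reservoir is below
its maximum size (doubling the tracked capacity when full), and afterwards
overwrite a uniformly chosen slot. It is an approximation under stated
assumptions, not the patch's code: the class name GrowableReservoir is
hypothetical, std::vector stands in for the FunctionContext-managed buffer,
std::mt19937_64 stands in for boost's ranlux64_3, and plain int64_t values
stand in for ReservoirSample<T>.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Hypothetical stand-in for ReservoirSampleState<T>; see the assumptions above.
class GrowableReservoir {
 public:
  explicit GrowableReservoir(int max_capacity, uint64_t seed = 0)
    : max_capacity_(max_capacity),
      capacity_(std::min(16, max_capacity)),
      source_size_(0),
      rng_(seed) {
    assert(max_capacity > 0);
    samples_.reserve(capacity_);
  }

  // Adds one value. Below the maximum size, values are appended and the tracked
  // capacity doubles (clamped to the maximum) whenever the buffer is full. At the
  // maximum size, a random index in [0, source_size_] is drawn and the value
  // replaces the sample at that index only if the index falls inside the reservoir.
  void Add(int64_t value) {
    if (num_samples() < max_capacity_) {
      if (num_samples() == capacity_) {
        capacity_ = std::min(capacity_ * 2, max_capacity_);
        samples_.reserve(capacity_);
      }
      samples_.push_back(value);
    } else {
      std::uniform_int_distribution<int64_t> dist(0, source_size_);
      int64_t idx = dist(rng_);
      if (idx < max_capacity_) samples_[static_cast<std::size_t>(idx)] = value;
    }
    ++source_size_;
  }

  int num_samples() const { return static_cast<int>(samples_.size()); }
  int capacity() const { return capacity_; }
  int64_t source_size() const { return source_size_; }

 private:
  int max_capacity_;      // Upper bound on the number of retained samples.
  int capacity_;          // Tracked buffer capacity, in samples.
  int64_t source_size_;   // Number of values seen so far.
  std::mt19937_64 rng_;   // Substitute generator, not the one used in Impala.
  std::vector<int64_t> samples_;
};
```

A group that sees ten values would stay at the initial capacity of 16 in
this sketch, which is the case the memory benchmark in the commit message
targets.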

