Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F33B1200B71 for ; Wed, 17 Aug 2016 00:42:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F1D93160ABF; Tue, 16 Aug 2016 22:42:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4C726160ABC for ; Wed, 17 Aug 2016 00:42:06 +0200 (CEST) Received: (qmail 98277 invoked by uid 500); 16 Aug 2016 22:42:03 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 97579 invoked by uid 99); 16 Aug 2016 22:42:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2016 22:42:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 84F07EF423; Tue, 16 Aug 2016 22:42:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mshuler@apache.org To: commits@cassandra.apache.org Date: Tue, 16 Aug 2016 22:42:10 -0000 Message-Id: <02bd1c748d4f4b8b84e37ca9b3b55bcf@git.apache.org> In-Reply-To: <6f63b6836b6c4f2c9c7a6f6368b33337@git.apache.org> References: <6f63b6836b6c4f2c9c7a6f6368b33337@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/50] [abbrv] cassandra git commit: Add decay to histograms and timers used for metrics archived-at: Tue, 16 Aug 2016 22:42:08 -0000 Add decay to histograms and timers used for metrics Patch by Per Otterstrom; reviewed by tjake for CASSANDRA-11752 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e902596 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e902596 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e902596 Branch: refs/heads/cassandra-3.8 Commit: 2e90259669bddf04b6c1dba38b604aa6a33dcd47 Parents: 76e3100 Author: Per Otterstrom Authored: Thu Jun 23 01:01:39 2016 +0200 Committer: T Jake Luciani Committed: Wed Aug 10 14:37:45 2016 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../metrics/CassandraMetricsRegistry.java | 4 +- .../cassandra/metrics/ClearableHistogram.java | 4 +- .../DecayingEstimatedHistogramReservoir.java | 549 +++++++++++++++++++ .../metrics/EstimatedHistogramReservoir.java | 111 ---- .../cassandra/utils/EstimatedHistogram.java | 2 +- ...DecayingEstimatedHistogramReservoirTest.java | 381 +++++++++++++ 7 files changed, 936 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 232203e..05059cc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.8 + * Add decay to histograms and timers used for metrics (CASSANDRA-11752) * Fix hanging stream session (CASSANDRA-10992) * Add byteman support for testing (CASSANDRA-12377) * Fix INSERT JSON, fromJson() support of smallint, tinyint types 
(CASSANDRA-12371) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java index 6fdb2ff..8e5671b 100644 --- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java +++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java @@ -60,7 +60,7 @@ public class CassandraMetricsRegistry extends MetricRegistry public Histogram histogram(MetricName name, boolean considerZeroes) { - Histogram histogram = register(name, new ClearableHistogram(new EstimatedHistogramReservoir(considerZeroes))); + Histogram histogram = register(name, new ClearableHistogram(new DecayingEstimatedHistogramReservoir(considerZeroes))); registerMBean(histogram, name.getMBeanName()); return histogram; @@ -68,7 +68,7 @@ public class CassandraMetricsRegistry extends MetricRegistry public Timer timer(MetricName name) { - Timer timer = register(name, new Timer(new EstimatedHistogramReservoir(false))); + Timer timer = register(name, new Timer(new DecayingEstimatedHistogramReservoir())); registerMBean(timer, name.getMBeanName()); return timer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/ClearableHistogram.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java index 85f2fa9..4a081d8 100644 --- a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java +++ b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java @@ -26,14 +26,14 @@ import com.codahale.metrics.Histogram; */ public class ClearableHistogram extends Histogram { - private final EstimatedHistogramReservoir reservoirRef; + private final DecayingEstimatedHistogramReservoir reservoirRef; /** * Creates a new {@link com.codahale.metrics.Histogram} with the given reservoir. * * @param reservoir the reservoir to create a histogram from */ - public ClearableHistogram(EstimatedHistogramReservoir reservoir) + public ClearableHistogram(DecayingEstimatedHistogramReservoir reservoir) { super(reservoir); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java new file mode 100644 index 0000000..14a4366 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.annotations.VisibleForTesting; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; +import org.apache.cassandra.utils.EstimatedHistogram; + +/** + * A decaying histogram reservoir where values collected during each minute will be twice as significant as the values + * collected in the previous minute. Measured values are collected in variable sized buckets, using small buckets in the + * lower range and larger buckets in the upper range. Use this histogram when you want to know if the distribution of + * the underlying data stream has changed recently and you want high resolution on values in the lower range. + * + * The histogram use forward decay [1] to make recent values more significant. The forward decay factor will be doubled + * every minute (half-life time set to 60 seconds) [2]. The forward decay landmark is reset every 30 minutes (or at + * first read/update after 30 minutes). During landmark reset, updates and reads in the reservoir will be blocked in a + * fashion similar to the one used in the metrics library [3]. The 30 minute rescale interval is used based on the + * assumption that in an extreme case we would have to collect a metric 1M times for a single bucket each second. By the + * end of the 30:th minute all collected values will roughly add up to 1.000.000 * 60 * pow(2, 30) which can be + * represented with 56 bits giving us some head room in a signed 64 bit long. + * + * Internally two reservoirs are maintained, one with decay and one without decay. All public getters in a {@Snapshot} + * will expose the decay functionality with the exception of the {@link Snapshot#getValues()} which will return values + * from the reservoir without decay. This makes it possible for the caller to maintain precise deltas in an interval of + * its choise. + * + * The bucket size starts at 1 and grows by 1.2 each time (rounding and removing duplicates). It goes from 1 to around + * 18T by default (creating 164+1 buckets), which will give a timing resolution from microseconds to roughly 210 days, + * with less precision as the numbers get larger. + * + * The series of values to which the counts in `decayingBuckets` correspond: + * 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 14, 17, 20, 24, 29, 35, 42, 50, 60, 72 etc. + * Thus, a `decayingBuckets` of [0, 0, 1, 10] would mean we had seen 1 value of 3 and 10 values of 4. + * + * Each bucket represents values from (previous bucket offset, current offset]. 
+ * + * [1]: http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf + * [2]: https://en.wikipedia.org/wiki/Half-life + * [3]: https://github.com/dropwizard/metrics/blob/v3.1.2/metrics-core/src/main/java/com/codahale/metrics/ExponentiallyDecayingReservoir.java + */ +public class DecayingEstimatedHistogramReservoir implements Reservoir +{ + /** + * The default number of decayingBuckets. Use this bucket count to reduce memory allocation for bucket offsets. + */ + public static final int DEFAULT_BUCKET_COUNT = 164; + public static final boolean DEFAULT_ZERO_CONSIDERATION = false; + + // The offsets used with a default sized bucket array without a separate bucket for zero values. + public static final long[] DEFAULT_WITHOUT_ZERO_BUCKET_OFFSETS = EstimatedHistogram.newOffsets(DEFAULT_BUCKET_COUNT, false); + + // The offsets used with a default sized bucket array with a separate bucket for zero values. + public static final long[] DEFAULT_WITH_ZERO_BUCKET_OFFSETS = EstimatedHistogram.newOffsets(DEFAULT_BUCKET_COUNT, true); + + // Represents the bucket offset as created by {@link EstimatedHistogram#newOffsets()} + private final long[] bucketOffsets; + + // decayingBuckets and buckets are one element longer than bucketOffsets -- the last element is values greater than the last offset + private final AtomicLongArray decayingBuckets; + private final AtomicLongArray buckets; + + public static final long HALF_TIME_IN_S = 60L; + public static final double MEAN_LIFETIME_IN_S = HALF_TIME_IN_S / Math.log(2.0); + public static final long LANDMARK_RESET_INTERVAL_IN_MS = 30L * 60L * 1000L; + + private final AtomicBoolean rescaling = new AtomicBoolean(false); + private volatile long decayLandmark; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + // Wrapper around System.nanoTime() to simplify unit testing. + private final Clock clock; + + + /** + * Construct a decaying histogram with default number of buckets and without considering zeroes. + */ + public DecayingEstimatedHistogramReservoir() + { + this(DEFAULT_ZERO_CONSIDERATION, DEFAULT_BUCKET_COUNT, Clock.defaultClock()); + } + + /** + * Construct a decaying histogram with default number of buckets. + * + * @param considerZeroes when true, 0-value measurements in a separate bucket, otherwise they will be collected in + * same bucket as 1-value measurements + */ + public DecayingEstimatedHistogramReservoir(boolean considerZeroes) + { + this(considerZeroes, DEFAULT_BUCKET_COUNT, Clock.defaultClock()); + } + + /** + * Construct a decaying histogram. 
+ * + * @param considerZeroes when true, 0-value measurements in a separate bucket, otherwise they will be collected in + * same bucket as 1-value measurements + * @param bucketCount number of buckets used to collect measured values + */ + public DecayingEstimatedHistogramReservoir(boolean considerZeroes, int bucketCount) + { + this(considerZeroes, bucketCount, Clock.defaultClock()); + } + + @VisibleForTesting + DecayingEstimatedHistogramReservoir(boolean considerZeroes, int bucketCount, Clock clock) + { + if (bucketCount == DEFAULT_BUCKET_COUNT) + { + if (considerZeroes == true) + { + bucketOffsets = DEFAULT_WITH_ZERO_BUCKET_OFFSETS; + } + else + { + bucketOffsets = DEFAULT_WITHOUT_ZERO_BUCKET_OFFSETS; + } + } + else + { + bucketOffsets = EstimatedHistogram.newOffsets(bucketCount, considerZeroes); + } + decayingBuckets = new AtomicLongArray(bucketOffsets.length + 1); + buckets = new AtomicLongArray(bucketOffsets.length + 1); + this.clock = clock; + decayLandmark = clock.getTime(); + } + + /** + * Increments the count of the bucket closest to n, rounding UP. + * + * @param value the data point to add to the histogram + */ + public void update(long value) + { + long now = clock.getTime(); + rescaleIfNeeded(now); + + int index = Arrays.binarySearch(bucketOffsets, value); + if (index < 0) + { + // inexact match, take the first bucket higher than n + index = -index - 1; + } + // else exact match; we're good + + lockForRegularUsage(); + + try + { + decayingBuckets.getAndAdd(index, forwardDecayWeight(now)); + } + finally + { + unlockForRegularUsage(); + } + + buckets.getAndIncrement(index); + } + + private long forwardDecayWeight(long now) + { + return Math.round(Math.exp(((now - decayLandmark) / 1000L) / MEAN_LIFETIME_IN_S)); + } + + /** + * Return the number of buckets where recorded values are stored. + * + * This method does not return the number of recorded values as suggested by the {@link Reservoir} interface. + * + * @return the number of buckets + */ + public int size() + { + return decayingBuckets.length(); + } + + /** + * Returns a snapshot of the decaying values in this reservoir. + * + * Non-decaying reservoir will not be included in the snapshot. 
+ * + * @return the snapshot + */ + public Snapshot getSnapshot() + { + rescaleIfNeeded(); + + lockForRegularUsage(); + + try + { + return new EstimatedHistogramReservoirSnapshot(this); + } + finally + { + unlockForRegularUsage(); + } + } + + /** + * @return true if this histogram has overflowed -- that is, a value larger than our largest bucket could bound was added + */ + @VisibleForTesting + boolean isOverflowed() + { + return decayingBuckets.get(decayingBuckets.length() - 1) > 0; + } + + private void rescaleIfNeeded() + { + rescaleIfNeeded(clock.getTime()); + } + + private void rescaleIfNeeded(long now) + { + if (needRescale(now)) + { + if (rescaling.compareAndSet(false, true)) + { + try + { + rescale(now); + } + finally + { + rescaling.set(false); + } + } + } + } + + private void rescale(long now) + { + // Check again to make sure that another thread didn't complete rescale already + if (needRescale(now)) + { + lockForRescale(); + + try + { + final long rescaleFactor = forwardDecayWeight(now); + decayLandmark = now; + + final int bucketCount = decayingBuckets.length(); + for (int i = 0; i < bucketCount; i++) + { + long newValue = Math.round((decayingBuckets.get(i) / rescaleFactor)); + decayingBuckets.set(i, newValue); + } + } + finally + { + unlockForRescale(); + } + } + } + + private boolean needRescale(long now) + { + return (now - decayLandmark) > LANDMARK_RESET_INTERVAL_IN_MS; + } + + @VisibleForTesting + public void clear() + { + lockForRescale(); + + try + { + final int bucketCount = decayingBuckets.length(); + for (int i = 0; i < bucketCount; i++) + { + decayingBuckets.set(i, 0L); + buckets.set(i, 0L); + } + } + finally + { + unlockForRescale(); + } + } + + private void lockForRegularUsage() + { + this.lock.readLock().lock(); + } + + private void unlockForRegularUsage() + { + this.lock.readLock().unlock(); + } + + private void lockForRescale() + { + this.lock.writeLock().lock(); + } + + private void unlockForRescale() + { + this.lock.writeLock().unlock(); + } + + + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + /** + * Represents a snapshot of the decaying histogram. + * + * The decaying buckets are copied into a snapshot array to give a consistent view for all getters. However, the + * copy is made without a write-lock and so other threads may change the buckets while the array is copied, + * probably causign a slight skew up in the quantiles and mean values. + * + * The decaying buckets will be used for quantile calculations and mean values, but the non decaying buckets will be + * exposed for calls to {@link Snapshot#getValues()}. + */ + private class EstimatedHistogramReservoirSnapshot extends Snapshot + { + private final long[] decayingBuckets; + + public EstimatedHistogramReservoirSnapshot(DecayingEstimatedHistogramReservoir reservoir) + { + final int length = reservoir.decayingBuckets.length(); + + this.decayingBuckets = new long[length]; + + for (int i = 0; i < length; i++) + this.decayingBuckets[i] = reservoir.decayingBuckets.get(i); + } + + /** + * Get the estimated value at the specified quantile in the distribution. 
+ * + * @param quantile the quantile specified as a value between 0.0 (zero) and 1.0 (one) + * @return estimated value at given quantile + * @throws IllegalStateException in case the histogram overflowed + */ + public double getValue(double quantile) + { + assert quantile >= 0 && quantile <= 1.0; + + final int lastBucket = decayingBuckets.length - 1; + + if (decayingBuckets[lastBucket] > 0) + throw new IllegalStateException("Unable to compute when histogram overflowed"); + + final long qcount = (long) Math.ceil(count() * quantile); + if (qcount == 0) + return 0; + + long elements = 0; + for (int i = 0; i < lastBucket; i++) + { + elements += decayingBuckets[i]; + if (elements >= qcount) + return bucketOffsets[i]; + } + return 0; + } + + /** + * Will return a snapshot of the non-decaying buckets. + * + * The values returned will not be consistent with the quantile and mean values. The caller must be aware of the + * offsets created by {@link EstimatedHistogram#getBucketOffsets()} to make use of the values returned. + * + * @return a snapshot of the non-decaying buckets. + */ + public long[] getValues() + { + final int length = buckets.length(); + + long[] values = new long[length]; + + for (int i = 0; i < length; i++) + values[i] = buckets.get(i); + + return values; + } + + /** + * Return the number of buckets where recorded values are stored. + * + * This method does not return the number of recorded values as suggested by the {@link Snapshot} interface. + * + * @return the number of buckets + */ + public int size() + { + return decayingBuckets.length; + } + + /** + * Return the number of registered values taking forward decay into account. + * + * @return the sum of all bucket values + */ + private long count() + { + long sum = 0L; + for (int i = 0; i < decayingBuckets.length; i++) + sum += decayingBuckets[i]; + return sum; + } + + /** + * Get the estimated max-value that could have been added to this reservoir. + * + * As values are collected in variable sized buckets, the actual max value recored in the reservoir may be less + * than the value returned. + * + * @return the largest value that could have been added to this reservoir, or Long.MAX_VALUE if the reservoir + * overflowed + */ + public long getMax() + { + final int lastBucket = decayingBuckets.length - 1; + + if (decayingBuckets[lastBucket] > 0) + return Long.MAX_VALUE; + + for (int i = lastBucket - 1; i >= 0; i--) + { + if (decayingBuckets[i] > 0) + return bucketOffsets[i]; + } + return 0; + } + + /** + * Get the estimated mean value in the distribution. + * + * @return the mean histogram value (average of bucket offsets, weighted by count) + * @throws IllegalStateException if any values were greater than the largest bucket threshold + */ + public double getMean() + { + final int lastBucket = decayingBuckets.length - 1; + + if (decayingBuckets[lastBucket] > 0) + throw new IllegalStateException("Unable to compute when histogram overflowed"); + + long elements = 0; + long sum = 0; + for (int i = 0; i < lastBucket; i++) + { + long bCount = decayingBuckets[i]; + elements += bCount; + sum += bCount * bucketOffsets[i]; + } + + return (double) sum / elements; + } + + /** + * Get the estimated min-value that could have been added to this reservoir. + * + * As values are collected in variable sized buckets, the actual min value recored in the reservoir may be + * higher than the value returned. 
+ * + * @return the smallest value that could have been added to this reservoir + */ + public long getMin() + { + for (int i = 0; i < decayingBuckets.length; i++) + { + if (decayingBuckets[i] > 0) + return i == 0 ? 0 : 1 + bucketOffsets[i - 1]; + } + return 0; + } + + /** + * Get the estimated standard deviation of the values added to this reservoir. + * + * As values are collected in variable sized buckets, the actual deviation may be more or less than the value + * returned. + * + * @return an estimate of the standard deviation + */ + public double getStdDev() + { + final int lastBucket = decayingBuckets.length - 1; + + if (decayingBuckets[lastBucket] > 0) + throw new IllegalStateException("Unable to compute when histogram overflowed"); + + final long count = count(); + + if(count <= 1) { + return 0.0D; + } else { + double mean = this.getMean(); + double sum = 0.0D; + + for(int i = 0; i < lastBucket; ++i) { + long value = bucketOffsets[i]; + double diff = (double)value - mean; + sum += diff * diff * decayingBuckets[i]; + } + + return Math.sqrt(sum / (double)(count - 1)); + } + } + + public void dump(OutputStream output) + { + try (PrintWriter out = new PrintWriter(new OutputStreamWriter(output, UTF_8))) + { + int length = decayingBuckets.length; + + for(int i = 0; i < length; ++i) { + out.printf("%d%n", decayingBuckets[i]); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java b/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java deleted file mode 100644 index 29baad8..0000000 --- a/src/java/org/apache/cassandra/metrics/EstimatedHistogramReservoir.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.metrics; - -import com.google.common.annotations.VisibleForTesting; - -import com.codahale.metrics.Reservoir; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.UniformSnapshot; -import org.apache.cassandra.utils.EstimatedHistogram; - -/** - * Allows our Histogram implementation to be used by the metrics library. - * - * Default buckets allows nanosecond timings. 
- */ -public class EstimatedHistogramReservoir implements Reservoir -{ - EstimatedHistogram histogram; - - // Default to >4 hours of in nanoseconds of buckets - public EstimatedHistogramReservoir(boolean considerZeroes) - { - this(164, considerZeroes); - } - - public EstimatedHistogramReservoir(int numBuckets, boolean considerZeroes) - { - histogram = new EstimatedHistogram(numBuckets, considerZeroes); - } - - @Override - public int size() - { - return histogram.getBucketOffsets().length + 1; - } - - @Override - public void update(long value) - { - histogram.add(value); - } - - @Override - public Snapshot getSnapshot() - { - return new HistogramSnapshot(histogram); - } - - @VisibleForTesting - public void clear() - { - histogram.getBuckets(true); - } - - static class HistogramSnapshot extends UniformSnapshot - { - EstimatedHistogram histogram; - - public HistogramSnapshot(EstimatedHistogram histogram) - { - super(histogram.getBuckets(false)); - - this.histogram = histogram; - } - - @Override - public double getValue(double quantile) - { - return histogram.percentile(quantile); - } - - @Override - public long getMax() - { - return histogram.max(); - } - - @Override - public long getMin() - { - return histogram.min(); - } - - @Override - public double getMean() - { - return histogram.rawMean(); - } - - @Override - public long[] getValues() { - return histogram.getBuckets(false); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/src/java/org/apache/cassandra/utils/EstimatedHistogram.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java index 36048fb..1a48039 100644 --- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java +++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java @@ -85,7 +85,7 @@ public class EstimatedHistogram buckets = new AtomicLongArray(bucketData); } - private static long[] newOffsets(int size, boolean considerZeroes) + public static long[] newOffsets(int size, boolean considerZeroes) { long[] result = new long[size + (considerZeroes ? 1 : 0)]; int i = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e902596/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java new file mode 100644 index 0000000..f2d817f --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoirTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.metrics; + +import org.junit.Test; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Snapshot; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class DecayingEstimatedHistogramReservoirTest +{ + private static final double DOUBLE_ASSERT_DELTA = 0; + + @Test + public void testSimple() + { + { + // 0 and 1 map to the same, first bucket + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(); + histogram.update(0); + assertEquals(1, histogram.getSnapshot().getValues()[0]); + histogram.update(1); + assertEquals(2, histogram.getSnapshot().getValues()[0]); + } + { + // 0 and 1 map to different buckets + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(true, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT); + histogram.update(0); + assertEquals(1, histogram.getSnapshot().getValues()[0]); + histogram.update(1); + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(1, snapshot.getValues()[0]); + assertEquals(1, snapshot.getValues()[1]); + } + } + + @Test + public void testOverflow() + { + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, 1); + histogram.update(100); + assert histogram.isOverflowed(); + assertEquals(Long.MAX_VALUE, histogram.getSnapshot().getMax()); + } + + @Test + public void testMinMax() + { + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(); + histogram.update(16); + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(15, snapshot.getMin()); + assertEquals(17, snapshot.getMax()); + } + + @Test + public void testMean() + { + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + for (int i = 0; i < 40; i++) + histogram.update(0); + for (int i = 0; i < 20; i++) + histogram.update(1); + for (int i = 0; i < 10; i++) + histogram.update(2); + assertEquals(1.14D, histogram.getSnapshot().getMean(), 0.1D); + } + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(true, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + for (int i = 0; i < 40; i++) + histogram.update(0); + for (int i = 0; i < 20; i++) + histogram.update(1); + for (int i = 0; i < 10; i++) + histogram.update(2); + assertEquals(0.57D, histogram.getSnapshot().getMean(), 0.1D); + } + } + + @Test + public void testStdDev() + { + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + for (int i = 0; i < 20; i++) + histogram.update(10); + for (int i = 0; i < 40; i++) + histogram.update(20); + for (int i = 0; i < 20; i++) + histogram.update(30); + + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(20.0D, snapshot.getMean(), 2.0D); + assertEquals(7.07D, snapshot.getStdDev(), 2.0D); + } + } + + @Test + public void 
testFindingCorrectBuckets() + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, 90, clock); + histogram.update(23282687); + assertFalse(histogram.isOverflowed()); + assertEquals(1, histogram.getSnapshot().getValues()[89]); + + histogram.update(9); + assertEquals(1, histogram.getSnapshot().getValues()[8]); + + histogram.update(21); + histogram.update(22); + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(2, snapshot.getValues()[13]); + assertEquals(6277304.5D, snapshot.getMean(), DOUBLE_ASSERT_DELTA); + } + + @Test + public void testPercentile() + { + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + // percentile of empty histogram is 0 + assertEquals(0D, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA); + + histogram.update(1); + // percentile of a histogram with one element should be that element + assertEquals(1D, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA); + + histogram.update(10); + assertEquals(10D, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA); + } + + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + + histogram.update(1); + histogram.update(2); + histogram.update(3); + histogram.update(4); + histogram.update(5); + + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(0, snapshot.getValue(0.00), DOUBLE_ASSERT_DELTA); + assertEquals(3, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA); + assertEquals(3, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA); + assertEquals(5, snapshot.getValue(1.00), DOUBLE_ASSERT_DELTA); + } + + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + + for (int i = 11; i <= 20; i++) + histogram.update(i); + + // Right now the histogram looks like: + // 10 12 14 17 20 + // 0 2 2 3 3 + // %: 0 20 40 70 100 + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(12, snapshot.getValue(0.01), DOUBLE_ASSERT_DELTA); + assertEquals(14, snapshot.getValue(0.30), DOUBLE_ASSERT_DELTA); + assertEquals(17, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA); + assertEquals(17, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA); + assertEquals(20, snapshot.getValue(0.80), DOUBLE_ASSERT_DELTA); + } + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(true, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + histogram.update(0); + histogram.update(0); + histogram.update(1); + + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(0, snapshot.getValue(0.5), DOUBLE_ASSERT_DELTA); + assertEquals(1, snapshot.getValue(0.99), DOUBLE_ASSERT_DELTA); + } + } + + + @Test + public void testDecayingPercentile() + { + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new 
DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + // percentile of empty histogram is 0 + assertEquals(0, histogram.getSnapshot().getValue(1.0), DOUBLE_ASSERT_DELTA); + + for (int v = 1; v <= 100; v++) + { + for (int i = 0; i < 10_000; i++) + { + histogram.update(v); + } + } + + Snapshot snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(05, snapshot.getValue(0.05)); + assertEstimatedQuantile(20, snapshot.getValue(0.20)); + assertEstimatedQuantile(40, snapshot.getValue(0.40)); + assertEstimatedQuantile(99, snapshot.getValue(0.99)); + + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S); + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(05, snapshot.getValue(0.05)); + assertEstimatedQuantile(20, snapshot.getValue(0.20)); + assertEstimatedQuantile(40, snapshot.getValue(0.40)); + assertEstimatedQuantile(99, snapshot.getValue(0.99)); + + for (int v = 1; v <= 50; v++) + { + for (int i = 0; i < 10_000; i++) + { + histogram.update(v); + } + } + + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(04, snapshot.getValue(0.05)); + assertEstimatedQuantile(14, snapshot.getValue(0.20)); + assertEstimatedQuantile(27, snapshot.getValue(0.40)); + assertEstimatedQuantile(98, snapshot.getValue(0.99)); + + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S); + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(04, snapshot.getValue(0.05)); + assertEstimatedQuantile(14, snapshot.getValue(0.20)); + assertEstimatedQuantile(27, snapshot.getValue(0.40)); + assertEstimatedQuantile(98, snapshot.getValue(0.99)); + + for (int v = 1; v <= 50; v++) + { + for (int i = 0; i < 10_000; i++) + { + histogram.update(v); + } + } + + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(03, snapshot.getValue(0.05)); + assertEstimatedQuantile(12, snapshot.getValue(0.20)); + assertEstimatedQuantile(23, snapshot.getValue(0.40)); + assertEstimatedQuantile(96, snapshot.getValue(0.99)); + + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S); + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(03, snapshot.getValue(0.05)); + assertEstimatedQuantile(12, snapshot.getValue(0.20)); + assertEstimatedQuantile(23, snapshot.getValue(0.40)); + assertEstimatedQuantile(96, snapshot.getValue(0.99)); + + for (int v = 11; v <= 20; v++) + { + for (int i = 0; i < 5_000; i++) + { + histogram.update(v); + } + } + + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(04, snapshot.getValue(0.05)); + assertEstimatedQuantile(12, snapshot.getValue(0.20)); + assertEstimatedQuantile(20, snapshot.getValue(0.40)); + assertEstimatedQuantile(95, snapshot.getValue(0.99)); + + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S); + snapshot = histogram.getSnapshot(); + assertEstimatedQuantile(04, snapshot.getValue(0.05)); + assertEstimatedQuantile(12, snapshot.getValue(0.20)); + assertEstimatedQuantile(20, snapshot.getValue(0.40)); + assertEstimatedQuantile(95, snapshot.getValue(0.99)); + + } + + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + // percentile of empty histogram is 0 + assertEquals(0, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA); + + for (int m = 0; m < 40; m++) + { 
+ for (int i = 0; i < 1_000_000; i++) + { + histogram.update(2); + } + // percentile of a histogram with one element should be that element + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S); + assertEquals(2, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA); + } + + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S * 100); + assertEquals(0, histogram.getSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA); + } + + { + TestClock clock = new TestClock(); + + DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir(DecayingEstimatedHistogramReservoir.DEFAULT_ZERO_CONSIDERATION, DecayingEstimatedHistogramReservoir.DEFAULT_BUCKET_COUNT, clock); + + histogram.update(20); + histogram.update(21); + histogram.update(22); + Snapshot snapshot = histogram.getSnapshot(); + assertEquals(1, snapshot.getValues()[12]); + assertEquals(2, snapshot.getValues()[13]); + + clock.addSeconds(DecayingEstimatedHistogramReservoir.HALF_TIME_IN_S); + + histogram.update(20); + histogram.update(21); + histogram.update(22); + snapshot = histogram.getSnapshot(); + assertEquals(2, snapshot.getValues()[12]); + assertEquals(4, snapshot.getValues()[13]); + } + } + + private void assertEstimatedQuantile(long expectedValue, double actualValue) + { + assertTrue("Expected at least [" + expectedValue + "] but actual is [" + actualValue + "]", actualValue >= expectedValue); + assertTrue("Expected less than [" + Math.round(expectedValue * 1.2) + "] but actual is [" + actualValue + "]", actualValue < Math.round(expectedValue * 1.2)); + } + + public class TestClock extends Clock { + private long tick = 0; + + public void addSeconds(long seconds) + { + tick += seconds * 1_000_000_000L; + } + + public long getTick() + { + return tick; + } + + public long getTime() + { + return tick / 1_000_000L; + }; + } +}
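
[Editor's note] The class javadoc above describes the forward-decay scheme: values gain weight as they are recorded further from the decay landmark, with the weight doubling every 60 seconds (the half-life), and the landmark is reset roughly every 30 minutes by dividing the accumulated counts back down. The following is a minimal, self-contained sketch of that weighting math only, mirroring the shape of `forwardDecayWeight(now)` in the patch; the class and method names here are hypothetical and it is not the committed implementation.

```java
// Illustrative sketch of forward-decay weighting with a 60 second half-life.
// Not the committed code; names are made up for the example.
public class ForwardDecaySketch
{
    static final double HALF_TIME_IN_S = 60.0;
    // Mean lifetime corresponding to a 60 s half-life: halfLife / ln(2).
    static final double MEAN_LIFETIME_IN_S = HALF_TIME_IN_S / Math.log(2.0);

    // Weight applied to a value recorded nowMillis after the decay landmark.
    static long weight(long landmarkMillis, long nowMillis)
    {
        double ageInSeconds = (nowMillis - landmarkMillis) / 1000.0;
        return Math.round(Math.exp(ageInSeconds / MEAN_LIFETIME_IN_S));
    }

    public static void main(String[] args)
    {
        long landmark = 0L;
        System.out.println(weight(landmark, 0L));       // 1 -> a value at the landmark counts once
        System.out.println(weight(landmark, 60_000L));  // 2 -> one half-life later, new values count twice as much
        System.out.println(weight(landmark, 120_000L)); // 4 -> the weight doubles every 60 seconds
    }
}
```

Because the weight grows exponentially with time since the landmark, periodically resetting the landmark and dividing every bucket by the current weight (as `rescale` does in the patch) keeps the stored counts inside the range of a signed 64-bit long while preserving the relative emphasis on recent values.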
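
[Editor's note] The snapshot's `getValue(quantile)` walks the bucket counts until the cumulative count reaches `ceil(total * quantile)` and reports that bucket's offset. A small worked sketch of that idea, using made-up offsets and counts (and ignoring the overflow-bucket check the real code performs), is below.

```java
// Illustrative sketch of quantile estimation from bucketed counts.
// Bucket i holds values in (offsets[i-1], offsets[i]]; data below is example data only.
public class BucketQuantileSketch
{
    static long valueAtQuantile(long[] offsets, long[] counts, double quantile)
    {
        long total = 0;
        for (long c : counts)
            total += c;

        long target = (long) Math.ceil(total * quantile);
        if (target == 0)
            return 0;

        long seen = 0;
        for (int i = 0; i < counts.length; i++)
        {
            seen += counts[i];
            if (seen >= target)
                return offsets[i]; // upper bound of the bucket that crosses the quantile
        }
        return 0;
    }

    public static void main(String[] args)
    {
        long[] offsets = { 1, 2, 3, 4, 5 };
        long[] counts  = { 1, 1, 1, 1, 1 };  // one recorded value per bucket
        System.out.println(valueAtQuantile(offsets, counts, 0.50)); // 3
        System.out.println(valueAtQuantile(offsets, counts, 0.99)); // 5
    }
}
```

With one value in each of the first five buckets, the 50th percentile lands in the third bucket and the 99th in the fifth, which is consistent with the expectations in `testPercentile` above.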