cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jji...@apache.org
Subject [2/6] cassandra git commit: Faster streaming histograms
Date Tue, 28 Feb 2017 01:26:26 GMT
Faster streaming histograms

In ttl-heavy use cases (especially tables with default time to live set), the
streaming histograms calculated during compaction and streaming are very inefficient.

This patch addresses that in two ways:
1) It creates a system property -Dcassandra.streaminghistogram.roundseconds=60,
and rounds all histograms to the next highest multiple of that value, and
2) Rather than maintaining a histogram of 100 bins that have to be merged
on every new value, we keep a temporary spool of 100000 bins, and merge
down to the 100 bin final histogram only when the temporary spool overflows.

Patch by Jeff Jirsa; Reviewed by Nate McCall for CASSANDRA-13038


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5ce9631
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5ce9631
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5ce9631

Branch: refs/heads/cassandra-3.11
Commit: a5ce963117acf5e4cf0a31057551f2f42385c398
Parents: ab71748
Author: Jeff Jirsa <jeff@jeffjirsa.net>
Authored: Mon Feb 13 19:51:58 2017 -0800
Committer: Jeff Jirsa <jeff@jeffjirsa.net>
Committed: Mon Feb 27 17:21:48 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/io/sstable/SSTable.java    |   3 +
 .../io/sstable/metadata/MetadataCollector.java  |   2 +-
 .../cassandra/utils/StreamingHistogram.java     | 109 +++--
 .../microbench/StreamingHistogramBench.java     | 405 +++++++++++++++++++
 .../db/compaction/CompactionsTest.java          |   3 +
 .../DateTieredCompactionStrategyTest.java       |   4 +
 .../LeveledCompactionStrategyTest.java          |   4 +
 .../SizeTieredCompactionStrategyTest.java       |   4 +
 .../cassandra/db/compaction/TTLExpiryTest.java  |   4 +
 .../TimeWindowCompactionStrategyTest.java       |   4 +
 .../cassandra/utils/StreamingHistogramTest.java |  39 +-
 12 files changed, 551 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 386029e..1100bfd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.12
+ * Faster StreamingHistogram (CASSANDRA-13038)
  * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
  * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
  * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 923ef82..1e4488c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -58,7 +58,10 @@ public abstract class SSTable
 {
     static final Logger logger = LoggerFactory.getLogger(SSTable.class);
 
+
     public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
+    public static final int TOMBSTONE_HISTOGRAM_SPOOL_SIZE = 100000;
+    public static final int TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS = Integer.valueOf(System.getProperty("cassandra.streaminghistogram.roundseconds", "60"));
 
     public final Descriptor descriptor;
     protected final Set<Component> components;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 4a67623..3b13cf4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -61,7 +61,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
 
     static StreamingHistogram defaultTombstoneDropTimeHistogram()
     {
-        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE, SSTable.TOMBSTONE_HISTOGRAM_SPOOL_SIZE, SSTable.TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS);
     }
 
     public static StatsMetadata defaultStatsMetadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index b925395..fffa73e 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -41,23 +41,38 @@ public class StreamingHistogram
     // TreeMap to hold bins of histogram.
     private final TreeMap<Double, Long> bin;
 
+    // Keep a second, larger buffer to spool data in, before finalizing it into `bin`
+    private final TreeMap<Double, Long> spool;
+
     // maximum bin size for this histogram
     private final int maxBinSize;
 
+    // maximum size of the spool
+    private final int maxSpoolSize;
+
+    // voluntarily give up resolution for speed
+    private final int roundSeconds;
+
     /**
      * Creates a new histogram with max bin size of maxBinSize
      * @param maxBinSize maximum number of bins this histogram can have
      */
-    public StreamingHistogram(int maxBinSize)
+    public StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds)
     {
         this.maxBinSize = maxBinSize;
+        this.maxSpoolSize = maxSpoolSize;
+        this.roundSeconds = roundSeconds;
         bin = new TreeMap<>();
+        spool = new TreeMap<>();
     }
 
-    private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
+    private StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds, Map<Double, Long> bin)
     {
         this.maxBinSize = maxBinSize;
+        this.maxSpoolSize = maxSpoolSize;
+        this.roundSeconds = roundSeconds;
         this.bin = new TreeMap<>(bin);
+        this.spool = new TreeMap<>();
     }
 
     /**
@@ -76,41 +91,77 @@ public class StreamingHistogram
      */
     public void update(double p, long m)
     {
-        Long mi = bin.get(p);
+        double d = p % this.roundSeconds;
+        if (d > 0)
+            p = p + (this.roundSeconds - d);
+
+        Long mi = spool.get(p);
         if (mi != null)
         {
             // we found the same p so increment that counter
-            bin.put(p, mi + m);
+            spool.put(p, mi + m);
         }
         else
         {
-            bin.put(p, m);
-            // if bin size exceeds maximum bin size then trim down to max size
-            while (bin.size() > maxBinSize)
+            spool.put(p, m);
+        }
+        if(spool.size() > maxSpoolSize)
+            flushHistogram();
+    }
+
+    /**
+     * Drain the temporary spool into the final bins
+     */
+    public void flushHistogram()
+    {
+        if(spool.size() > 0)
+        {
+            Long spoolValue;
+            Long binValue;
+
+            // Iterate over the spool, copying the value into the primary bin map
+            // and compacting that map as necessary
+            for (Map.Entry<Double, Long> entry : spool.entrySet())
             {
-                // find points p1, p2 which have smallest difference
-                Iterator<Double> keys = bin.keySet().iterator();
-                double p1 = keys.next();
-                double p2 = keys.next();
-                double smallestDiff = p2 - p1;
-                double q1 = p1, q2 = p2;
-                while (keys.hasNext())
+                Double key = entry.getKey();
+                spoolValue = entry.getValue();
+                binValue = bin.get(key);
+
+                if (binValue != null)
                 {
-                    p1 = p2;
-                    p2 = keys.next();
-                    double diff = p2 - p1;
-                    if (diff < smallestDiff)
+                    binValue += spoolValue;
+                    bin.put(key, binValue);
+                } else
                     {
-                        smallestDiff = diff;
-                        q1 = p1;
-                        q2 = p2;
+                    bin.put(key, spoolValue);
+                }
+
+                // if bin size exceeds maximum bin size then trim down to max size
+                if (bin.size() > maxBinSize)
+                {
+                    // find points p1, p2 which have smallest difference
+                    Iterator<Double> keys = bin.keySet().iterator();
+                    double p1 = keys.next();
+                    double p2 = keys.next();
+                    double smallestDiff = p2 - p1;
+                    double q1 = p1, q2 = p2;
+                    while (keys.hasNext()) {
+                        p1 = p2;
+                        p2 = keys.next();
+                        double diff = p2 - p1;
+                        if (diff < smallestDiff) {
+                            smallestDiff = diff;
+                            q1 = p1;
+                            q2 = p2;
+                        }
                     }
+                    // merge those two
+                    long k1 = bin.remove(q1);
+                    long k2 = bin.remove(q2);
+                    bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2);
                 }
-                // merge those two
-                long k1 = bin.remove(q1);
-                long k2 = bin.remove(q2);
-                bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2);
             }
+            spool.clear();
         }
     }
 
@@ -124,6 +175,8 @@ public class StreamingHistogram
         if (other == null)
             return;
 
+        flushHistogram();
+
         for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet())
             update(entry.getKey(), entry.getValue());
     }
@@ -136,6 +189,7 @@ public class StreamingHistogram
      */
     public double sum(double b)
     {
+        flushHistogram();
         double sum = 0;
         // find the points pi, pnext which satisfy pi <= b < pnext
         Map.Entry<Double, Long> pnext = bin.higherEntry(b);
@@ -165,6 +219,7 @@ public class StreamingHistogram
 
     public Map<Double, Long> getAsMap()
     {
+        flushHistogram();
         return Collections.unmodifiableMap(bin);
     }
 
@@ -192,7 +247,7 @@ public class StreamingHistogram
                 tmp.put(in.readDouble(), in.readLong());
             }
 
-            return new StreamingHistogram(maxBinSize, tmp);
+            return new StreamingHistogram(maxBinSize, maxBinSize, 1, tmp);
         }
 
         public long serializedSize(StreamingHistogram histogram)
@@ -222,7 +277,7 @@ public class StreamingHistogram
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(bin.hashCode(), maxBinSize);
+        return Objects.hashCode(bin.hashCode(), spool.hashCode(), maxBinSize);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java b/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java
new file mode 100644
index 0000000..23e8f4e
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.Random;
+
+import org.apache.cassandra.utils.StreamingHistogram;
+import org.openjdk.jmh.annotations.*;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+@State(Scope.Benchmark)
+public class StreamingHistogramBench
+{
+
+    StreamingHistogram streamingHistogram;
+    StreamingHistogram newStreamingHistogram;
+    StreamingHistogram newStreamingHistogram2;
+    StreamingHistogram newStreamingHistogram3;
+    StreamingHistogram newStreamingHistogram4;
+    StreamingHistogram newStreamingHistogram5;
+    StreamingHistogram newStreamingHistogram6;
+    StreamingHistogram streamingHistogram60;
+    StreamingHistogram newStreamingHistogram60;
+    StreamingHistogram newStreamingHistogram100x60;
+
+    StreamingHistogram narrowstreamingHistogram;
+    StreamingHistogram narrownewStreamingHistogram;
+    StreamingHistogram narrownewStreamingHistogram2;
+    StreamingHistogram narrownewStreamingHistogram3;
+    StreamingHistogram narrownewStreamingHistogram4;
+    StreamingHistogram narrownewStreamingHistogram5;
+    StreamingHistogram narrownewStreamingHistogram6;
+    StreamingHistogram narrownewStreamingHistogram60;
+    StreamingHistogram narrowstreamingHistogram60;
+    StreamingHistogram narrownewStreamingHistogram100x60;
+
+    StreamingHistogram sparsestreamingHistogram;
+    StreamingHistogram sparsenewStreamingHistogram;
+    StreamingHistogram sparsenewStreamingHistogram2;
+    StreamingHistogram sparsenewStreamingHistogram3;
+    StreamingHistogram sparsenewStreamingHistogram4;
+    StreamingHistogram sparsenewStreamingHistogram5;
+    StreamingHistogram sparsenewStreamingHistogram6;
+    StreamingHistogram sparsestreamingHistogram60;
+    StreamingHistogram sparsenewStreamingHistogram60;
+    StreamingHistogram sparsenewStreamingHistogram100x60;
+
+    static int[] ttls = new int[10000000];
+    static int[] narrowttls = new int[10000000];
+    static int[] sparsettls = new int[10000000];
+    static
+    {
+        Random random = new Random();
+        for(int i = 0 ; i < 10000000; i++)
+        {
+            // Seconds in a day
+            ttls[i] = random.nextInt(86400);
+            // Seconds in 3 hours
+            narrowttls[i] = random.nextInt(14400);
+            // Seconds in a minute
+            sparsettls[i] = random.nextInt(60);
+        }
+    }
+
+    @Setup(Level.Trial)
+    public void setup() throws Throwable
+    {
+
+        streamingHistogram = new StreamingHistogram(100, 0, 1);
+        newStreamingHistogram = new StreamingHistogram(100, 1000, 1);
+        newStreamingHistogram2 = new StreamingHistogram(100, 10000, 1);
+        newStreamingHistogram3 = new StreamingHistogram(100, 100000, 1);
+        newStreamingHistogram4 = new StreamingHistogram(50, 100000, 1);
+        newStreamingHistogram5 = new StreamingHistogram(50, 10000,1 );
+        newStreamingHistogram6 = new StreamingHistogram(100, 1000000, 1);
+        streamingHistogram60 = new StreamingHistogram(100, 0, 60);
+        newStreamingHistogram60 = new StreamingHistogram(100, 100000, 60);
+        newStreamingHistogram100x60 = new StreamingHistogram(100, 10000, 60);
+
+        narrowstreamingHistogram = new StreamingHistogram(100, 0, 1);
+        narrownewStreamingHistogram = new StreamingHistogram(100, 1000, 1);
+        narrownewStreamingHistogram2 = new StreamingHistogram(100, 10000, 1);
+        narrownewStreamingHistogram3 = new StreamingHistogram(100, 100000, 1);
+        narrownewStreamingHistogram4 = new StreamingHistogram(50, 100000, 1);
+        narrownewStreamingHistogram5 = new StreamingHistogram(50, 10000, 1);
+        narrownewStreamingHistogram6 = new StreamingHistogram(100, 1000000, 1);
+        narrowstreamingHistogram60 = new StreamingHistogram(100, 0, 60);
+        narrownewStreamingHistogram60 = new StreamingHistogram(100, 100000, 60);
+        narrownewStreamingHistogram100x60 = new StreamingHistogram(100, 10000, 60);
+
+
+        sparsestreamingHistogram = new StreamingHistogram(100, 0, 1);
+        sparsenewStreamingHistogram = new StreamingHistogram(100, 1000, 1);
+        sparsenewStreamingHistogram2 = new StreamingHistogram(100, 10000, 1);
+        sparsenewStreamingHistogram3 = new StreamingHistogram(100, 100000, 1);
+        sparsenewStreamingHistogram4 = new StreamingHistogram(50, 100000, 1);
+        sparsenewStreamingHistogram5 = new StreamingHistogram(50, 10000, 1);
+        sparsenewStreamingHistogram6 = new StreamingHistogram(100, 1000000, 1);
+        sparsestreamingHistogram60 = new StreamingHistogram(100, 0, 60);
+        sparsenewStreamingHistogram60 = new StreamingHistogram(100, 100000, 60);
+        sparsenewStreamingHistogram100x60 = new StreamingHistogram(100, 10000, 60);
+
+    }
+
+    @TearDown(Level.Trial)
+    public void teardown() throws IOException, ExecutionException, InterruptedException
+    {
+
+    }
+
+    @Benchmark
+    public void existingSH() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            streamingHistogram.update(ttls[i]);
+        streamingHistogram.flushHistogram();
+    }
+
+    @Benchmark
+    public void newSH10x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram.update(ttls[i]);
+        newStreamingHistogram.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void newSH100x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram2.update(ttls[i]);
+        newStreamingHistogram2.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void newSH1000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram3.update(ttls[i]);
+        newStreamingHistogram3.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void newSH10000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram6.update(ttls[i]);
+        newStreamingHistogram6.flushHistogram();
+
+    }
+
+
+    @Benchmark
+    public void newSH50and1000() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram4.update(ttls[i]);
+        newStreamingHistogram4.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void newSH50and100x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram5.update(ttls[i]);
+        newStreamingHistogram5.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void streaminghistogram60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            streamingHistogram60.update(sparsettls[i]);
+        streamingHistogram60.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void newstreaminghistogram1000x60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram60.update(sparsettls[i]);
+        newStreamingHistogram60.flushHistogram();
+    }
+
+    @Benchmark
+    public void newstreaminghistogram100x60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            newStreamingHistogram100x60.update(sparsettls[i]);
+        newStreamingHistogram100x60.flushHistogram();
+    }
+
+
+    @Benchmark
+    public void narrowexistingSH() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrowstreamingHistogram.update(narrowttls[i]);
+        narrowstreamingHistogram.flushHistogram();
+    }
+
+    @Benchmark
+    public void narrownewSH10x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram.update(narrowttls[i]);
+        narrownewStreamingHistogram.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrownewSH100x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram2.update(narrowttls[i]);
+        narrownewStreamingHistogram2.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrownewSH1000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram3.update(narrowttls[i]);
+        narrownewStreamingHistogram3.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrownewSH10000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram6.update(ttls[i]);
+        narrownewStreamingHistogram6.flushHistogram();
+
+    }
+
+
+    @Benchmark
+    public void narrownewSH50and1000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram4.update(narrowttls[i]);
+        narrownewStreamingHistogram4.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrownewSH50and100x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram5.update(narrowttls[i]);
+        narrownewStreamingHistogram5.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrowstreaminghistogram60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrowstreamingHistogram60.update(sparsettls[i]);
+        narrowstreamingHistogram60.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrownewstreaminghistogram1000x60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram60.update(sparsettls[i]);
+        narrownewStreamingHistogram60.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void narrownewstreaminghistogram100x60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            narrownewStreamingHistogram100x60.update(sparsettls[i]);
+        narrownewStreamingHistogram100x60.flushHistogram();
+
+    }
+
+
+    @Benchmark
+    public void sparseexistingSH() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsestreamingHistogram.update(sparsettls[i]);
+        sparsestreamingHistogram.flushHistogram();
+    }
+
+    @Benchmark
+    public void sparsenewSH10x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram.update(sparsettls[i]);
+        sparsenewStreamingHistogram.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsenewSH100x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram2.update(sparsettls[i]);
+        sparsenewStreamingHistogram2.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsenewSH1000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram3.update(sparsettls[i]);
+        sparsenewStreamingHistogram3.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsenewSH10000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram6.update(ttls[i]);
+        sparsenewStreamingHistogram6.flushHistogram();
+    }
+
+
+    @Benchmark
+    public void sparsenewSH50and1000x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram4.update(sparsettls[i]);
+        sparsenewStreamingHistogram4.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsenewSH50and100x() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram5.update(sparsettls[i]);
+        sparsenewStreamingHistogram5.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsestreaminghistogram60s() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsestreamingHistogram60.update(sparsettls[i]);
+        sparsestreamingHistogram60.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsenewstreaminghistogram1000x60() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram60.update(sparsettls[i]);
+        sparsenewStreamingHistogram60.flushHistogram();
+
+    }
+
+    @Benchmark
+    public void sparsenewstreaminghistogram100x60() throws Throwable
+    {
+        for(int i = 0 ; i < ttls.length; i++)
+            sparsenewStreamingHistogram100x60.update(sparsettls[i]);
+        sparsenewStreamingHistogram100x60.flushHistogram();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 0ce81d3..2a30ae1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -62,6 +62,9 @@ public class CompactionsTest
         compactionOptions.put("tombstone_compaction_interval", "1");
         SchemaLoader.prepareServer();
 
+        // Disable tombstone histogram rounding for tests
+        System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.denseCFMD(KEYSPACE1, CF_DENSE1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index 5f470b7..8920d46 100644
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -54,6 +54,10 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+
+        // Disable tombstone histogram rounding for tests
+        System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                 KeyspaceParams.simple(1),
                 SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 636afe1..fc88987 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -74,6 +74,10 @@ public class LeveledCompactionStrategyTest
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+
+        // Disable tombstone histogram rounding for tests
+        System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARDDLEVELED)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index f4dbea8..06bce12 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -54,6 +54,10 @@ public class SizeTieredCompactionStrategyTest
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+
+        // Disable tombstone histogram rounding for tests
+        System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index b264553..e998ef0 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -58,6 +58,10 @@ public class TTLExpiryTest
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+
+        // Disable tombstone histogram rounding for tests
+        System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     CFMetaData.Builder.create(KEYSPACE1, CF_STANDARD1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index 3238170..0dee7e9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -62,6 +62,10 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
+
+        // Disable tombstone histogram rounding for tests
+        System.setProperty("cassandra.streaminghistogram.roundseconds", "1");
+
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5ce9631/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index b6b1882..21c736e 100644
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -32,7 +32,7 @@ public class StreamingHistogramTest
     @Test
     public void testFunction() throws Exception
     {
-        StreamingHistogram hist = new StreamingHistogram(5);
+        StreamingHistogram hist = new StreamingHistogram(5, 5, 1);
         long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
 
         // add 7 points to histogram of 5 bins
@@ -58,7 +58,7 @@ public class StreamingHistogramTest
         }
 
         // merge test
-        StreamingHistogram hist2 = new StreamingHistogram(3);
+        StreamingHistogram hist2 = new StreamingHistogram(3, 0, 1);
         for (int i = 7; i < samples.length; i++)
         {
             hist2.update(samples[i]);
@@ -88,7 +88,7 @@ public class StreamingHistogramTest
     @Test
     public void testSerDe() throws Exception
     {
-        StreamingHistogram hist = new StreamingHistogram(5);
+        StreamingHistogram hist = new StreamingHistogram(5, 0, 1);
         long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9};
 
         // add 7 points to histogram of 5 bins
@@ -119,4 +119,37 @@ public class StreamingHistogramTest
             assertEquals(entry.getValue(), actual.getValue());
         }
     }
+
+    @Test
+    public void testOverflow() throws Exception
+    {
+        StreamingHistogram hist = new StreamingHistogram(5, 10, 1);
+        long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31,
+                32, 32, 33, 34, 35, 70, 78, 80, 90, 100,
+                32, 32, 33, 34, 35, 70, 78, 80, 90, 100
+        };
+
+        // Hit the spool cap, force it to make bins
+        for (int i = 0; i < samples.length; i++)
+        {
+            hist.update(samples[i]);
+        }
+        assertEquals(5, hist.getAsMap().keySet().size());
+
+    }
+
+    @Test
+    public void testRounding() throws Exception
+    {
+        StreamingHistogram hist = new StreamingHistogram(5, 10, 60);
+        long[] samples = new long[] { 59, 60, 119, 180, 181, 300 }; // 60, 60, 120, 180, 240, 300
+        for (int i = 0 ; i < samples.length ; i++)
+            hist.update(samples[i]);
+
+        assertEquals(hist.getAsMap().keySet().size(), (int) 5);
+        assertEquals((long) hist.getAsMap().get(Double.valueOf(60)), (long) 2L);
+        assertEquals((long) hist.getAsMap().get(Double.valueOf(120)), (long) 1L);
+
+    }
+
 }


Mime
View raw message