cassandra-commits mailing list archives

From jasobr...@apache.org
Subject cassandra git commit: Fast and garbage-free Streaming Histogram
Date Thu, 01 Jun 2017 10:19:27 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 96899bbb6 -> 06da35fdd


Fast and garbage-free Streaming Histogram

patch by Fedor Bobin; reviewed by jasobrown for CASSANDRA-13444


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06da35fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06da35fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06da35fd

Branch: refs/heads/trunk
Commit: 06da35fdda32781e620d5d28c514ee480212a108
Parents: 96899bb
Author: Fedor Bobin <fuudtorrentsru@gmail.com>
Authored: Wed Apr 12 08:10:03 2017 +0300
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Thu Jun 1 03:17:23 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/sstable/metadata/MetadataCollector.java  |  17 +-
 .../io/sstable/metadata/StatsMetadata.java      |  12 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |   7 +-
 .../cassandra/utils/StreamingHistogram.java     | 310 ----------
 .../utils/streamhist/HistogramDataConsumer.java |  27 +
 .../StreamingTombstoneHistogramBuilder.java     | 606 +++++++++++++++++++
 .../utils/streamhist/TombstoneHistogram.java    | 130 ++++
 .../microbench/StreamingHistogramBench.java     | 405 -------------
 ...StreamingTombstoneHistogramBuilderBench.java | 113 ++++
 .../cassandra/utils/StreamingHistogramTest.java | 184 ------
 .../StreamingTombstoneHistogramBuilderTest.java | 176 ++++++
 12 files changed, 1068 insertions(+), 920 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 95464cc..48ca381 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
  * Update repairTime for keyspaces on completion (CASSANDRA-13539)
  * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
  * Bring back maxHintTTL property (CASSANDRA-12982)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
old mode 100644
new mode 100755
index 86a494f..9d9c1a8
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -39,7 +39,8 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.MurmurHash;
-import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.streamhist.TombstoneHistogram;
+import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
 
 public class MetadataCollector implements PartitionStatisticsCollector
 {
@@ -57,9 +58,9 @@ public class MetadataCollector implements PartitionStatisticsCollector
         return new EstimatedHistogram(150);
     }
 
-    static StreamingHistogram defaultTombstoneDropTimeHistogram()
+    static TombstoneHistogram defaultTombstoneDropTimeHistogram()
     {
-        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE, SSTable.TOMBSTONE_HISTOGRAM_SPOOL_SIZE, SSTable.TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS);
+        return TombstoneHistogram.createDefault();
     }
 
     public static StatsMetadata defaultStatsMetadata()
@@ -93,7 +94,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
     protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
     protected double compressionRatio = NO_COMPRESSION_RATIO;
-    protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
+    protected StreamingTombstoneHistogramBuilder estimatedTombstoneDropTime = new StreamingTombstoneHistogramBuilder(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE, SSTable.TOMBSTONE_HISTOGRAM_SPOOL_SIZE, SSTable.TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS);
     protected int sstableLevel;
     protected ByteBuffer[] minClusteringValues;
     protected ByteBuffer[] maxClusteringValues;
@@ -151,12 +152,6 @@ public class MetadataCollector implements PartitionStatisticsCollector
         return this;
     }
 
-    public MetadataCollector mergeTombstoneHistogram(StreamingHistogram histogram)
-    {
-        estimatedTombstoneDropTime.merge(histogram);
-        return this;
-    }
-
     /**
      * Ratio is compressed/uncompressed and it is
      * if you have 1.x then compression isn't helping
@@ -291,7 +286,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
                                                              ttlTracker.min(),
                                                              ttlTracker.max(),
                                                              compressionRatio,
-                                                             estimatedTombstoneDropTime,
+                                                             estimatedTombstoneDropTime.build(),
                                                              sstableLevel,
                                                              makeList(minClusteringValues),
                                                              makeList(maxClusteringValues),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
old mode 100644
new mode 100755
index fe5d7bb..2b8ebef
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.streamhist.TombstoneHistogram;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -55,7 +55,7 @@ public class StatsMetadata extends MetadataComponent
     public final int minTTL;
     public final int maxTTL;
     public final double compressionRatio;
-    public final StreamingHistogram estimatedTombstoneDropTime;
+    public final TombstoneHistogram estimatedTombstoneDropTime;
     public final int sstableLevel;
     public final List<ByteBuffer> minClusteringValues;
     public final List<ByteBuffer> maxClusteringValues;
@@ -75,7 +75,7 @@ public class StatsMetadata extends MetadataComponent
                          int minTTL,
                          int maxTTL,
                          double compressionRatio,
-                         StreamingHistogram estimatedTombstoneDropTime,
+                         TombstoneHistogram estimatedTombstoneDropTime,
                          int sstableLevel,
                          List<ByteBuffer> minClusteringValues,
                          List<ByteBuffer> maxClusteringValues,
@@ -269,7 +269,7 @@ public class StatsMetadata extends MetadataComponent
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
             size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(CommitLogPosition.NONE));
             size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
-            size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime);
+            size += TombstoneHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime);
             size += TypeSizes.sizeof(component.sstableLevel);
             // min column names
             size += 4;
@@ -307,7 +307,7 @@ public class StatsMetadata extends MetadataComponent
             out.writeInt(component.minTTL);
             out.writeInt(component.maxTTL);
             out.writeDouble(component.compressionRatio);
-            StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
+            TombstoneHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
             out.writeInt(component.sstableLevel);
             out.writeLong(component.repairedAt);
             out.writeInt(component.minClusteringValues.size());
@@ -353,7 +353,7 @@ public class StatsMetadata extends MetadataComponent
             int minTTL = in.readInt();
             int maxTTL = in.readInt();
             double compressionRatio = in.readDouble();
-            StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
+            TombstoneHistogram tombstoneHistogram = TombstoneHistogram.serializer.deserialize(in);
             int sstableLevel = in.readInt();
             long repairedAt = in.readLong();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
old mode 100644
new mode 100755
index 65e6ece..d240465
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -142,10 +142,9 @@ public class SSTableMetadataViewer
                     out.printf("totalRows: %s%n", stats.totalRows);
                     out.println("Estimated tombstone drop times:");
 
-                    for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
-                    {
-                        out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]);
-                    }
+                    stats.estimatedTombstoneDropTime.forEach((point, value) -> {
+                        out.printf("%-10s:%10s%n", point, value);
+                    });
                     printHistograms(stats, out);
                 }
                 if (compaction != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
deleted file mode 100644
index 9114c7d..0000000
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.io.IOException;
-import java.util.*;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-/**
- * Histogram that can be constructed from streaming of data.
- *
- * The algorithm is taken from following paper:
- * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
- * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
- */
-public class StreamingHistogram
-{
-    public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer();
-
-    // TreeMap to hold bins of histogram.
-    // The key is a numeric type so we can avoid boxing/unboxing streams of different key types
-    // The value is a unboxed long array always of length == 1
-    // Serialized Histograms always writes with double keys for backwards compatibility
-    private final TreeMap<Number, long[]> bin;
-
-    // Keep a second, larger buffer to spool data in, before finalizing it into `bin`
-    private final TreeMap<Number, long[]> spool;
-
-    // maximum bin size for this histogram
-    private final int maxBinSize;
-
-    // maximum size of the spool
-    private final int maxSpoolSize;
-
-    // voluntarily give up resolution for speed
-    private final int roundSeconds;
-
-    /**
-     * Creates a new histogram with max bin size of maxBinSize
-     * @param maxBinSize maximum number of bins this histogram can have
-     */
-    public StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds)
-    {
-        this.maxBinSize = maxBinSize;
-        this.maxSpoolSize = maxSpoolSize;
-        this.roundSeconds = roundSeconds;
-        bin = new TreeMap<>((o1, o2) -> {
-            if (o1.getClass().equals(o2.getClass()))
-                return ((Comparable)o1).compareTo(o2);
-            else
-            	return Double.compare(o1.doubleValue(), o2.doubleValue());
-        });
-        spool = new TreeMap<>((o1, o2) -> {
-            if (o1.getClass().equals(o2.getClass()))
-                return ((Comparable)o1).compareTo(o2);
-            else
-                return Double.compare(o1.doubleValue(), o2.doubleValue());
-        });
-
-    }
-
-    private StreamingHistogram(int maxBinSize, int maxSpoolSize,  int roundSeconds, Map<Double, Long> bin)
-    {
-        this(maxBinSize, maxSpoolSize, roundSeconds);
-        for (Map.Entry<Double, Long> entry : bin.entrySet())
-            this.bin.put(entry.getKey(), new long[]{entry.getValue()});
-    }
-
-    /**
-     * Adds new point p to this histogram.
-     * @param p
-     */
-    public void update(Number p)
-    {
-        update(p, 1L);
-    }
-
-    /**
-     * Adds new point p with value m to this histogram.
-     * @param p
-     * @param m
-     */
-    public void update(Number p, long m)
-    {
-        Number d = p.longValue() % this.roundSeconds;
-        if (d.longValue() > 0)
-            p =p.longValue() + (this.roundSeconds - d.longValue());
-
-        long[] mi = spool.get(p);
-        if (mi != null)
-        {
-            // we found the same p so increment that counter
-            mi[0] += m;
-        }
-        else
-        {
-            mi = new long[]{m};
-            spool.put(p, mi);
-        }
-
-        // If spool has overflowed, compact it
-        if(spool.size() > maxSpoolSize)
-            flushHistogram();
-    }
-
-    /**
-     * Drain the temporary spool into the final bins
-     */
-    public void flushHistogram()
-    {
-        if (spool.size() > 0)
-        {
-            long[] spoolValue;
-            long[] binValue;
-
-            // Iterate over the spool, copying the value into the primary bin map
-            // and compacting that map as necessary
-            for (Map.Entry<Number, long[]> entry : spool.entrySet())
-            {
-                Number key = entry.getKey();
-                spoolValue = entry.getValue();
-                binValue = bin.get(key);
-
-                // If this value is already in the final histogram bins
-                // Simply increment and update, otherwise, insert a new long[1] value
-                if(binValue != null)
-                {
-                    binValue[0] += spoolValue[0];
-                    bin.put(key, binValue);
-                }
-                else
-                {
-                    bin.put(key, new long[]{spoolValue[0]});
-                }
-
-                if (bin.size() > maxBinSize)
-                {
-                    // find points p1, p2 which have smallest difference
-                    Iterator<Number> keys = bin.keySet().iterator();
-                    double p1 = keys.next().doubleValue();
-                    double p2 = keys.next().doubleValue();
-                    double smallestDiff = p2 - p1;
-                    double q1 = p1, q2 = p2;
-                    while (keys.hasNext())
-                    {
-                        p1 = p2;
-                        p2 = keys.next().doubleValue();
-                        double diff = p2 - p1;
-                        if (diff < smallestDiff)
-                        {
-                            smallestDiff = diff;
-                            q1 = p1;
-                            q2 = p2;
-                        }
-                    }
-                    // merge those two
-                    long[] a1 = bin.remove(q1);
-                    long[] a2 = bin.remove(q2);
-                    long k1 = a1[0];
-                    long k2 = a2[0];
-
-                    a1[0] += k2;
-                    bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1);
-
-                }
-            }
-            spool.clear();
-        }
-    }
-
-    /**
-     * Merges given histogram with this histogram.
-     *
-     * @param other histogram to merge
-     */
-    public void merge(StreamingHistogram other)
-    {
-        if (other == null)
-            return;
-
-        other.flushHistogram();
-
-        for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet())
-            update(entry.getKey(), entry.getValue()[0]);
-    }
-
-    /**
-     * Calculates estimated number of points in interval [-inf,b].
-     *
-     * @param b upper bound of a interval to calculate sum
-     * @return estimated number of points in a interval [-inf,b].
-     */
-    public double sum(double b)
-    {
-        flushHistogram();
-        double sum = 0;
-        // find the points pi, pnext which satisfy pi <= b < pnext
-        Map.Entry<Number, long[]> pnext = bin.higherEntry(b);
-        if (pnext == null)
-        {
-            // if b is greater than any key in this histogram,
-            // just count all appearance and return
-            for (long[] value : bin.values())
-                sum += value[0];
-        }
-        else
-        {
-            Map.Entry<Number, long[]> pi = bin.floorEntry(b);
-            if (pi == null)
-                return 0;
-            // calculate estimated count mb for point b
-            double weight = (b - pi.getKey().doubleValue()) / (pnext.getKey().doubleValue() - pi.getKey().doubleValue());
-            double mb = pi.getValue()[0] + (pnext.getValue()[0] - pi.getValue()[0]) * weight;
-            sum += (pi.getValue()[0] + mb) * weight / 2;
-
-            sum += pi.getValue()[0] / 2.0;
-            for (long[] value : bin.headMap(pi.getKey(), false).values())
-                sum += value[0];
-        }
-        return sum;
-    }
-
-    public Map<Number, long[]> getAsMap()
-    {
-        flushHistogram();
-        return Collections.unmodifiableMap(bin);
-    }
-
-    public static class StreamingHistogramSerializer implements ISerializer<StreamingHistogram>
-    {
-        public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException
-        {
-            histogram.flushHistogram();
-            out.writeInt(histogram.maxBinSize);
-            Map<Number, long[]> entries = histogram.getAsMap();
-            out.writeInt(entries.size());
-            for (Map.Entry<Number, long[]> entry : entries.entrySet())
-            {
-                out.writeDouble(entry.getKey().doubleValue());
-                out.writeLong(entry.getValue()[0]);
-            }
-        }
-
-        public StreamingHistogram deserialize(DataInputPlus in) throws IOException
-        {
-            int maxBinSize = in.readInt();
-            int size = in.readInt();
-            Map<Double, Long> tmp = new HashMap<>(size);
-            for (int i = 0; i < size; i++)
-            {
-                tmp.put(in.readDouble(), in.readLong());
-            }
-
-            return new StreamingHistogram(maxBinSize, maxBinSize, 1, tmp);
-        }
-
-        public long serializedSize(StreamingHistogram histogram)
-        {
-            long size = TypeSizes.sizeof(histogram.maxBinSize);
-            Map<Number, long[]> entries = histogram.getAsMap();
-            size += TypeSizes.sizeof(entries.size());
-            // size of entries = size * (8(double) + 8(long))
-            size += entries.size() * (8L + 8L);
-            return size;
-        }
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-
-        if (!(o instanceof StreamingHistogram))
-            return false;
-
-        StreamingHistogram that = (StreamingHistogram) o;
-        return maxBinSize == that.maxBinSize &&
-               spool.equals(that.spool) &&
-               bin.equals(that.bin);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hashCode(bin.hashCode(), spool.hashCode(), maxBinSize);
-    }
-
-}
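
For reference while reading the replacement later in this patch: both the removed class above and the new builder merge the two closest bins into one weighted point. A minimal standalone sketch of that arithmetic (illustrative only, not part of the patch):

    public class MergeRuleSketch
    {
        public static void main(String[] args)
        {
            // Two adjacent bins {point, weight}; the pair with the smallest distance is
            // collapsed into a single bin at the weight-weighted average of the points.
            double p1 = 2, m1 = 1;
            double p2 = 5, m2 = 3;
            double merged = (p1 * m1 + p2 * m2) / (m1 + m2); // (2*1 + 5*3) / 4 = 4.25
            double weight = m1 + m2;                         // 4.0
            System.out.println(merged + " @ " + weight);
        }
    }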

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/utils/streamhist/HistogramDataConsumer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/streamhist/HistogramDataConsumer.java b/src/java/org/apache/cassandra/utils/streamhist/HistogramDataConsumer.java
new file mode 100755
index 0000000..274e7d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/streamhist/HistogramDataConsumer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.streamhist;
+
+/**
+ * This interface exists to avoid boxing primitive ints to Integers (otherwise <i>{@link java.util.function.BiConsumer}&lt;Integer, Integer&gt;</i> would have been sufficient).
+ */
+public interface HistogramDataConsumer<T extends Exception>
+{
+    void consume(int point, int value) throws T;
+}
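
To illustrate why the interface above takes primitive ints: iterating a histogram through it allocates no Integer boxes, and the generic exception parameter lets callers (such as the serializer below) throw a checked exception from inside the lambda without wrapping it. A hypothetical usage sketch, relying only on classes added in this commit:

    import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;

    public class ConsumerSketch
    {
        public static void main(String[] args) throws Exception
        {
            StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(100, 1000, 1);
            builder.update(10);    // point 10, weight 1
            builder.update(20, 3); // point 20, weight 3

            long[] total = { 0 }; // effectively-final holder so the lambda can accumulate
            builder.build().forEach((point, value) -> total[0] += value); // consume(int, int): no boxing
            System.out.println(total[0]); // 4
        }
    }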

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
new file mode 100755
index 0000000..9856253
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.streamhist;
+
+import java.math.RoundingMode;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import com.google.common.math.IntMath;
+
+import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.ACCUMULATED;
+import static org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.AddResult.INSERTED;
+
+/**
+ * Histogram that can be constructed from streaming of data.
+ * <p>
+ * The original algorithm is taken from the following paper:
+ * Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
+ * http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
+ * <p>
+ * Algorithm: the histogram is represented as a collection of {point, weight} pairs. When a new point <i>p</i> with weight <i>m</i> is added:
+ * <ol>
+ * <li>If point <i>p</i> already exists in the collection, add <i>m</i> to the recorded weight of point <i>p</i></li>
+ * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i></li>
+ * <li>If a point was added and the collection size became larger than maxBinSize:</li>
+ * <ol type="a">
+ * <li>Find the nearest points <i>p1</i> and <i>p2</i> in the collection</li>
+ * <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(m1+m2)</i></li>
+ * </ol>
+ * </ol>
+ * <p>
+ * There are some optimizations that make the histogram builder faster:
+ * <ol>
+ *     <li>Spool: a large map that avoids excessive merging of small bins. It can contain up to maxSpoolSize points and accumulates the weight of repeated points.
+ *     For example, if spoolSize=100, binSize=10 and there are only 50 distinct points, there will be only 40 merges regardless of how many points are added.</li>
+ *     <li>The spool is organized as an open-addressing primitive hash map where even cells hold points and odd cells hold values.
+ *     The spool cannot resize, so when the number of collisions exceeds a threshold, or its size grows beyond <i>array_size/2</i>, the spool is drained into the bin</li>
+ *     <li>DistanceHolder: a sorted collection of the distances between points in the bin, used to find the nearest pair of points in constant time</li>
+ *     <li>The distances and the bin are kept as sorted arrays. This reduces garbage-collection pressure and lets elements be found in log(binSize) time via binary search</li>
+ *     <li>To reuse the existing Arrays.binarySearch, each <i>{point, value}</i> pair in the bin and each <i>{distance, left_point}</i> pair is packed into one long</li>
+ * </ol>
+ */
+public class StreamingTombstoneHistogramBuilder
+{
+    // Buffer with point-value pair
+    private final DataHolder bin;
+
+    // Buffer with distance between points, sorted from nearest to furthest
+    private final DistanceHolder distances;
+
+    // Keep a second, larger buffer to spool data in, before finalizing it into `bin`
+    private final Spool spool;
+
+    // voluntarily give up resolution for speed
+    private final int roundSeconds;
+
+    public StreamingTombstoneHistogramBuilder(int maxBinSize, int maxSpoolSize, int roundSeconds)
+    {
+        this.roundSeconds = roundSeconds;
+        this.bin = new DataHolder(maxBinSize + 1, roundSeconds);
+        distances = new DistanceHolder(maxBinSize);
+
+        //for spool we need power-of-two cells
+        maxSpoolSize = maxSpoolSize == 0 ? 0 : IntMath.pow(2, IntMath.log2(maxSpoolSize, RoundingMode.CEILING));
+        spool = new Spool(maxSpoolSize);
+    }
+
+    /**
+     * Adds new point p to this histogram.
+     *
+     * @param p point to add
+     */
+    public void update(int p)
+    {
+        update(p, 1);
+    }
+
+    /**
+     * Adds new point p with value m to this histogram.
+     *
+     * @param p point to add
+     * @param m weight of the point
+     */
+    public void update(int p, int m)
+    {
+        p = roundKey(p, roundSeconds);
+
+        if (spool.capacity > 0)
+        {
+            if (!spool.tryAddOrAccumulate(p, m))
+            {
+                flushHistogram();
+                final boolean success = spool.tryAddOrAccumulate(p, m);
+                assert success : "Can not add value to spool"; // after spool flushing we should always be able to insert new value
+            }
+        }
+        else
+        {
+            flushValue(p, m);
+        }
+    }
+
+    /**
+     * Drain the temporary spool into the final bins
+     */
+    public void flushHistogram()
+    {
+        spool.forEach(this::flushValue);
+        spool.clear();
+    }
+
+    private void flushValue(int key, int spoolValue)
+    {
+        DataHolder.NeighboursAndResult addResult = bin.addValue(key, spoolValue);
+        if (addResult.result == INSERTED)
+        {
+            final int prevPoint = addResult.prevPoint;
+            final int nextPoint = addResult.nextPoint;
+            if (prevPoint != -1 && nextPoint != -1)
+                distances.remove(prevPoint, nextPoint);
+            if (prevPoint != -1)
+                distances.add(prevPoint, key);
+            if (nextPoint != -1)
+                distances.add(key, nextPoint);
+        }
+
+        if (bin.isFull())
+        {
+            mergeBin();
+        }
+    }
+
+    private void mergeBin()
+    {
+        // find points point1, point2 which have smallest difference
+        final int[] smallestDifference = distances.getFirstAndRemove();
+
+        final int point1 = smallestDifference[0];
+        final int point2 = smallestDifference[1];
+
+        // merge those two
+        DataHolder.MergeResult mergeResult = bin.merge(point1, point2);
+
+        final int nextPoint = mergeResult.nextPoint;
+        final int prevPoint = mergeResult.prevPoint;
+        final int newPoint = mergeResult.newPoint;
+
+        if (nextPoint != -1)
+        {
+            distances.remove(point2, nextPoint);
+            distances.add(newPoint, nextPoint);
+        }
+
+        if (prevPoint != -1)
+        {
+            distances.remove(prevPoint, point1);
+            distances.add(prevPoint, newPoint);
+        }
+    }
+
+    /**
+     * Creates a 'finished' snapshot of the current state of the histogram, but leaves this builder instance
+     * open for subsequent additions to the histogram. Basically, this allows us to have some degree of sanity
+     * wrt sstable early open.
+     */
+    public TombstoneHistogram build()
+    {
+        flushHistogram();
+        return new TombstoneHistogram(bin);
+    }
+
+    private static class DistanceHolder
+    {
+        private static final long EMPTY = Long.MAX_VALUE;
+        private final long[] data;
+
+        DistanceHolder(int maxCapacity)
+        {
+            data = new long[maxCapacity];
+            Arrays.fill(data, EMPTY);
+        }
+
+        void add(int prev, int next)
+        {
+            long key = getKey(prev, next);
+            int index = Arrays.binarySearch(data, key);
+
+            assert (index < 0) : "Element already exists";
+            assert (data[data.length - 1] == EMPTY) : "No more space in array";
+
+            index = -index - 1;
+            System.arraycopy(data, index, data, index + 1, data.length - index - 1);
+            data[index] = key;
+        }
+
+        void remove(int prev, int next)
+        {
+            long key = getKey(prev, next);
+            int index = Arrays.binarySearch(data, key);
+            if (index >= 0)
+            {
+                if (index < data.length)
+                    System.arraycopy(data, index + 1, data, index, data.length - index - 1);
+                data[data.length - 1] = EMPTY;
+            }
+        }
+
+        int[] getFirstAndRemove()
+        {
+            if (data[0] == EMPTY)
+                return null;
+
+            int[] result = unwrapKey(data[0]);
+            System.arraycopy(data, 1, data, 0, data.length - 1);
+            data[data.length - 1] = EMPTY;
+            return result;
+        }
+
+        private int[] unwrapKey(long key)
+        {
+            final int distance = (int) (key >> 32);
+            final int prev = (int) (key & 0xFF_FF_FF_FFL);
+            return new int[]{ prev, prev + distance };
+        }
+
+        private long getKey(int prev, int next)
+        {
+            long distance = next - prev;
+            return (distance << 32) | prev;
+        }
+
+        public String toString()
+        {
+            return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrapKey).map(Arrays::toString).collect(Collectors.joining());
+        }
+    }
+
+    static class DataHolder
+    {
+        private static final long EMPTY = Long.MAX_VALUE;
+        private final long[] data;
+        private final int roundSeconds;
+
+        DataHolder(int maxCapacity, int roundSeconds)
+        {
+            data = new long[maxCapacity];
+            Arrays.fill(data, EMPTY);
+            this.roundSeconds = roundSeconds;
+        }
+
+        DataHolder(DataHolder holder)
+        {
+            data = Arrays.copyOf(holder.data, holder.data.length);
+            roundSeconds = holder.roundSeconds;
+        }
+
+        NeighboursAndResult addValue(int point, int delta)
+        {
+            long key = wrap(point, 0);
+            int index = Arrays.binarySearch(data, key);
+            AddResult addResult;
+            if (index < 0)
+            {
+                index = -index - 1;
+                assert (index < data.length) : "No more space in array";
+
+                if (unwrapPoint(data[index]) != point) // a different point occupies this slot, so shift the array and insert
+                {
+                    assert (data[data.length - 1] == EMPTY) : "No more space in array";
+
+                    System.arraycopy(data, index, data, index + 1, data.length - index - 1);
+
+                    data[index] = wrap(point, delta);
+                    addResult = INSERTED;
+                }
+                else
+                {
+                    data[index] += delta;
+                    addResult = ACCUMULATED;
+                }
+            }
+            else
+            {
+                data[index] += delta;
+                addResult = ACCUMULATED;
+            }
+
+            return new NeighboursAndResult(getPrevPoint(index), getNextPoint(index), addResult);
+        }
+
+        public MergeResult merge(int point1, int point2)
+        {
+            long key = wrap(point1, 0);
+            int index = Arrays.binarySearch(data, key);
+            if (index < 0)
+            {
+                index = -index - 1;
+                assert (index < data.length) : "Not found in array";
+                assert (unwrapPoint(data[index]) == point1) : "Not found in array";
+            }
+
+            final int prevPoint = getPrevPoint(index);
+            final int nextPoint = getNextPoint(index + 1);
+
+            int value1 = unwrapValue(data[index]);
+            int value2 = unwrapValue(data[index + 1]);
+
+            assert (unwrapPoint(data[index + 1]) == point2) : "point2 should follow point1";
+
+            int sum = value1 + value2;
+
+            //let's evaluate in long values to handle overflow in multiplication
+            int newPoint = (int) (((long) point1 * value1 + (long) point2 * value2) / (value1 + value2));
+            newPoint = roundKey(newPoint, roundSeconds);
+            data[index] = wrap(newPoint, sum);
+
+            System.arraycopy(data, index + 2, data, index + 1, data.length - index - 2);
+            data[data.length - 1] = EMPTY;
+
+            return new MergeResult(prevPoint, newPoint, nextPoint);
+        }
+
+        private int getPrevPoint(int index)
+        {
+            if (index > 0)
+                if (data[index - 1] != EMPTY)
+                    return (int) (data[index - 1] >> 32);
+                else
+                    return -1;
+            else
+                return -1;
+        }
+
+        private int getNextPoint(int index)
+        {
+            if (index < data.length - 1)
+                if (data[index + 1] != EMPTY)
+                    return (int) (data[index + 1] >> 32);
+                else
+                    return -1;
+            else
+                return -1;
+        }
+
+        private int[] unwrap(long key)
+        {
+            final int point = unwrapPoint(key);
+            final int value = unwrapValue(key);
+            return new int[]{ point, value };
+        }
+
+        private int unwrapPoint(long key)
+        {
+            return (int) (key >> 32);
+        }
+
+        private int unwrapValue(long key)
+        {
+            return (int) (key & 0xFF_FF_FF_FFL);
+        }
+
+        private long wrap(int point, int value)
+        {
+            return (((long) point) << 32) | value;
+        }
+
+
+        public String toString()
+        {
+            return Arrays.stream(data).filter(x -> x != EMPTY).boxed().map(this::unwrap).map(Arrays::toString).collect(Collectors.joining());
+        }
+
+        public boolean isFull()
+        {
+            return data[data.length - 1] != EMPTY;
+        }
+
+        public <E extends Exception> void forEach(HistogramDataConsumer<E> histogramDataConsumer) throws E
+        {
+            for (long datum : data)
+            {
+                if (datum == EMPTY)
+                {
+                    break;
+                }
+
+                histogramDataConsumer.consume(unwrapPoint(datum), unwrapValue(datum));
+            }
+        }
+
+        public int size()
+        {
+            int[] accumulator = new int[1];
+            forEach((point, value) -> accumulator[0]++);
+            return accumulator[0];
+        }
+
+        public double sum(int b)
+        {
+            double sum = 0;
+
+            for (int i = 0; i < data.length; i++)
+            {
+                long pointAndValue = data[i];
+                if (pointAndValue == EMPTY)
+                {
+                    break;
+                }
+                final int point = unwrapPoint(pointAndValue);
+                final int value = unwrapValue(pointAndValue);
+                if (point > b)
+                {
+                    if (i == 0)
+                    { // no prev point
+                        return 0;
+                    }
+                    else
+                    {
+                        final int prevPoint = unwrapPoint(data[i - 1]);
+                        final int prevValue = unwrapValue(data[i - 1]);
+                        double weight = (b - prevPoint) / (double) (point - prevPoint);
+                        double mb = prevValue + (value - prevValue) * weight;
+                        sum -= prevValue;
+                        sum += (prevValue + mb) * weight / 2;
+                        sum += prevValue / 2.0;
+                        return sum;
+                    }
+                }
+                else
+                {
+                    sum += value;
+                }
+            }
+            return sum;
+        }
+
+        static class MergeResult
+        {
+            int prevPoint;
+            int newPoint;
+            int nextPoint;
+
+            MergeResult(int prevPoint, int newPoint, int nextPoint)
+            {
+                this.prevPoint = prevPoint;
+                this.newPoint = newPoint;
+                this.nextPoint = nextPoint;
+            }
+        }
+
+        static class NeighboursAndResult
+        {
+            int prevPoint;
+            int nextPoint;
+            AddResult result;
+
+            NeighboursAndResult(int prevPoint, int nextPoint, AddResult result)
+            {
+                this.prevPoint = prevPoint;
+                this.nextPoint = nextPoint;
+                this.result = result;
+            }
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Arrays.hashCode(data);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (!(o instanceof DataHolder))
+                return false;
+
+            final DataHolder other = ((DataHolder) o);
+
+            if (this.size()!=other.size())
+                return false;
+
+            for (int i=0; i<size(); i++)
+            {
+                if (data[i]!=other.data[i])
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    public enum AddResult
+    {
+        INSERTED,
+        ACCUMULATED
+    }
+
+    static class Spool
+    {
+        // even cells hold points, odd cells hold values
+        final int[] map;
+        final int capacity;
+        int size;
+
+        Spool(int capacity)
+        {
+            this.capacity = capacity;
+            if (capacity == 0)
+            {
+                map = new int[0];
+            }
+            else
+            {
+                assert IntMath.isPowerOfTwo(capacity) : "should be power of two";
+                // x2 because points and values are stored in consecutive cells, and x2 again so that expected reprobing stays below two once _capacity_ values have been written
+                map = new int[capacity * 2 * 2];
+                clear();
+            }
+        }
+
+        void clear()
+        {
+            Arrays.fill(map, -1);
+            size = 0;
+        }
+
+        boolean tryAddOrAccumulate(int point, int delta)
+        {
+            if (size > capacity)
+            {
+                return false;
+            }
+
+            final int cell = 2 * ((capacity - 1) & hash(point));
+
+            // We use linear probing; a collision cluster of 100 elements is large enough to give up.
+            for (int attempt = 0; attempt < 100; attempt++)
+            {
+                if (tryCell(cell + attempt * 2, point, delta))
+                    return true;
+            }
+            return false;
+        }
+
+        private int hash(int i)
+        {
+            long largePrime = 948701839L;
+            return (int) (i * largePrime);
+        }
+
+        <E extends Exception> void forEach(HistogramDataConsumer<E> consumer) throws E
+        {
+            for (int i = 0; i < map.length; i += 2)
+            {
+                if (map[i] != -1)
+                {
+                    consumer.consume(map[i], map[i + 1]);
+                }
+            }
+        }
+
+        private boolean tryCell(int cell, int point, int delta)
+        {
+            cell = cell % map.length;
+            if (map[cell] == -1)
+            {
+                map[cell] = point;
+                map[cell + 1] = delta;
+                size++;
+                return true;
+            }
+            if (map[cell] == point)
+            {
+                map[cell + 1] += delta;
+                return true;
+            }
+            return false;
+        }
+    }
+
+    private static int roundKey(int p, int roundSeconds)
+    {
+        int d = p % roundSeconds;
+        if (d > 0)
+            return p + (roundSeconds - d);
+        else
+            return p;
+    }
+}
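
The long-packing described in the class comment above, shown in isolation (a sketch mirroring the wrap/unwrapPoint/unwrapValue helpers; the explicit mask on packing is defensive, since the patch relies on values being non-negative counts):

    public class PackingSketch
    {
        public static void main(String[] args)
        {
            int point = 86400, value = 42;
            // The point goes in the high 32 bits, the value in the low 32 bits. Comparing
            // packed longs therefore orders entries by point first, which is what lets a
            // plain sorted long[] be probed with Arrays.binarySearch(data, wrap(point, 0)).
            long packed = (((long) point) << 32) | (value & 0xFF_FF_FF_FFL);
            System.out.println((int) (packed >> 32));            // 86400 (point)
            System.out.println((int) (packed & 0xFF_FF_FF_FFL)); // 42 (value)
        }
    }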

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
new file mode 100755
index 0000000..19bdd27
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/streamhist/TombstoneHistogram.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.streamhist;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder.DataHolder;
+
+/**
+ * A snapshot, or finished histogram, of tombstones for an sstable, as generated from {@link StreamingTombstoneHistogramBuilder}.
+ */
+public class TombstoneHistogram
+{
+    public static final HistogramSerializer serializer = new HistogramSerializer();
+
+    // Buffer with point-value pair
+    private final DataHolder bin;
+
+    /**
+     * Creates a new histogram backed by a copy of the given data holder
+     */
+    TombstoneHistogram(DataHolder holder)
+    {
+        bin = new DataHolder(holder);
+    }
+
+    public static TombstoneHistogram createDefault()
+    {
+        return new TombstoneHistogram(new DataHolder(0, 1));
+    }
+
+    /**
+     * Calculates the estimated number of points in the interval [-inf, b].
+     *
+     * @param b upper bound of the interval to sum over
+     * @return estimated number of points in the interval [-inf, b].
+     */
+    public double sum(double b)
+    {
+        return bin.sum((int) b);
+    }
+
+    public int size()
+    {
+        return this.bin.size();
+    }
+
+    public <E extends Exception> void forEach(HistogramDataConsumer<E> histogramDataConsumer) throws E
+    {
+        this.bin.forEach(histogramDataConsumer);
+    }
+
+    public static class HistogramSerializer implements ISerializer<TombstoneHistogram>
+    {
+        public void serialize(TombstoneHistogram histogram, DataOutputPlus out) throws IOException
+        {
+            final int size = histogram.size();
+            final int maxBinSize = size; // we write this for legacy reasons
+            out.writeInt(maxBinSize);
+            out.writeInt(size);
+            histogram.forEach((point, value) ->
+                              {
+                                  out.writeDouble((double) point);
+                                  out.writeLong((long) value);
+                              });
+        }
+
+        public TombstoneHistogram deserialize(DataInputPlus in) throws IOException
+        {
+            in.readInt(); // max bin size
+            int size = in.readInt();
+            DataHolder dataHolder = new DataHolder(size, 1);
+            for (int i = 0; i < size; i++)
+            {
+                dataHolder.addValue((int)in.readDouble(), (int)in.readLong());
+            }
+
+            return new TombstoneHistogram(dataHolder);
+        }
+
+        public long serializedSize(TombstoneHistogram histogram)
+        {
+            int maxBinSize = 0;
+            long size = TypeSizes.sizeof(maxBinSize);
+            final int histSize = histogram.size();
+            size += TypeSizes.sizeof(histSize);
+            // size of entries = size * (8(double) + 8(long))
+            size += histSize * (8L + 8L);
+            return size;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof TombstoneHistogram))
+            return false;
+
+        TombstoneHistogram that = (TombstoneHistogram) o;
+        return bin.equals(that.bin);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return bin.hashCode();
+    }
+}
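
Putting the pieces together, a hypothetical end-to-end sketch of the new builder/snapshot flow (the constructor arguments follow the bin-size, spool-size, and rounding pattern used elsewhere in this patch):

    import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
    import org.apache.cassandra.utils.streamhist.TombstoneHistogram;

    public class RoundTripSketch
    {
        public static void main(String[] args)
        {
            StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(100, 100000, 60);
            for (int deletionTime : new int[]{ 60, 60, 120, 180 })
                builder.update(deletionTime);              // deletion times are rounded up to roundSeconds

            TombstoneHistogram snapshot = builder.build(); // snapshot is a copy; builder stays usable
            double expiredBy120 = snapshot.sum(120);       // estimated number of points in [-inf, 120]
            System.out.println(expiredBy120);
        }
    }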

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java b/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java
deleted file mode 100644
index 23e8f4e..0000000
--- a/test/microbench/org/apache/cassandra/test/microbench/StreamingHistogramBench.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.test.microbench;
-
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.Random;
-
-import org.apache.cassandra.utils.StreamingHistogram;
-import org.openjdk.jmh.annotations.*;
-
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
-@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
-@Fork(value = 1)
-@Threads(1)
-@State(Scope.Benchmark)
-public class StreamingHistogramBench
-{
-
-    StreamingHistogram streamingHistogram;
-    StreamingHistogram newStreamingHistogram;
-    StreamingHistogram newStreamingHistogram2;
-    StreamingHistogram newStreamingHistogram3;
-    StreamingHistogram newStreamingHistogram4;
-    StreamingHistogram newStreamingHistogram5;
-    StreamingHistogram newStreamingHistogram6;
-    StreamingHistogram streamingHistogram60;
-    StreamingHistogram newStreamingHistogram60;
-    StreamingHistogram newStreamingHistogram100x60;
-
-    StreamingHistogram narrowstreamingHistogram;
-    StreamingHistogram narrownewStreamingHistogram;
-    StreamingHistogram narrownewStreamingHistogram2;
-    StreamingHistogram narrownewStreamingHistogram3;
-    StreamingHistogram narrownewStreamingHistogram4;
-    StreamingHistogram narrownewStreamingHistogram5;
-    StreamingHistogram narrownewStreamingHistogram6;
-    StreamingHistogram narrownewStreamingHistogram60;
-    StreamingHistogram narrowstreamingHistogram60;
-    StreamingHistogram narrownewStreamingHistogram100x60;
-
-    StreamingHistogram sparsestreamingHistogram;
-    StreamingHistogram sparsenewStreamingHistogram;
-    StreamingHistogram sparsenewStreamingHistogram2;
-    StreamingHistogram sparsenewStreamingHistogram3;
-    StreamingHistogram sparsenewStreamingHistogram4;
-    StreamingHistogram sparsenewStreamingHistogram5;
-    StreamingHistogram sparsenewStreamingHistogram6;
-    StreamingHistogram sparsestreamingHistogram60;
-    StreamingHistogram sparsenewStreamingHistogram60;
-    StreamingHistogram sparsenewStreamingHistogram100x60;
-
-    static int[] ttls = new int[10000000];
-    static int[] narrowttls = new int[10000000];
-    static int[] sparsettls = new int[10000000];
-    static
-    {
-        Random random = new Random();
-        for(int i = 0 ; i < 10000000; i++)
-        {
-            // Seconds in a day
-            ttls[i] = random.nextInt(86400);
-            // Seconds in 3 hours
-            narrowttls[i] = random.nextInt(14400);
-            // Seconds in a minute
-            sparsettls[i] = random.nextInt(60);
-        }
-    }
-
-    @Setup(Level.Trial)
-    public void setup() throws Throwable
-    {
-
-        streamingHistogram = new StreamingHistogram(100, 0, 1);
-        newStreamingHistogram = new StreamingHistogram(100, 1000, 1);
-        newStreamingHistogram2 = new StreamingHistogram(100, 10000, 1);
-        newStreamingHistogram3 = new StreamingHistogram(100, 100000, 1);
-        newStreamingHistogram4 = new StreamingHistogram(50, 100000, 1);
-        newStreamingHistogram5 = new StreamingHistogram(50, 10000,1 );
-        newStreamingHistogram6 = new StreamingHistogram(100, 1000000, 1);
-        streamingHistogram60 = new StreamingHistogram(100, 0, 60);
-        newStreamingHistogram60 = new StreamingHistogram(100, 100000, 60);
-        newStreamingHistogram100x60 = new StreamingHistogram(100, 10000, 60);
-
-        narrowstreamingHistogram = new StreamingHistogram(100, 0, 1);
-        narrownewStreamingHistogram = new StreamingHistogram(100, 1000, 1);
-        narrownewStreamingHistogram2 = new StreamingHistogram(100, 10000, 1);
-        narrownewStreamingHistogram3 = new StreamingHistogram(100, 100000, 1);
-        narrownewStreamingHistogram4 = new StreamingHistogram(50, 100000, 1);
-        narrownewStreamingHistogram5 = new StreamingHistogram(50, 10000, 1);
-        narrownewStreamingHistogram6 = new StreamingHistogram(100, 1000000, 1);
-        narrowstreamingHistogram60 = new StreamingHistogram(100, 0, 60);
-        narrownewStreamingHistogram60 = new StreamingHistogram(100, 100000, 60);
-        narrownewStreamingHistogram100x60 = new StreamingHistogram(100, 10000, 60);
-
-
-        sparsestreamingHistogram = new StreamingHistogram(100, 0, 1);
-        sparsenewStreamingHistogram = new StreamingHistogram(100, 1000, 1);
-        sparsenewStreamingHistogram2 = new StreamingHistogram(100, 10000, 1);
-        sparsenewStreamingHistogram3 = new StreamingHistogram(100, 100000, 1);
-        sparsenewStreamingHistogram4 = new StreamingHistogram(50, 100000, 1);
-        sparsenewStreamingHistogram5 = new StreamingHistogram(50, 10000, 1);
-        sparsenewStreamingHistogram6 = new StreamingHistogram(100, 1000000, 1);
-        sparsestreamingHistogram60 = new StreamingHistogram(100, 0, 60);
-        sparsenewStreamingHistogram60 = new StreamingHistogram(100, 100000, 60);
-        sparsenewStreamingHistogram100x60 = new StreamingHistogram(100, 10000, 60);
-
-    }
-
-    @TearDown(Level.Trial)
-    public void teardown() throws IOException, ExecutionException, InterruptedException
-    {
-
-    }
-
-    @Benchmark
-    public void existingSH() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            streamingHistogram.update(ttls[i]);
-        streamingHistogram.flushHistogram();
-    }
-
-    @Benchmark
-    public void newSH10x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram.update(ttls[i]);
-        newStreamingHistogram.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void newSH100x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram2.update(ttls[i]);
-        newStreamingHistogram2.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void newSH1000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram3.update(ttls[i]);
-        newStreamingHistogram3.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void newSH10000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram6.update(ttls[i]);
-        newStreamingHistogram6.flushHistogram();
-
-    }
-
-
-    @Benchmark
-    public void newSH50and1000() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram4.update(ttls[i]);
-        newStreamingHistogram4.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void newSH50and100x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram5.update(ttls[i]);
-        newStreamingHistogram5.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void streaminghistogram60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            streamingHistogram60.update(sparsettls[i]);
-        streamingHistogram60.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void newstreaminghistogram1000x60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram60.update(sparsettls[i]);
-        newStreamingHistogram60.flushHistogram();
-    }
-
-    @Benchmark
-    public void newstreaminghistogram100x60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            newStreamingHistogram100x60.update(sparsettls[i]);
-        newStreamingHistogram100x60.flushHistogram();
-    }
-
-
-    @Benchmark
-    public void narrowexistingSH() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrowstreamingHistogram.update(narrowttls[i]);
-        narrowstreamingHistogram.flushHistogram();
-    }
-
-    @Benchmark
-    public void narrownewSH10x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram.update(narrowttls[i]);
-        narrownewStreamingHistogram.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrownewSH100x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram2.update(narrowttls[i]);
-        narrownewStreamingHistogram2.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrownewSH1000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram3.update(narrowttls[i]);
-        narrownewStreamingHistogram3.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrownewSH10000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram6.update(narrowttls[i]);
-        narrownewStreamingHistogram6.flushHistogram();
-
-    }
-
-
-    @Benchmark
-    public void narrownewSH50and1000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram4.update(narrowttls[i]);
-        narrownewStreamingHistogram4.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrownewSH50and100x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram5.update(narrowttls[i]);
-        narrownewStreamingHistogram5.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrowstreaminghistogram60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrowstreamingHistogram60.update(sparsettls[i]);
-        narrowstreamingHistogram60.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrownewstreaminghistogram1000x60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram60.update(sparsettls[i]);
-        narrownewStreamingHistogram60.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void narrownewstreaminghistogram100x60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            narrownewStreamingHistogram100x60.update(sparsettls[i]);
-        narrownewStreamingHistogram100x60.flushHistogram();
-
-    }
-
-
-    @Benchmark
-    public void sparseexistingSH() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsestreamingHistogram.update(sparsettls[i]);
-        sparsestreamingHistogram.flushHistogram();
-    }
-
-    @Benchmark
-    public void sparsenewSH10x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram.update(sparsettls[i]);
-        sparsenewStreamingHistogram.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsenewSH100x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram2.update(sparsettls[i]);
-        sparsenewStreamingHistogram2.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsenewSH1000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram3.update(sparsettls[i]);
-        sparsenewStreamingHistogram3.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsenewSH10000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram6.update(sparsettls[i]);
-        sparsenewStreamingHistogram6.flushHistogram();
-    }
-
-
-    @Benchmark
-    public void sparsenewSH50and1000x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram4.update(sparsettls[i]);
-        sparsenewStreamingHistogram4.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsenewSH50and100x() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram5.update(sparsettls[i]);
-        sparsenewStreamingHistogram5.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsestreaminghistogram60s() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsestreamingHistogram60.update(sparsettls[i]);
-        sparsestreamingHistogram60.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsenewstreaminghistogram1000x60() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram60.update(sparsettls[i]);
-        sparsenewStreamingHistogram60.flushHistogram();
-
-    }
-
-    @Benchmark
-    public void sparsenewstreaminghistogram100x60() throws Throwable
-    {
-        for(int i = 0 ; i < ttls.length; i++)
-            sparsenewStreamingHistogram100x60.update(sparsettls[i]);
-        sparsenewStreamingHistogram100x60.flushHistogram();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/test/microbench/org/apache/cassandra/test/microbench/StreamingTombstoneHistogramBuilderBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/StreamingTombstoneHistogramBuilderBench.java b/test/microbench/org/apache/cassandra/test/microbench/StreamingTombstoneHistogramBuilderBench.java
new file mode 100755
index 0000000..f3df8c1
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/StreamingTombstoneHistogramBuilderBench.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+
+import java.util.concurrent.TimeUnit;
+import java.util.Random;
+
+import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.profile.*;
+import org.openjdk.jmh.runner.*;
+import org.openjdk.jmh.runner.options.*;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+@State(Scope.Benchmark)
+public class StreamingTombstoneHistogramBuilderBench
+{
+
+    static int[] secondInMonth = new int[10000000];
+    static int[] secondInDay = new int[10000000];
+    static int[] secondIn3Hour = new int[10000000];
+    static int[] secondInMin = new int[10000000];
+
+    static
+    {
+        final int now = (int) (System.currentTimeMillis() / 1000L);
+        Random random = new Random();
+        for(int i = 0 ; i < 10000000; i++)
+        {
+            // Seconds in a month
+            secondInMonth[i] = now + random.nextInt(3600 * 24 * 30);
+            // Seconds in a day
+            secondInDay[i] = now + random.nextInt(3600 * 24);
+            // Seconds in 3 hours
+            secondIn3Hour[i] = now + random.nextInt(3600 * 3);
+            // Seconds in a minute
+            secondInMin[i] = now + random.nextInt(60);
+        }
+    }
+
+    @Param({ "secondInMonth", "secondInDay", "secondIn3Hour", "secondInMin" })
+    String a_workLoad;
+
+    @Param({ "0", "1000", "10000", "100000" })
+    int b_spoolSize;
+
+    @Benchmark
+    public void test()
+    {
+        StreamingTombstoneHistogramBuilder histogram = new StreamingTombstoneHistogramBuilder(100, b_spoolSize, 1);
+        int[] data = selectWorkload(a_workLoad);
+
+        for (int time : data)
+        {
+            histogram.update(time);
+        }
+
+        histogram.flushHistogram();
+    }
+
+    private int[] selectWorkload(String workLoad)
+    {
+        switch (workLoad)
+        {
+            case "secondInMonth":
+                return secondInMonth;
+            case "secondInDay":
+                return secondInDay;
+            case "secondIn3Hour":
+                return secondIn3Hour;
+            case "secondInMin":
+                return secondInMin;
+            default:
+                throw new IllegalArgumentException("Invalid workload type: " + workLoad);
+        }
+    }
+
+
+    public static void main(String[] args) throws Exception
+    {
+        Options opt = new OptionsBuilder()
+                      .include(StreamingTombstoneHistogramBuilderBench.class.getSimpleName())
+                      .warmupIterations(3)
+                      .measurementIterations(10)
+                      .addProfiler(GCProfiler.class)
+                      .threads(1)
+                      .forks(1)
+                      .build();
+        new Runner(opt).run();
+    }
+}

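The new benchmark above replaces the thirty hand-enumerated benchmark methods of the deleted StreamingHistogramBench with a single parameterised run: JMH's @Param annotations cross four workload shapes with four spool sizes, and each combination is measured separately. As a minimal sketch of driving the same builder API directly, outside JMH -- the constructor arguments mirror the (bins, spool size, rounding) shape used above, though those parameter meanings are inferred from usage rather than taken from the class's source:

import java.util.Random;

import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
import org.apache.cassandra.utils.streamhist.TombstoneHistogram;

public class TombstoneHistogramSketch
{
    public static void main(String[] args)
    {
        // same shape as the benchmark's constructor call above:
        // 100 bins, a 10000-entry spool, no rounding of incoming values
        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(100, 10000, 1);

        int now = (int) (System.currentTimeMillis() / 1000L);
        Random random = new Random();

        // one million synthetic deletion times spread over a day,
        // mirroring the benchmark's secondInDay workload
        for (int i = 0; i < 1000000; i++)
            builder.update(now + random.nextInt(3600 * 24));

        // build() finalises the builder into the compact read-only histogram
        TombstoneHistogram hist = builder.build();

        // sum(b) estimates how many recorded points fall at or below b
        System.out.println(hist.sum(now + 3600));
    }
}

Per the "Hit the spool cap, force it to make bins" comment in the overflow tests, the spool size varied by b_spoolSize appears to be the threshold at which buffered values are folded into bins, which is why it is the main performance axis the benchmark sweeps.
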
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
deleted file mode 100644
index f64cbd9..0000000
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.junit.Test;
-
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-public class StreamingHistogramTest
-{
-    @Test
-    public void testFunction() throws Exception
-    {
-        StreamingHistogram hist = new StreamingHistogram(5, 0, 1);
-        long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
-
-        // add 7 points to histogram of 5 bins
-        for (int i = 0; i < 7; i++)
-        {
-            hist.update(samples[i]);
-        }
-
-        // should end up (2,1),(9.5,2),(17.5,2),(23,1),(36,1)
-        Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
-        expected1.put(2.0, 1L);
-        expected1.put(9.5, 2L);
-        expected1.put(17.5, 2L);
-        expected1.put(23.0, 1L);
-        expected1.put(36.0, 1L);
-
-        Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
-        for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet())
-        {
-            Map.Entry<Double, Long> entry = expectedItr.next();
-            assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
-            assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
-        }
-
-        // merge test
-        StreamingHistogram hist2 = new StreamingHistogram(3, 3, 1);
-        for (int i = 7; i < samples.length; i++)
-        {
-            hist2.update(samples[i]);
-        }
-        hist.merge(hist2);
-        // should end up (2,1),(9.5,2),(19.33,3),(32.67,3),(45,1)
-        Map<Double, Long> expected2 = new LinkedHashMap<Double, Long>(5);
-        expected2.put(2.0, 1L);
-        expected2.put(9.5, 2L);
-        expected2.put(19.33, 3L);
-        expected2.put(32.67, 3L);
-        expected2.put(45.0, 1L);
-        expectedItr = expected2.entrySet().iterator();
-        for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet())
-        {
-            Map.Entry<Double, Long> entry = expectedItr.next();
-            assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
-            assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
-        }
-
-        // sum test
-        assertEquals(3.28, hist.sum(15), 0.01);
-        // sum test (b > max(hist))
-        assertEquals(10.0, hist.sum(50), 0.01);
-    }
-
-    @Test
-    public void testSerDe() throws Exception
-    {
-        StreamingHistogram hist = new StreamingHistogram(5, 0, 1);
-        long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9};
-
-        // add 7 points to histogram of 5 bins
-        for (int i = 0; i < samples.length; i++)
-        {
-            hist.update(samples[i]);
-        }
-
-        DataOutputBuffer out = new DataOutputBuffer();
-        StreamingHistogram.serializer.serialize(hist, out);
-        byte[] bytes = out.toByteArray();
-
-        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes));
-
-        // deserialized histogram should have following values
-        Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
-        expected1.put(2.0, 1L);
-        expected1.put(9.5, 2L);
-        expected1.put(17.5, 2L);
-        expected1.put(23.0, 1L);
-        expected1.put(36.0, 1L);
-
-        Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
-        for (Map.Entry<Number, long[]> actual : deserialized.getAsMap().entrySet())
-        {
-            Map.Entry<Double, Long> entry = expectedItr.next();
-            assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
-            assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
-        }
-    }
-
-
-    @Test
-    public void testNumericTypes() throws Exception
-    {
-        StreamingHistogram hist = new StreamingHistogram(5, 0, 1);
-
-        hist.update(2);
-        hist.update(2.0);
-        hist.update(2L);
-
-        Map<Number, long[]> asMap = hist.getAsMap();
-
-        assertEquals(1, asMap.size());
-        assertEquals(3L, asMap.get(2)[0]);
-
-        //Make sure it's working with Serde
-        DataOutputBuffer out = new DataOutputBuffer();
-        StreamingHistogram.serializer.serialize(hist, out);
-        byte[] bytes = out.toByteArray();
-
-        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes));
-
-        deserialized.update(2L);
-
-        asMap = deserialized.getAsMap();
-        assertEquals(1, asMap.size());
-        assertEquals(4L, asMap.get(2)[0]);
-    }
-
-    @Test
-    public void testOverflow() throws Exception
-    {
-        StreamingHistogram hist = new StreamingHistogram(5, 10, 1);
-        long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31,
-                                    32, 32, 33, 34, 35, 70, 78, 80, 90, 100,
-                                    32, 32, 33, 34, 35, 70, 78, 80, 90, 100
-                                    };
-
-        // Hit the spool cap, force it to make bins
-        for (int i = 0; i < samples.length; i++)
-        {
-            hist.update(samples[i]);
-        }
-
-        assertEquals(5, hist.getAsMap().keySet().size());
-
-    }
-
-    @Test
-    public void testRounding() throws Exception
-    {
-        StreamingHistogram hist = new StreamingHistogram(5, 10, 60);
-        long[] samples = new long[] { 59, 60, 119, 180, 181, 300 }; // 60, 60, 120, 180, 240, 300
-        for (int i = 0 ; i < samples.length ; i++)
-            hist.update(samples[i]);
-        assertEquals(hist.getAsMap().keySet().size(), 5);
-        assertEquals(hist.getAsMap().get(60)[0], 2);
-        assertEquals(hist.getAsMap().get(120)[0], 1);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06da35fd/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
new file mode 100755
index 0000000..c4da5cb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilderTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.streamhist;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamingTombstoneHistogramBuilderTest
+{
+    @Test
+    public void testFunction() throws Exception
+    {
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
+        int[] samples = new int[]{ 23, 19, 10, 16, 36, 2, 9, 32, 30, 45 };
+
+        // add 7 points to histogram of 5 bins
+        for (int i = 0; i < 7; i++)
+        {
+            builder.update(samples[i]);
+        }
+
+        // should end up (2,1),(9,2),(17,2),(23,1),(36,1)
+        Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
+        expected1.put(2.0, 1L);
+        expected1.put(9.0, 2L);
+        expected1.put(17.0, 2L);
+        expected1.put(23.0, 1L);
+        expected1.put(36.0, 1L);
+
+        Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
+        TombstoneHistogram hist = builder.build();
+        hist.forEach((point, value) ->
+                     {
+                         Map.Entry<Double, Long> entry = expectedItr.next();
+                         assertEquals(entry.getKey(), point, 0.01);
+                         assertEquals(entry.getValue().longValue(), value);
+                     });
+
+        // sum test
+        assertEquals(3.5, hist.sum(15), 0.01);
+        // sum test (b > max(hist))
+        assertEquals(7.0, hist.sum(50), 0.01);
+    }
+
+    @Test
+    public void testSerDe() throws Exception
+    {
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
+        int[] samples = new int[]{ 23, 19, 10, 16, 36, 2, 9 };
+
+        // add 7 points to histogram of 5 bins
+        for (int i = 0; i < samples.length; i++)
+        {
+            builder.update(samples[i]);
+        }
+        TombstoneHistogram hist = builder.build();
+        DataOutputBuffer out = new DataOutputBuffer();
+        TombstoneHistogram.serializer.serialize(hist, out);
+        byte[] bytes = out.toByteArray();
+
+        TombstoneHistogram deserialized = TombstoneHistogram.serializer.deserialize(new DataInputBuffer(bytes));
+
+        // deserialized histogram should have the following values
+        Map<Double, Long> expected1 = new LinkedHashMap<Double, Long>(5);
+        expected1.put(2.0, 1L);
+        expected1.put(9.0, 2L);
+        expected1.put(17.0, 2L);
+        expected1.put(23.0, 1L);
+        expected1.put(36.0, 1L);
+
+        Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
+        deserialized.forEach((point, value) ->
+                             {
+                                 Map.Entry<Double, Long> entry = expectedItr.next();
+                                 assertEquals(entry.getKey(), point, 0.01);
+                                 assertEquals(entry.getValue().longValue(), value);
+                             });
+    }
+
+
+    @Test
+    public void testNumericTypes() throws Exception
+    {
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
+
+        builder.update(2);
+        builder.update(2);
+        builder.update(2);
+        TombstoneHistogram hist = builder.build();
+        Map<Integer, Integer> asMap = asMap(hist);
+
+        assertEquals(1, asMap.size());
+        assertEquals(3, asMap.get(2).intValue());
+
+        // Make sure it's working with serde
+        DataOutputBuffer out = new DataOutputBuffer();
+        TombstoneHistogram.serializer.serialize(hist, out);
+        byte[] bytes = out.toByteArray();
+
+        TombstoneHistogram deserialized = TombstoneHistogram.serializer.deserialize(new DataInputBuffer(bytes));
+
+        asMap = asMap(deserialized);
+        assertEquals(1, deserialized.size());
+        assertEquals(3, asMap.get(2).intValue());
+    }
+
+    @Test
+    public void testOverflow() throws Exception
+    {
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 1);
+        int[] samples = new int[]{ 23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31,
+                                   32, 32, 33, 34, 35, 70, 78, 80, 90, 100,
+                                   32, 32, 33, 34, 35, 70, 78, 80, 90, 100
+        };
+
+        // Hit the spool cap, force it to make bins
+        for (int i = 0; i < samples.length; i++)
+        {
+            builder.update(samples[i]);
+        }
+
+        assertEquals(5, builder.build().size());
+    }
+
+    @Test
+    public void testRounding() throws Exception
+    {
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 10, 60);
+        int[] samples = new int[]{ 59, 60, 119, 180, 181, 300 }; // 60, 60, 120, 180, 240, 300
+        for (int i = 0; i < samples.length; i++)
+            builder.update(samples[i]);
+        TombstoneHistogram hist = builder.build();
+        assertEquals(hist.size(), 5);
+        assertEquals(asMap(hist).get(60).intValue(), 2);
+        assertEquals(asMap(hist).get(120).intValue(), 1);
+    }
+
+    @Test
+    public void testLargeValues() throws Exception
+    {
+        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
+        IntStream.range(Integer.MAX_VALUE - 30, Integer.MAX_VALUE).forEach(builder::update);
+    }
+
+    private Map<Integer, Integer> asMap(TombstoneHistogram histogram)
+    {
+        Map<Integer, Integer> result = new HashMap<>();
+        histogram.forEach(result::put);
+        return result;
+    }
+}

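On the read side, the new tests exercise the other half of this commit's API change: bins are iterated through forEach with a primitive (point, value) callback -- the HistogramDataConsumer added by this patch -- rather than materialised into a boxed Map as the deleted getAsMap() tests did. A minimal sketch of a consumer that tallies the total recorded count, using only calls that appear in the tests above (the sample values are illustrative):

import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder;
import org.apache.cassandra.utils.streamhist.TombstoneHistogram;

public class HistogramConsumerSketch
{
    public static void main(String[] args)
    {
        StreamingTombstoneHistogramBuilder builder = new StreamingTombstoneHistogramBuilder(5, 0, 1);
        for (int t : new int[]{ 23, 19, 10, 16, 36, 2, 9 })
            builder.update(t);

        TombstoneHistogram hist = builder.build();

        // each callback receives a bin position and its count as primitives;
        // a one-element array works around the effectively-final rule for lambdas
        long[] total = { 0 };
        hist.forEach((point, value) -> total[0] += value);

        System.out.println(total[0]);   // 7: one count per update() call
    }
}

Since each update() contributes a count of one and merged bins sum their counts, the tally always equals the number of updates -- seven here -- regardless of how the five bins split the values.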
