cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jji...@apache.org
Subject [5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Tue, 28 Feb 2017 01:26:29 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/942b83ca
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/942b83ca
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/942b83ca

Branch: refs/heads/cassandra-3.11
Commit: 942b83ca93d6d2380a830cb775e8506cc1cf0324
Parents: 1dc1aa1 a5ce963
Author: Jeff Jirsa <jeff@jeffjirsa.net>
Authored: Mon Feb 27 17:22:31 2017 -0800
Committer: Jeff Jirsa <jeff@jeffjirsa.net>
Committed: Mon Feb 27 17:23:17 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/io/sstable/SSTable.java    |   2 +
 .../io/sstable/metadata/MetadataCollector.java  |   2 +-
 .../cassandra/utils/StreamingHistogram.java     | 133 ++++--
 .../microbench/StreamingHistogramBench.java     | 405 +++++++++++++++++++
 .../db/compaction/CompactionsTest.java          |   3 +
 .../DateTieredCompactionStrategyTest.java       |   4 +
 .../LeveledCompactionStrategyTest.java          |   4 +
 .../SizeTieredCompactionStrategyTest.java       |   4 +
 .../cassandra/db/compaction/TTLExpiryTest.java  |   4 +
 .../TimeWindowCompactionStrategyTest.java       |   4 +
 .../cassandra/utils/StreamingHistogramTest.java |  40 +-
 12 files changed, 569 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f5b9d28,1100bfd..1810cae
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,5 +1,14 @@@
 -3.0.12
 +3.11.0
 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174)
 + * Obfuscate password in stress-graphs (CASSANDRA-12233)
 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 + * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 + * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 +Merged from 3.0:
+  * Faster StreamingHistogram (CASSANDRA-13038)
   * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
   * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
   * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTable.java
index 601f5a0,1e4488c..fdcd17f
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@@ -59,7 -58,10 +59,9 @@@ public abstract class SSTabl
  {
      static final Logger logger = LoggerFactory.getLogger(SSTable.class);
  
 -
      public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
+     public static final int TOMBSTONE_HISTOGRAM_SPOOL_SIZE = 100000;
 +     public static final int TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS = Integer.valueOf(System.getProperty("cassandra.streaminghistogram.roundseconds", "60"));
  
      public final Descriptor descriptor;
      protected final Set<Component> components;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/StreamingHistogram.java
index a500450,fffa73e..6fde931
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@@ -39,11 -39,11 +39,14 @@@ public class StreamingHistogra
      public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer();
  
      // TreeMap to hold bins of histogram.
 -    private final TreeMap<Double, Long> bin;
 +    // The key is a numeric type so we can avoid boxing/unboxing streams of different key types
 +    // The value is a unboxed long array always of length == 1
 +    // Serialized Histograms always writes with double keys for backwards compatibility
 +    private final TreeMap<Number, long[]> bin;
  
+     // Keep a second, larger buffer to spool data in, before finalizing it into `bin`
 -    private final TreeMap<Double, Long> spool;
++    private final TreeMap<Number, long[]> spool;
+ 
      // maximum bin size for this histogram
      private final int maxBinSize;
  
@@@ -51,22 -57,22 +60,31 @@@
       * Creates a new histogram with max bin size of maxBinSize
       * @param maxBinSize maximum number of bins this histogram can have
       */
-     public StreamingHistogram(int maxBinSize)
+     public StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds)
      {
          this.maxBinSize = maxBinSize;
+         this.maxSpoolSize = maxSpoolSize;
+         this.roundSeconds = roundSeconds;
 -        bin = new TreeMap<>();
 -        spool = new TreeMap<>();
 +        bin = new TreeMap<>((o1, o2) -> {
 +            if (o1.getClass().equals(o2.getClass()))
 +                return ((Comparable)o1).compareTo(o2);
 +            else
 +            	return Double.compare(o1.doubleValue(), o2.doubleValue());
 +        });
++        spool = new TreeMap<>((o1, o2) -> {
++            if (o1.getClass().equals(o2.getClass()))
++                return ((Comparable)o1).compareTo(o2);
++            else
++                return Double.compare(o1.doubleValue(), o2.doubleValue());
++        });
++
      }
  
-     private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
 -    private StreamingHistogram(int maxBinSize, int maxSpoolSize, int roundSeconds, Map<Double, Long> bin)
++    private StreamingHistogram(int maxBinSize, int maxSpoolSize,  int roundSeconds, Map<Double, Long> bin)
      {
-         this(maxBinSize);
 -        this.maxBinSize = maxBinSize;
 -        this.maxSpoolSize = maxSpoolSize;
 -        this.roundSeconds = roundSeconds;
 -        this.bin = new TreeMap<>(bin);
 -        this.spool = new TreeMap<>();
++        this(maxBinSize, maxSpoolSize, roundSeconds);
 +        for (Map.Entry<Double, Long> entry : bin.entrySet())
 +            this.bin.put(entry.getKey(), new long[]{entry.getValue()});
      }
  
      /**
@@@ -83,9 -89,13 +101,13 @@@
       * @param p
       * @param m
       */
 -    public void update(double p, long m)
 +    public void update(Number p, long m)
      {
-         long[] mi = bin.get(p);
 -        double d = p % this.roundSeconds;
 -        if (d > 0)
 -            p = p + (this.roundSeconds - d);
++        Number d = p.longValue() % this.roundSeconds;
++        if (d.longValue() > 0)
++            p =p.longValue() + (this.roundSeconds - d.longValue());
+ 
 -        Long mi = spool.get(p);
++        long[] mi = spool.get(p);
          if (mi != null)
          {
              // we found the same p so increment that counter
@@@ -93,38 -103,65 +115,77 @@@
          }
          else
          {
 -            spool.put(p, m);
 +            mi = new long[]{m};
-             bin.put(p, mi);
-             // if bin size exceeds maximum bin size then trim down to max size
-             while (bin.size() > maxBinSize)
++            spool.put(p, mi);
+         }
++
++        // If spool has overflowed, compact it
+         if(spool.size() > maxSpoolSize)
+             flushHistogram();
+     }
+ 
+     /**
+      * Drain the temporary spool into the final bins
+      */
+     public void flushHistogram()
+     {
 -        if(spool.size() > 0)
++        if (spool.size() > 0)
+         {
 -            Long spoolValue;
 -            Long binValue;
++            long[] spoolValue;
++            long[] binValue;
+ 
+             // Iterate over the spool, copying the value into the primary bin map
+             // and compacting that map as necessary
 -            for (Map.Entry<Double, Long> entry : spool.entrySet())
++            for (Map.Entry<Number, long[]> entry : spool.entrySet())
              {
-                 // find points p1, p2 which have smallest difference
-                 Iterator<Number> keys = bin.keySet().iterator();
-                 double p1 = keys.next().doubleValue();
-                 double p2 = keys.next().doubleValue();
-                 double smallestDiff = p2 - p1;
-                 double q1 = p1, q2 = p2;
-                 while (keys.hasNext())
 -                Double key = entry.getKey();
++                Number key = entry.getKey();
+                 spoolValue = entry.getValue();
+                 binValue = bin.get(key);
+ 
 -                if (binValue != null)
++                // If this value is already in the final histogram bins
++                // Simply increment and update, otherwise, insert a new long[1] value
++                if(binValue != null)
+                 {
 -                    binValue += spoolValue;
++                    binValue[0] += spoolValue[0];
+                     bin.put(key, binValue);
 -                } else
 -                    {
 -                    bin.put(key, spoolValue);
++                }
++                else
 +                {
-                     p1 = p2;
-                     p2 = keys.next().doubleValue();
-                     double diff = p2 - p1;
-                     if (diff < smallestDiff)
++                    bin.put(key, new long[]{spoolValue[0]});
+                 }
+ 
 -                // if bin size exceeds maximum bin size then trim down to max size
+                 if (bin.size() > maxBinSize)
+                 {
+                     // find points p1, p2 which have smallest difference
 -                    Iterator<Double> keys = bin.keySet().iterator();
 -                    double p1 = keys.next();
 -                    double p2 = keys.next();
++                    Iterator<Number> keys = bin.keySet().iterator();
++                    double p1 = keys.next().doubleValue();
++                    double p2 = keys.next().doubleValue();
+                     double smallestDiff = p2 - p1;
+                     double q1 = p1, q2 = p2;
 -                    while (keys.hasNext()) {
++                    while (keys.hasNext())
 +                    {
-                         smallestDiff = diff;
-                         q1 = p1;
-                         q2 = p2;
+                         p1 = p2;
 -                        p2 = keys.next();
++                        p2 = keys.next().doubleValue();
+                         double diff = p2 - p1;
 -                        if (diff < smallestDiff) {
++                        if (diff < smallestDiff)
++                        {
+                             smallestDiff = diff;
+                             q1 = p1;
+                             q2 = p2;
+                         }
                      }
+                     // merge those two
 -                    long k1 = bin.remove(q1);
 -                    long k2 = bin.remove(q2);
 -                    bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2);
++                    long[] a1 = bin.remove(q1);
++                    long[] a2 = bin.remove(q2);
++                    long k1 = a1[0];
++                    long k2 = a2[0];
++
++                    a1[0] += k2;
++                    bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1);
++
                  }
-                 // merge those two
-                 long[] a1 = bin.remove(q1);
-                 long[] a2 = bin.remove(q2);
-                 long k1 = a1[0];
-                 long k2 = a2[0];
- 
-                 a1[0] += k2;
-                 bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1);
              }
+             spool.clear();
          }
      }
  
@@@ -138,8 -175,10 +199,10 @@@
          if (other == null)
              return;
  
 -        flushHistogram();
++        other.flushHistogram();
+ 
 -        for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet())
 -            update(entry.getKey(), entry.getValue());
 +        for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet())
 +            update(entry.getKey(), entry.getValue()[0]);
      }
  
      /**
@@@ -150,9 -189,10 +213,10 @@@
       */
      public double sum(double b)
      {
+         flushHistogram();
          double sum = 0;
          // find the points pi, pnext which satisfy pi <= b < pnext
 -        Map.Entry<Double, Long> pnext = bin.higherEntry(b);
 +        Map.Entry<Number, long[]> pnext = bin.higherEntry(b);
          if (pnext == null)
          {
              // if b is greater than any key in this histogram,
@@@ -177,8 -217,9 +241,9 @@@
          return sum;
      }
  
 -    public Map<Double, Long> getAsMap()
 +    public Map<Number, long[]> getAsMap()
      {
+         flushHistogram();
          return Collections.unmodifiableMap(bin);
      }
  
@@@ -186,13 -227,13 +251,14 @@@
      {
          public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException
          {
++            histogram.flushHistogram();
              out.writeInt(histogram.maxBinSize);
 -            Map<Double, Long> entries = histogram.getAsMap();
 +            Map<Number, long[]> entries = histogram.getAsMap();
              out.writeInt(entries.size());
 -            for (Map.Entry<Double, Long> entry : entries.entrySet())
 +            for (Map.Entry<Number, long[]> entry : entries.entrySet())
              {
 -                out.writeDouble(entry.getKey());
 -                out.writeLong(entry.getValue());
 +                out.writeDouble(entry.getKey().doubleValue());
 +                out.writeLong(entry.getValue()[0]);
              }
          }
  
@@@ -230,13 -271,13 +296,16 @@@
              return false;
  
          StreamingHistogram that = (StreamingHistogram) o;
--        return maxBinSize == that.maxBinSize && bin.equals(that.bin);
++        return maxBinSize == that.maxBinSize &&
++               maxSpoolSize == that.maxSpoolSize &&
++               spool.equals(that.spool) &&
++               bin.equals(that.bin);
      }
  
      @Override
      public int hashCode()
      {
-         return Objects.hashCode(bin.hashCode(), maxBinSize);
 -        return Objects.hashCode(bin.hashCode(), spool.hashCode(), maxBinSize);
++        return Objects.hashCode(bin.hashCode(), spool.hashCode(), maxBinSize, maxSpoolSize);
      }
  
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/942b83ca/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index 94aac9e,21c736e..f64cbd9
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@@ -33,7 -32,7 +33,7 @@@ public class StreamingHistogramTes
      @Test
      public void testFunction() throws Exception
      {
-         StreamingHistogram hist = new StreamingHistogram(5);
 -        StreamingHistogram hist = new StreamingHistogram(5, 5, 1);
++        StreamingHistogram hist = new StreamingHistogram(5, 0, 1);
          long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45};
  
          // add 7 points to histogram of 5 bins
@@@ -59,7 -58,7 +59,7 @@@
          }
  
          // merge test
-         StreamingHistogram hist2 = new StreamingHistogram(3);
 -        StreamingHistogram hist2 = new StreamingHistogram(3, 0, 1);
++        StreamingHistogram hist2 = new StreamingHistogram(3, 3, 1);
          for (int i = 7; i < samples.length; i++)
          {
              hist2.update(samples[i]);
@@@ -121,32 -120,36 +121,64 @@@
          }
      }
  
 +
 +    @Test
 +    public void testNumericTypes() throws Exception
 +    {
-         StreamingHistogram hist = new StreamingHistogram(5);
++        StreamingHistogram hist = new StreamingHistogram(5, 0, 1);
 +
 +        hist.update(2);
 +        hist.update(2.0);
 +        hist.update(2L);
 +
 +        Map<Number, long[]> asMap = hist.getAsMap();
 +
 +        assertEquals(1, asMap.size());
 +        assertEquals(3L, asMap.get(2)[0]);
 +
 +        //Make sure it's working with Serde
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        StreamingHistogram.serializer.serialize(hist, out);
 +        byte[] bytes = out.toByteArray();
 +
 +        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes));
 +
 +        deserialized.update(2L);
 +
 +        asMap = deserialized.getAsMap();
 +        assertEquals(1, asMap.size());
 +        assertEquals(4L, asMap.get(2)[0]);
 +    }
++
+     @Test
+     public void testOverflow() throws Exception
+     {
+         StreamingHistogram hist = new StreamingHistogram(5, 10, 1);
+         long[] samples = new long[]{23, 19, 10, 16, 36, 2, 9, 32, 30, 45, 31,
 -                32, 32, 33, 34, 35, 70, 78, 80, 90, 100,
 -                32, 32, 33, 34, 35, 70, 78, 80, 90, 100
 -        };
++                                    32, 32, 33, 34, 35, 70, 78, 80, 90, 100,
++                                    32, 32, 33, 34, 35, 70, 78, 80, 90, 100
++                                    };
+ 
+         // Hit the spool cap, force it to make bins
+         for (int i = 0; i < samples.length; i++)
+         {
+             hist.update(samples[i]);
+         }
++
+         assertEquals(5, hist.getAsMap().keySet().size());
+ 
+     }
+ 
+     @Test
+     public void testRounding() throws Exception
+     {
+         StreamingHistogram hist = new StreamingHistogram(5, 10, 60);
+         long[] samples = new long[] { 59, 60, 119, 180, 181, 300 }; // 60, 60, 120, 180, 240, 300
+         for (int i = 0 ; i < samples.length ; i++)
+             hist.update(samples[i]);
 -
 -        assertEquals(hist.getAsMap().keySet().size(), (int) 5);
 -        assertEquals((long) hist.getAsMap().get(Double.valueOf(60)), (long) 2L);
 -        assertEquals((long) hist.getAsMap().get(Double.valueOf(120)), (long) 1L);
++        assertEquals(hist.getAsMap().keySet().size(), 5);
++        assertEquals(hist.getAsMap().get(60)[0], 2);
++        assertEquals(hist.getAsMap().get(120)[0], 1);
+ 
+     }
 -
  }


Mime
View raw message