cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [1/3] cassandra git commit: Add tooling to detect hot partitions
Date Wed, 21 Jan 2015 17:49:14 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 1435b9a87 -> faf91818b
  refs/heads/trunk 184bb65fc -> 0a09b87dc


Add tooling to detect hot partitions

Patch by Chris Lohfink, reviewed by brandonwilliams for CASSANDRA-7974


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/faf91818
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/faf91818
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/faf91818

Branch: refs/heads/cassandra-2.1
Commit: faf91818b46fb51ed576664a1119315e7b7c3383
Parents: 1435b9a
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Wed Jan 21 11:45:45 2015 -0600
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Wed Jan 21 11:45:45 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  69 ++++++++-
 .../cassandra/db/ColumnFamilyStoreMBean.java    |  14 ++
 .../cassandra/metrics/ColumnFamilyMetrics.java  |  19 ++-
 .../org/apache/cassandra/tools/NodeProbe.java   |  26 +++-
 .../org/apache/cassandra/tools/NodeTool.java    |  83 ++++++++++-
 .../org/apache/cassandra/utils/TopKSampler.java | 139 ++++++++++++++++++
 .../apache/cassandra/utils/TopKSamplerTest.java | 147 +++++++++++++++++++
 8 files changed, 482 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c2bab8..f1eaa77 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Add tooling to detect hot partitions (CASSANDRA-7974)
  * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
  * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
  * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f7a691e..0c95b0e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -26,14 +26,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import javax.management.*;
+import javax.management.openmbean.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
+import com.google.common.base.*;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.json.simple.*;
 import org.slf4j.Logger;
@@ -68,14 +66,18 @@ import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamLockfile;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 
+import com.clearspring.analytics.stream.Counter;
+
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
@@ -102,6 +104,39 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                                         
   new NamedThreadFactory("MemtableReclaimMemory"),
                                                                                         
   "internal");
 
+    private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
+    private static final String[] COUNTER_DESCS = new String[]
+    { "partition key in raw hex bytes",
+      "value of this partition for given sampler",
+      "value is within the error bounds plus or minus of this",
+      "the partition key turned into a human readable format" };
+    private static final CompositeType COUNTER_COMPOSITE_TYPE;
+    private static final TabularType COUNTER_TYPE;
+
+    private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
+    private static final String[] SAMPLER_DESCS = new String[]
+    { "cardinality of partitions",
+      "list of counter results" };
+
+    private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
+    private static final CompositeType SAMPLING_RESULT;
+
+    static
+    {
+        try
+        {
+            OpenType<?>[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG,
SimpleType.LONG, SimpleType.STRING };
+            COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME,
COUNTER_NAMES, COUNTER_DESCS, counterTypes);
+            COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME,
COUNTER_COMPOSITE_TYPE, COUNTER_NAMES);
+
+            OpenType<?>[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE
};
+            SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME,
SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes);
+        } catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
     public final Keyspace keyspace;
     public final String name;
     public final CFMetaData metadata;
@@ -1152,6 +1187,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Memtable mt = data.getMemtableFor(opGroup, replayPosition);
         final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
         maybeUpdateRowCache(key);
+        metric.samplers.get(Sampler.WRITES).addSample(key.getKey());
         metric.writeLatency.addNano(System.nanoTime() - start);
         if(timeDelta < Long.MAX_VALUE)
             metric.colUpdateTimeDeltaHistogram.update(timeDelta);
@@ -1915,10 +1951,35 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap());
         }
+        if (columns != null)
+            metric.samplers.get(Sampler.READS).addSample(filter.key.getKey());
         metric.updateSSTableIterated(controller.getSstablesIterated());
         return columns;
     }
 
+    public void beginLocalSampling(String sampler, int capacity)
+    {
+        metric.samplers.get(Sampler.valueOf(sampler)).beginSampling(capacity);
+    }
+
+    public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException
+    {
+        SamplerResult<ByteBuffer> samplerResults = metric.samplers.get(Sampler.valueOf(sampler))
+                .finishSampling(count);
+        TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
+        for (Counter<ByteBuffer> counter : samplerResults.topK)
+        {
+            byte[] key = counter.getItem().array();
+            result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new
Object[] {
+                    Hex.bytesToHex(key), // raw
+                    counter.getCount(),  // count
+                    counter.getError(),  // error
+                    metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string
+        }
+        return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{
+                samplerResults.cardinality, result});
+    }
+
     public void cleanupCache()
     {
         Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 3418b26..4df593b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -21,6 +21,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
 /**
  * The MBean interface for ColumnFamilyStore
  */
@@ -402,4 +405,15 @@ public interface ColumnFamilyStoreMBean
      * @return the size of SSTables in "snapshots" subdirectory which aren't live anymore
      */
     public long trueSnapshotsSize();
+
+    /**
+     * begin sampling for a specific sampler with a given capacity.  The cardinality may
+     * be larger than the capacity, but depending on the use case it may affect its accuracy
+     */
+    public void beginLocalSampling(String sampler, int capacity);
+
+    /**
+     * @return top <i>count</i> items for the sampler since beginLocalSampling
was called
+     */
+    public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index b906750..c82569d 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -17,9 +17,8 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.nio.ByteBuffer;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
@@ -28,11 +27,13 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.TopKSampler;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.*;
+import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.util.RatioGauge;
 
 /**
@@ -144,6 +145,7 @@ public class ColumnFamilyMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalNameFactory,
"Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalNameFactory,
"Range");
     
+    public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
      * stores metrics that will be rolled into a single global metric
      */
@@ -203,6 +205,12 @@ public class ColumnFamilyMetrics
     {
         factory = new ColumnFamilyMetricNameFactory(cfs);
 
+        samplers = Maps.newHashMap();
+        for (Sampler sampler : Sampler.values())
+        {
+            samplers.put(sampler, new TopKSampler<ByteBuffer>());
+        }
+
         memtableColumnsCount = createColumnFamilyGauge("MemtableColumnsCount", new Gauge<Long>()
         {
             public Long value()
@@ -766,4 +774,9 @@ public class ColumnFamilyMetrics
             return new MetricName(groupName, "ColumnFamily", metricName, "all", mbeanName.toString());
         }
     }
+
+    public static enum Sampler
+    {
+        READS, WRITES
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 00f9686..67cc7f1 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -28,8 +28,7 @@ import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.Condition;
 import javax.management.*;
 import javax.management.openmbean.CompositeData;
@@ -37,11 +36,11 @@ import javax.management.remote.JMXConnectionNotification;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
-import javax.management.openmbean.TabularData;
+import javax.management.openmbean.*;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import com.yammer.metrics.reporting.JmxReporter;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
@@ -53,6 +52,7 @@ import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.FailureDetectorMBean;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.repair.RepairParallelism;
@@ -312,6 +312,22 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
+    public Map<Sampler, CompositeData> getPartitionSample(String ks, String cf, int
capacity, int duration, int count, List<Sampler> samplers) throws OpenDataException
+    {
+        ColumnFamilyStoreMBean cfsProxy = getCfsProxy(ks, cf);
+        for(Sampler sampler : samplers)
+        {
+            cfsProxy.beginLocalSampling(sampler.name(), capacity);
+        }
+        Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS);
+        Map<Sampler, CompositeData> result = Maps.newHashMap();
+        for(Sampler sampler : samplers)
+        {
+            result.put(sampler, cfsProxy.finishLocalSampling(sampler.name(), count));
+        }
+        return result;
+    }
+
     public void invalidateCounterCache()
     {
         cacheService.invalidateCounterCache();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 2cc0b98..12496fc 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -27,13 +27,11 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
-import javax.management.openmbean.TabularData;
+import javax.management.openmbean.*;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
 import com.yammer.metrics.reporting.JmxReporter;
 
 import io.airlift.command.*;
@@ -47,6 +45,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.service.CacheServiceMBean;
@@ -146,6 +145,7 @@ public class NodeTool
                 Drain.class,
                 TruncateHints.class,
                 TpStats.class,
+                TopPartitions.class,
                 SetLoggingLevel.class,
                 GetLoggingLevels.class
         );
@@ -925,6 +925,81 @@ public class NodeTool
         }
     }
 
+    @Command(name = "toppartitions", description = "Sample and print the most active partitions
for a given column family")
+    public static class TopPartitions extends NodeToolCmd
+    {
+        @Arguments(usage = "<keyspace> <cfname> <duration>", description
= "The keyspace, column family name, and duration in milliseconds")
+        private List<String> args = new ArrayList<>();
+        @Option(name = "-s", description = "Capacity of stream summary, closer to the actual
cardinality of partitions will yield more accurate results (Default: 256)")
+        private int size = 256;
+        @Option(name = "-k", description = "Number of the top partitions to list (Default:
10)")
+        private int topCount = 10;
+        @Option(name = "-a", description = "Comma separated list of samplers to use (Default:
all)")
+        private String samplers = join(Sampler.values(), ',');
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            checkArgument(args.size() == 3, "toppartitions requires keyspace, column family
name, and duration");
+            checkArgument(topCount < size, "TopK count (-k) option must be smaller then
the summary capacity (-s)");
+            String keyspace = args.get(0);
+            String cfname = args.get(1);
+            Integer duration = Integer.parseInt(args.get(2));
+            // generate the list of samplers
+            List<Sampler> targets = Lists.newArrayList();
+            for (String s : samplers.split(","))
+            {
+                try
+                {
+                    targets.add(Sampler.valueOf(s.toUpperCase()));
+                } catch (Exception e)
+                {
+                    throw new IllegalArgumentException(s + " is not a valid sampler, choose
one of: " + join(Sampler.values(), ", "));
+                }
+            }
+
+            Map<Sampler, CompositeData> results;
+            try
+            {
+                results = probe.getPartitionSample(keyspace, cfname, size, duration, topCount,
targets);
+            } catch (OpenDataException e)
+            {
+                throw new RuntimeException(e);
+            }
+            boolean first = true;
+            for(Entry<Sampler, CompositeData> result : results.entrySet())
+            {
+                CompositeData sampling = result.getValue();
+                // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436
+                List<CompositeData> topk = (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport)
sampling.get("partitions")).values());
+                Collections.sort(topk, new Ordering<CompositeData>()
+                {
+                    public int compare(CompositeData left, CompositeData right)
+                    {
+                        return Long.compare((long) right.get("count"), (long) left.get("count"));
+                    }
+                });
+                if(!first)
+                    System.out.println();
+                System.out.println(result.getKey().toString()+ " Sampler:");
+                System.out.printf("  Cardinality: ~%d (%d capacity)%n", (long) sampling.get("cardinality"),
size);
+                System.out.printf("  Top %d partitions:%n", topCount);
+                if (topk.size() == 0)
+                {
+                    System.out.println("\tNothing recorded during sampling period...");
+                } else
+                {
+                    int offset = 0;
+                    for (CompositeData entry : topk)
+                        offset = Math.max(offset, entry.get("string").toString().length());
+                    System.out.printf("\t%-" + offset + "s%10s%10s%n", "Partition", "Count",
"+/-");
+                    for (CompositeData entry : topk)
+                        System.out.printf("\t%-" + offset + "s%10d%10d%n", entry.get("string").toString(),
entry.get("count"), entry.get("error"));
+                }
+                first = false;
+            }
+        }
+    }
+
     @Command(name = "cfhistograms", description = "Print statistic histograms for a given
column family")
     public static class CfHistograms extends NodeToolCmd
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/utils/TopKSampler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TopKSampler.java b/src/java/org/apache/cassandra/utils/TopKSampler.java
new file mode 100644
index 0000000..29d46286
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TopKSampler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.*;
+import org.slf4j.*;
+
+import com.clearspring.analytics.stream.*;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.common.annotations.VisibleForTesting;
+
+public class TopKSampler<T>
+{
+    private static final Logger logger = LoggerFactory.getLogger(TopKSampler.class);
+    private volatile boolean enabled = false;
+
+    @VisibleForTesting
+    static final ThreadPoolExecutor samplerExecutor = new JMXEnabledThreadPoolExecutor(1,
1,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("Sampler"),
+            "internal");
+
+    private StreamSummary<T> summary;
+    @VisibleForTesting
+    HyperLogLogPlus hll;
+
+    /**
+     * Start to record samples
+     *
+     * @param capacity
+     *            Number of sample items to keep in memory, the lower this is
+     *            the less accurate results are. For best results use value
+     *            close to cardinality, but understand the memory trade offs.
+     */
+    public synchronized void beginSampling(int capacity)
+    {
+        if (!enabled)
+        {
+            summary = new StreamSummary<T>(capacity);
+            hll = new HyperLogLogPlus(14);
+            enabled = true;
+        }
+    }
+
+    /**
+     * Call to stop collecting samples, and gather the results
+     * @param count Number of most frequent items to return
+     */
+    public synchronized SamplerResult<T> finishSampling(int count)
+    {
+        List<Counter<T>> results = Collections.EMPTY_LIST;
+        long cardinality = 0;
+        if (enabled)
+        {
+            enabled = false;
+            results = summary.topK(count);
+            cardinality = hll.cardinality();
+        }
+        return new SamplerResult<T>(results, cardinality);
+    }
+
+    public void addSample(T item)
+    {
+        addSample(item, 1);
+    }
+
+    /**
+     * Adds a sample to statistics collection. This method is non-blocking and will
+     * use the "Sampler" thread pool to record results if the sampler is enabled.  If not
+     * sampling this is a NOOP
+     */
+    public void addSample(final T item, final int value)
+    {
+        if (enabled)
+        {
+            final Object lock = this;
+            samplerExecutor.execute(new Runnable()
+            {
+                public void run()
+                {
+                    // samplerExecutor is single threaded but still need
+                    // synchronization against jmx calls to finishSampling
+                    synchronized (lock)
+                    {
+                        if (enabled)
+                        {
+                            try
+                            {
+                                summary.offer(item, value);
+                                hll.offer(item);
+                            } catch (Exception e)
+                            {
+                                logger.debug("Failure to offer sample", e);
+                            }
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Represents the cardinality and the topK ranked items collected during a
+     * sample period
+     */
+    public static class SamplerResult<S> implements Serializable
+    {
+        public final List<Counter<S>> topK;
+        public final long cardinality;
+
+        public SamplerResult(List<Counter<S>> topK, long cardinality)
+        {
+            this.topK = topK;
+            this.cardinality = cardinality;
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
new file mode 100644
index 0000000..dc3b91c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
@@ -0,0 +1,147 @@
+package org.apache.cassandra.utils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.utils.TopKSampler.SamplerResult;
+import org.junit.Test;
+
+import com.clearspring.analytics.stream.Counter;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class TopKSamplerTest
+{
+
+    @Test
+    public void testSamplerSingleInsertionsEqualMulti() throws TimeoutException
+    {
+        TopKSampler<String> sampler = new TopKSampler<String>();
+        sampler.beginSampling(10);
+        insert(sampler);
+        waitForEmpty(1000);
+        SamplerResult single = sampler.finishSampling(10);
+
+        TopKSampler<String> sampler2 = new TopKSampler<String>();
+        sampler2.beginSampling(10);
+        for(int i = 1; i <= 10; i++)
+        {
+           sampler2.addSample("item" + i, i);
+        }
+        waitForEmpty(1000);
+        Assert.assertEquals(countMap(single.topK), countMap(sampler2.finishSampling(10).topK));
+        Assert.assertEquals(sampler.hll.cardinality(), sampler2.hll.cardinality());
+    }
+
+    @Test
+    public void testSamplerOutOfOrder() throws TimeoutException
+    {
+        TopKSampler<String> sampler = new TopKSampler<String>();
+        sampler.beginSampling(10);
+        insert(sampler);
+        waitForEmpty(1000);
+        SamplerResult single = sampler.finishSampling(10);
+        single = sampler.finishSampling(10);
+    }
+
+    /**
+     * checking for exceptions from SS/HLL which are not thread safe
+     */
+    @Test
+    public void testMultithreadedAccess() throws Exception
+    {
+        final AtomicBoolean running = new AtomicBoolean(true);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final TopKSampler<String> sampler = new TopKSampler<String>();
+
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    while (running.get())
+                    {
+                        insert(sampler);
+                    }
+                } finally
+                {
+                    latch.countDown();
+                }
+            }
+
+        }
+        ,"inserter").start();
+        try
+        {
+            // start/stop in fast iterations
+            for(int i = 0; i<100; i++)
+            {
+                sampler.beginSampling(i);
+                sampler.finishSampling(i);
+            }
+            // start/stop with pause to let it build up past capacity
+            for(int i = 0; i<3; i++)
+            {
+                sampler.beginSampling(i);
+                Thread.sleep(250);
+                sampler.finishSampling(i);
+            }
+
+            // with empty results
+            running.set(false);
+            latch.await(1, TimeUnit.SECONDS);
+            waitForEmpty(1000);
+            for(int i = 0; i<10; i++)
+            {
+                sampler.beginSampling(i);
+                Thread.sleep(i);
+                sampler.finishSampling(i);
+            }
+        } finally
+        {
+            running.set(false);
+        }
+    }
+
+    private void insert(TopKSampler<String> sampler)
+    {
+        for(int i = 1; i <= 10; i++)
+        {
+            for(int j = 0; j < i; j++)
+            {
+                sampler.addSample("item" + i);
+            }
+        }
+    }
+
+    private void waitForEmpty(int timeoutMs) throws TimeoutException
+    {
+        int timeout = 0;
+        while (!TopKSampler.samplerExecutor.getQueue().isEmpty())
+        {
+            timeout++;
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            if (timeout * 100 > timeoutMs)
+            {
+                throw new TimeoutException("TRACE executor not cleared within timeout");
+            }
+        }
+    }
+
+    private <T> Map<T, Long> countMap(List<Counter<T>> target)
+    {
+        Map<T, Long> counts = Maps.newHashMap();
+        for(Counter<T> counter : target)
+        {
+            counts.put(counter.getItem(), counter.getCount());
+        }
+        return counts;
+    }
+}


Mime
View raw message