cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/3] New metrics; patch by yukim reviewed by brandonwilliams for CASSANDRA-4009
Date Thu, 30 Aug 2012 12:48:36 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
new file mode 100644
index 0000000..1a93022
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
+
+/** Metrics for {@link OutboundTcpConnectionPool}. */
+public class ConnectionMetrics
+{
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "Connection";
+    /** Total number of timeouts that happened on this node */
+    public static final Meter totalTimeouts = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalTimeouts"), "total timeouts", TimeUnit.SECONDS);
+    private static long recentTimeouts;
+
+    public final String address;
+    /** Pending tasks on the Command (Mutations, Read etc) TCP connection */
+    public final Gauge<Integer> commandPendingTasks;
+    /** Completed tasks on the Command (Mutations, Read etc) TCP connection */
+    public final Gauge<Long> commandCompletedTasks;
+    /** Dropped tasks on the Command (Mutations, Read etc) TCP connection */
+    public final Gauge<Long> commandDroppedTasks;
+    /** Pending tasks on the Response (GOSSIP & RESPONSE) TCP connection */
+    public final Gauge<Integer> responsePendingTasks;
+    /** Completed tasks on the Response (GOSSIP & RESPONSE) TCP connection */
+    public final Gauge<Long> responseCompletedTasks;
+    /** Timeouts recorded for this specific IP */
+    public final Meter timeouts;
+    private long recentTimeoutCount;
+
+    /**
+     * Registers the gauges and the timeout meter of one connection pool, scoped by the peer address.
+     * @param ip IP address to use for metrics label
+     * @param connectionPool Connection pool
+     */
+    public ConnectionMetrics(InetAddress ip, final OutboundTcpConnectionPool connectionPool)
+    {
+        address = ip.getHostAddress();
+        commandPendingTasks = Metrics.newGauge(scopedName("CommandPendingTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                return connectionPool.cmdCon.getPendingMessages();
+            }
+        });
+        commandCompletedTasks = Metrics.newGauge(scopedName("CommandCompletedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return connectionPool.cmdCon.getCompletedMesssages();
+            }
+        });
+        commandDroppedTasks = Metrics.newGauge(scopedName("CommandDroppedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return connectionPool.cmdCon.getDroppedMessages();
+            }
+        });
+        responsePendingTasks = Metrics.newGauge(scopedName("ResponsePendingTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                return connectionPool.ackCon.getPendingMessages();
+            }
+        });
+        responseCompletedTasks = Metrics.newGauge(scopedName("ResponseCompletedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return connectionPool.ackCon.getCompletedMesssages();
+            }
+        });
+        timeouts = Metrics.newMeter(scopedName("Timeouts"), "timeouts", TimeUnit.SECONDS);
+    }
+
+    /** Removes every per-address metric registered by the constructor from the default registry. */
+    public void release()
+    {
+        for (String metric : new String[] { "CommandPendingTasks", "CommandCompletedTasks", "CommandDroppedTasks", "ResponsePendingTasks", "ResponseCompletedTasks", "Timeouts" })
+            Metrics.defaultRegistry().removeMetric(scopedName(metric));
+    }
+
+    /** Name of the given metric in this class' group and type, scoped by {@link #address}. */
+    private MetricName scopedName(String metric)
+    {
+        return new MetricName(GROUP_NAME, TYPE_NAME, metric, address);
+    }
+
+    /** @return timeouts counted by {@link #totalTimeouts} since the previous call of this method */
+    @Deprecated
+    public static long getRecentTotalTimeout()
+    {
+        final long current = totalTimeouts.count();
+        final long delta = current - recentTimeouts;
+        recentTimeouts = current;
+        return delta;
+    }
+
+    /** @return timeouts counted by {@link #timeouts} since the previous call of this method */
+    @Deprecated
+    public long getRecentTimeout()
+    {
+        final long current = timeouts.count();
+        final long delta = current - recentTimeoutCount;
+        recentTimeoutCount = current;
+        return delta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
new file mode 100644
index 0000000..e0b12bb
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.net.MessagingService;
+
+/** Metrics for dropped messages by verb. */
+public class DroppedMessageMetrics
+{
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "DroppedMessage";
+
+    /** Meter of dropped messages for the verb given at construction */
+    public final Meter dropped;
+    /** Meter count observed by the previous {@link #getRecentlyDropped()} call */
+    private long lastDropped;
+
+    public DroppedMessageMetrics(MessagingService.Verb verb)
+    {
+        MetricName name = new MetricName(GROUP_NAME, TYPE_NAME, "Dropped", verb.toString());
+        dropped = Metrics.newMeter(name, "dropped", TimeUnit.SECONDS);
+    }
+
+    /** @return messages counted by {@link #dropped} since the previous call of this method */
+    @Deprecated
+    public int getRecentlyDropped()
+    {
+        final long total = dropped.count();
+        final int delta = (int) (total - lastDropped);
+        lastDropped = total;
+        return delta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
new file mode 100644
index 0000000..d177613
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/** Metrics about latencies */
+public class LatencyMetrics
+{
+    /** Latency timer (durations in microseconds, rates per second) */
+    public final Timer latency;
+    /** Total latency in micro sec */
+    public final Counter totalLatency;
+
+    protected final MetricNameFactory factory;
+    protected final String namePrefix;
+
+    @Deprecated public final EstimatedHistogram totalLatencyHistogram = new EstimatedHistogram();
+    @Deprecated public final EstimatedHistogram recentLatencyHistogram = new EstimatedHistogram();
+    // Counter values seen by the previous getRecentLatency() call
+    protected long lastLatency;
+    protected long lastOpCount;
+
+    /**
+     * Create LatencyMetrics with given group, type, and scope. Name prefix for each metric will be empty.
+     *
+     * @param group Group name
+     * @param type Type name
+     * @param scope Scope
+     */
+    public LatencyMetrics(String group, String type, String scope)
+    {
+        this(group, type, "", scope);
+    }
+
+    /**
+     * Create LatencyMetrics with given group, type, prefix to prepend to each metric name, and scope.
+     *
+     * @param group Group name
+     * @param type Type name
+     * @param namePrefix Prefix to prepend to each metric name
+     * @param scope Scope of metrics
+     */
+    public LatencyMetrics(String group, String type, String namePrefix, String scope)
+    {
+        this(new LatencyMetricNameFactory(group, type, scope), namePrefix);
+    }
+
+    /**
+     * Create LatencyMetrics whose metric names are built by the given factory.
+     *
+     * @param factory MetricName factory to use
+     * @param namePrefix Prefix to prepend to each metric name
+     */
+    public LatencyMetrics(MetricNameFactory factory, String namePrefix)
+    {
+        this.factory = factory;
+        this.namePrefix = namePrefix;
+
+        latency = Metrics.newTimer(factory.createMetricName(namePrefix + "Latency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
+        totalLatency = Metrics.newCounter(factory.createMetricName(namePrefix + "TotalLatency"));
+    }
+
+    /** Records a latency given in nanoseconds; it is truncated to whole microseconds first. */
+    public void addNano(long nanos)
+    {
+        addMicro(nanos / 1000);
+    }
+
+    /** Records a latency given in microseconds in the timer, the total counter and both histograms. */
+    public void addMicro(long micros)
+    {
+        latency.update(micros, TimeUnit.MICROSECONDS);
+        totalLatency.inc(micros);
+        totalLatencyHistogram.add(micros);
+        recentLatencyHistogram.add(micros);
+    }
+
+    /** Removes the timer and the total counter from the default registry. */
+    public void release()
+    {
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName(namePrefix + "Latency"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName(namePrefix + "TotalLatency"));
+    }
+
+    /** @return mean latency in microseconds of the operations recorded since the previous call, or 0 if there were none */
+    @Deprecated
+    public double getRecentLatency()
+    {
+        long ops = latency.count();
+        long n = totalLatency.count();
+        // No operation since the previous call: dividing by the zero op delta would yield NaN.
+        if (ops == lastOpCount)
+            return 0;
+        double recent = ((double) n - lastLatency) / (ops - lastOpCount);
+        lastLatency = n;
+        lastOpCount = ops;
+        return recent;
+    }
+
+    /** Builds metric names from a fixed group, type and scope. */
+    static class LatencyMetricNameFactory implements MetricNameFactory
+    {
+        private final String group;
+        private final String type;
+        private final String scope;
+
+        LatencyMetricNameFactory(String group, String type, String scope)
+        {
+            this.group = group;
+            this.type = type;
+            this.scope = scope;
+        }
+
+        public MetricName createMetricName(String metricName)
+        {
+            return new MetricName(group, type, metricName, scope);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/MetricNameFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MetricNameFactory.java b/src/java/org/apache/cassandra/metrics/MetricNameFactory.java
new file mode 100644
index 0000000..5c1a5c2
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MetricNameFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.core.MetricName;
+
+/** Builds the {@link MetricName} under which a metric is registered, given only its short name. */
+public interface MetricNameFactory
+{
+    /**
+     * Create {@link MetricName} from given metric name.
+     * @param metricName Name part of {@link MetricName}.
+     * @return new MetricName with given metric name.
+     */
+    MetricName createMetricName(String metricName);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/StorageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
new file mode 100644
index 0000000..3cda71e
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+
+/** Metrics related to Storage. */
+public class StorageMetrics
+{
+    /** Counter registered under type "Storage" with the name "Load"; nothing in this class updates it */
+    public static final Counter load =
+            Metrics.newCounter(new MetricName("org.apache.cassandra.metrics", "Storage", "Load"));
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
new file mode 100644
index 0000000..05aad31
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentMap;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/** Metrics for streaming. */
+public class StreamingMetrics
+{
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "Streaming";
+
+    private static final ConcurrentMap<InetAddress, StreamingMetrics> instances = new NonBlockingHashMap<InetAddress, StreamingMetrics>();
+
+    public static final Counter activeStreamsOutbound = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "ActiveOutboundStreams"));
+    public static final Counter totalIncomingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalIncomingBytes"));
+    public static final Counter totalOutgoingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalOutgoingBytes"));
+    public final Counter incomingBytes;
+    public final Counter outgoingBytes;
+
+    /** Returns the cached metrics of the given peer, creating and caching them on first use. */
+    public static StreamingMetrics get(InetAddress ip)
+    {
+        StreamingMetrics metrics = instances.get(ip);
+        if (metrics != null)
+            return metrics;
+        // Another thread may have cached an instance meanwhile; keep the first so all callers share it.
+        StreamingMetrics created = new StreamingMetrics(ip);
+        StreamingMetrics existing = instances.putIfAbsent(ip, created);
+        return existing == null ? created : existing;
+    }
+
+    /** Creates the incoming/outgoing byte counters scoped by the host address of the given peer. */
+    public StreamingMetrics(final InetAddress peer)
+    {
+        incomingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "IncomingBytes", peer.getHostAddress()));
+        outgoingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "OutgoingBytes", peer.getHostAddress()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
new file mode 100644
index 0000000..af54cdb
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.*;
+
+/** Metrics for {@link ThreadPoolExecutor}. */
+public class ThreadPoolMetrics
+{
+    /** Number of active tasks. */
+    public final Gauge<Integer> activeTasks;
+    /** Number of tasks that had blocked before being accepted (or rejected). */
+    public final Counter totalBlocked;
+    /**
+     * Number of tasks currently blocked, waiting to be accepted by
+     * the executor (because all threads are busy and the backing queue is full).
+     */
+    public final Counter currentBlocked;
+    /** Number of completed tasks. */
+    public final Gauge<Long> completedTasks;
+    /** Number of tasks not yet completed (scheduled minus completed, so running tasks are included). */
+    public final Gauge<Long> pendingTasks;
+
+    /** Builds the names of all metrics of this pool; reused by {@link #release()} to unregister them. */
+    private final MetricNameFactory factory;
+
+    /**
+     * Create metrics for given ThreadPoolExecutor.
+     *
+     * @param executor Thread pool
+     * @param path Type of thread pool
+     * @param poolName Name of thread pool to identify metrics
+     */
+    public ThreadPoolMetrics(final ThreadPoolExecutor executor, String path, String poolName)
+    {
+        this.factory = new ThreadPoolMetricNameFactory(path, poolName);
+
+        activeTasks = Metrics.newGauge(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                return executor.getActiveCount();
+            }
+        });
+        totalBlocked = Metrics.newCounter(factory.createMetricName("TotalBlockedTasks"));
+        currentBlocked = Metrics.newCounter(factory.createMetricName("CurrentlyBlockedTasks"));
+        completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return executor.getCompletedTaskCount();
+            }
+        });
+        pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return executor.getTaskCount() - executor.getCompletedTaskCount();
+            }
+        });
+    }
+
+    /** Removes every metric registered by the constructor from the default registry. */
+    public void release()
+    {
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("ActiveTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("PendingTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CompletedTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalBlockedTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CurrentlyBlockedTasks"));
+    }
+
+    /** Name factory for one pool; static because it needs nothing from the enclosing instance. */
+    static class ThreadPoolMetricNameFactory implements MetricNameFactory
+    {
+        private final String path;
+        private final String poolName;
+
+        ThreadPoolMetricNameFactory(String path, String poolName)
+        {
+            this.path = path;
+            this.poolName = poolName;
+        }
+
+        /**
+         * Creates a name with type "ThreadPools", scope "path.poolName" and an explicit MBean name.
+         */
+        public MetricName createMetricName(String metricName)
+        {
+            String groupName = ThreadPoolMetrics.class.getPackage().getName();
+            String type = "ThreadPools";
+            // The MBean name carries path and pool name as separate keys.
+            String mbeanName = groupName + ":type=" + type + ",path=" + path + ",scope=" + poolName + ",name=" + metricName;
+
+            return new MetricName(groupName, type, metricName, path + "." + poolName, mbeanName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 9d95fad..c4ceae2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -31,15 +31,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-
-import org.apache.cassandra.tracing.Tracing;
-
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +54,8 @@ import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.metrics.ConnectionMetrics;
+import org.apache.cassandra.metrics.DroppedMessageMetrics;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.AntiEntropyService;
@@ -65,9 +64,8 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -260,7 +258,6 @@ public final class MessagingService implements MessagingServiceMBean
      * is not going to be a thread per node - but rather an instance per node. That's totally fine.
      */
     private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new NonBlockingHashMap<InetAddress, DebuggableThreadPoolExecutor>();
-    private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0);
 
     private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
 
@@ -283,15 +280,10 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.REQUEST_RESPONSE);
 
     // total dropped message counts for server lifetime
-    private final Map<Verb, AtomicInteger> droppedMessages = new EnumMap<Verb, AtomicInteger>(Verb.class);
+    private final Map<Verb, DroppedMessageMetrics> droppedMessages = new EnumMap<Verb, DroppedMessageMetrics>(Verb.class);
     // dropped count when last requested for the Recent api.  high concurrency isn't necessary here.
-    private final Map<Verb, Integer> lastDropped = Collections.synchronizedMap(new EnumMap<Verb, Integer>(Verb.class));
     private final Map<Verb, Integer> lastDroppedInternal = new EnumMap<Verb, Integer>(Verb.class);
 
-    private long totalTimeouts = 0;
-    private long recentTotalTimeouts = 0;
-    private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
-    private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
 
     // protocol versions of the other nodes in the cluster
@@ -310,8 +302,7 @@ public final class MessagingService implements MessagingServiceMBean
     {
         for (Verb verb : DROPPABLE_VERBS)
         {
-            droppedMessages.put(verb, new AtomicInteger());
-            lastDropped.put(verb, 0);
+            droppedMessages.put(verb, new DroppedMessageMetrics(verb));
             lastDroppedInternal.put(verb, 0);
         }
 
@@ -332,19 +323,8 @@ public final class MessagingService implements MessagingServiceMBean
             {
                 CallbackInfo expiredCallbackInfo = pair.right.value;
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
-                totalTimeouts++;
-                String ip = expiredCallbackInfo.target.getHostAddress();
-                AtomicLong c = timeoutsPerHost.get(ip);
-                if (c == null)
-                {
-                    c = new AtomicLong();
-                    timeoutsPerHost.put(ip, c);
-                }
-                c.incrementAndGet();
-                // we only create AtomicLong instances here, so that the write
-                // access to the hashmap happens single-threadedly.
-                if (recentTimeoutsPerHost.get(ip) == null)
-                    recentTimeoutsPerHost.put(ip, new AtomicLong());
+                ConnectionMetrics.totalTimeouts.mark();
+                getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
 
                 if (expiredCallbackInfo.shouldHint())
                 {
@@ -627,24 +607,6 @@ public final class MessagingService implements MessagingServiceMBean
                          : new CompressedFileStreamTask(header, to));
     }
 
-    public void incrementActiveStreamsOutbound()
-    {
-        activeStreamsOutbound.incrementAndGet();
-    }
-
-    public void decrementActiveStreamsOutbound()
-    {
-        activeStreamsOutbound.decrementAndGet();
-    }
-
-    /**
-     * The count of active outbound stream tasks.
-     */
-    public int getActiveStreamsOutbound()
-    {
-        return activeStreamsOutbound.get();
-    }
-
     public void register(ILatencySubscriber subcriber)
     {
         subscribers.add(subcriber);
@@ -821,23 +783,23 @@ public final class MessagingService implements MessagingServiceMBean
     public void incrementDroppedMessages(Verb verb)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
-        droppedMessages.get(verb).incrementAndGet();
+        droppedMessages.get(verb).dropped.mark();
     }
 
     private void logDroppedMessages()
     {
         boolean logTpstats = false;
-        for (Map.Entry<Verb, AtomicInteger> entry : droppedMessages.entrySet())
+        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
         {
-            AtomicInteger dropped = entry.getValue();
+            int dropped = (int) entry.getValue().dropped.count();
             Verb verb = entry.getKey();
-            int recent = dropped.get() - lastDroppedInternal.get(verb);
+            int recent = dropped - lastDroppedInternal.get(verb);
             if (recent > 0)
             {
                 logTpstats = true;
                 logger.info("{} {} messages dropped in last {}ms",
-                            new Object[]{ recent, verb, LOG_DROPPED_INTERVAL_IN_MS });
-                lastDroppedInternal.put(verb, dropped.get());
+                             new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
+                lastDroppedInternal.put(verb, dropped);
             }
         }
 
@@ -932,43 +894,37 @@ public final class MessagingService implements MessagingServiceMBean
     public Map<String, Integer> getDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<String, Integer>();
-        for (Map.Entry<Verb, AtomicInteger> entry : droppedMessages.entrySet())
-            map.put(entry.getKey().toString(), entry.getValue().get());
+        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+            map.put(entry.getKey().toString(), (int) entry.getValue().dropped.count());
         return map;
     }
 
     public Map<String, Integer> getRecentlyDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<String, Integer>();
-        for (Map.Entry<Verb, AtomicInteger> entry : droppedMessages.entrySet())
-        {
-            Verb verb = entry.getKey();
-            Integer dropped = entry.getValue().get();
-            Integer recentlyDropped = dropped - lastDropped.get(verb);
-            map.put(verb.toString(), recentlyDropped);
-            lastDropped.put(verb, dropped);
-        }
+        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+            map.put(entry.getKey().toString(), entry.getValue().getRecentlyDropped());
         return map;
     }
 
     public long getTotalTimeouts()
     {
-        return totalTimeouts;
+        return ConnectionMetrics.totalTimeouts.count();
     }
 
     public long getRecentTotalTimouts()
     {
-        long recent = totalTimeouts - recentTotalTimeouts;
-        recentTotalTimeouts = totalTimeouts;
-        return recent;
+        return ConnectionMetrics.getRecentTotalTimeout();
     }
 
     public Map<String, Long> getTimeoutsPerHost()
     {
         Map<String, Long> result = new HashMap<String, Long>();
-        for (Map.Entry<String, AtomicLong> entry : timeoutsPerHost.entrySet())
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
         {
-            result.put(entry.getKey(), entry.getValue().get());
+            String ip = entry.getKey().getHostAddress();
+            long recent = entry.getValue().getTimeouts();
+            result.put(ip, recent);
         }
         return result;
     }
@@ -976,12 +932,11 @@ public final class MessagingService implements MessagingServiceMBean
     public Map<String, Long> getRecentTimeoutsPerHost()
     {
         Map<String, Long> result = new HashMap<String, Long>();
-        for (Map.Entry<String, AtomicLong> entry : recentTimeoutsPerHost.entrySet())
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
         {
-            String ip = entry.getKey();
-            AtomicLong recent = entry.getValue();
-            Long timeout = timeoutsPerHost.get(ip).get();
-            result.put(ip, timeout - recent.getAndSet(timeout));
+            String ip = entry.getKey().getHostAddress();
+            long recent = entry.getValue().getRecentTimeouts();
+            result.put(ip, recent);
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index ce51cf0..05a39a1 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -39,6 +40,7 @@ public class OutboundTcpConnectionPool
     public final OutboundTcpConnection ackCon;
     // pointer to the reseted Address.
     private InetAddress resetedEndpoint;
+    private ConnectionMetrics metrics;
 
     OutboundTcpConnectionPool(InetAddress remoteEp)
     {
@@ -47,6 +49,8 @@ public class OutboundTcpConnectionPool
         cmdCon.start();
         ackCon = new OutboundTcpConnection(this);
         ackCon.start();
+
+        metrics = new ConnectionMetrics(id, this);
     }
 
     /**
@@ -86,6 +90,25 @@ public class OutboundTcpConnectionPool
         resetedEndpoint = remoteEP;
         for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon })
             conn.softCloseSocket();
+
+        // release previous metrics and create new one with reset address
+        metrics.release();
+        metrics = new ConnectionMetrics(resetedEndpoint, this);
+    }
+
+    public long getTimeouts()
+    {
+       return metrics.timeouts.count();
+    }
+
+    public long getRecentTimeouts()
+    {
+        return metrics.getRecentTimeout();
+    }
+
+    public void incrementTimeout()
+    {
+        metrics.timeouts.mark();
     }
 
     public Socket newSocket() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java b/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
index a7d9919..2d2e0bd 100644
--- a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
+++ b/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.scheduler;
 
-
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
@@ -25,11 +24,11 @@ import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.metrics.LatencyMetrics;
 
 class WeightedQueue implements WeightedQueueMBean
 {
-    private final LatencyTracker stats = new LatencyTracker();
+    private final LatencyMetrics metric;
 
     public final String key;
     public final int weight;
@@ -39,6 +38,7 @@ class WeightedQueue implements WeightedQueueMBean
         this.key = key;
         this.weight = weight;
         this.queue = new SynchronousQueue<Entry>(true);
+        this.metric =  new LatencyMetrics("org.apache.cassandra.metrics", "scheduler", "WeightedQueue", key);
     }
 
     public void register()
@@ -66,7 +66,7 @@ class WeightedQueue implements WeightedQueueMBean
         Entry e = queue.poll();
         if (e == null)
             return null;
-        stats.addNano(System.nanoTime() - e.creationTime);
+        metric.addNano(System.nanoTime() - e.creationTime);
         return e.thread;
     }
 
@@ -90,26 +90,26 @@ class WeightedQueue implements WeightedQueueMBean
 
     public long getOperations()
     {
-        return stats.getOpCount();
+        return metric.latency.count();
     }
 
     public long getTotalLatencyMicros()
     {
-        return stats.getTotalLatencyMicros();
+        return metric.totalLatency.count();
     }
 
     public double getRecentLatencyMicros()
     {
-        return stats.getRecentLatencyMicros();
+        return metric.getRecentLatency();
     }
 
     public long[] getTotalLatencyHistogramMicros()
     {
-        return stats.getTotalLatencyHistogramMicros();
+        return metric.totalLatencyHistogram.getBuckets(false);
     }
 
     public long[] getRecentLatencyHistogramMicros()
     {
-        return stats.getRecentLatencyHistogramMicros();
+        return metric.recentLatencyHistogram.getBuckets(true);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java b/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
index 4a253d1..d16d007 100644
--- a/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
+++ b/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.scheduler;
 
 /**
  * Exposes client request scheduling metrics for a particular scheduler queue.
+ * @see org.apache.cassandra.metrics.LatencyMetrics
  */
+@Deprecated
 public interface WeightedQueueMBean
 {
     public long getOperations();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 25a38ef..0ed8002 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -29,11 +29,14 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.util.concurrent.Futures;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
+import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
@@ -52,12 +55,6 @@ import org.apache.cassandra.io.sstable.SSTableReader.Operator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.github.jamm.MemoryMeter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
 
 public class CacheService implements CacheServiceMBean
 {
@@ -125,7 +122,7 @@ public class CacheService implements CacheServiceMBean
         else
         {
             logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); KeyCache size in JVM Heap will not be calculated accurately. " +
-            		"Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
+                        "Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
             /* We don't know the overhead size because memory meter is not enabled. */
             EntryWeigher<KeyCacheKey, RowIndexEntry> weigher = new EntryWeigher<KeyCacheKey, RowIndexEntry>()
             {
@@ -177,32 +174,32 @@ public class CacheService implements CacheServiceMBean
 
     public long getKeyCacheHits()
     {
-        return keyCache.getHits();
+        return keyCache.getMetrics().hits.count();
     }
 
     public long getRowCacheHits()
     {
-        return rowCache.getHits();
+        return rowCache.getMetrics().hits.count();
     }
 
     public long getKeyCacheRequests()
     {
-        return keyCache.getRequests();
+        return keyCache.getMetrics().requests.count();
     }
 
     public long getRowCacheRequests()
     {
-        return rowCache.getRequests();
+        return rowCache.getMetrics().requests.count();
     }
 
     public double getKeyCacheRecentHitRate()
     {
-        return keyCache.getRecentHitRate();
+        return keyCache.getMetrics().getRecentHitRate();
     }
 
     public double getRowCacheRecentHitRate()
     {
-        return rowCache.getRecentHitRate();
+        return rowCache.getMetrics().getRecentHitRate();
     }
 
     public int getRowCacheSavePeriodInSeconds()
@@ -245,7 +242,7 @@ public class CacheService implements CacheServiceMBean
 
     public long getRowCacheCapacityInBytes()
     {
-        return rowCache.getCapacity();
+        return rowCache.getMetrics().capacityInBytes.value();
     }
 
     public long getRowCacheCapacityInMB()
@@ -263,7 +260,7 @@ public class CacheService implements CacheServiceMBean
 
     public long getKeyCacheCapacityInBytes()
     {
-        return keyCache.getCapacity();
+        return keyCache.getMetrics().capacityInBytes.value();
     }
 
     public long getKeyCacheCapacityInMB()
@@ -282,12 +279,12 @@ public class CacheService implements CacheServiceMBean
 
     public long getRowCacheSize()
     {
-        return rowCache.weightedSize();
+        return rowCache.getMetrics().size.value();
     }
 
     public long getKeyCacheSize()
     {
-        return keyCache.weightedSize();
+        return keyCache.getMetrics().size.value();
     }
 
     public void reduceCacheSizes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/CacheServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheServiceMBean.java b/src/java/org/apache/cassandra/service/CacheServiceMBean.java
index 747b8f8..a34d5ed 100644
--- a/src/java/org/apache/cassandra/service/CacheServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/CacheServiceMBean.java
@@ -21,13 +21,28 @@ import java.util.concurrent.ExecutionException;
 
 public interface CacheServiceMBean
 {
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#hits
+     */
+    @Deprecated
     public long getKeyCacheHits();
+    @Deprecated
     public long getRowCacheHits();
 
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#requests
+     */
+    @Deprecated
     public long getKeyCacheRequests();
+    @Deprecated
     public long getRowCacheRequests();
 
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#recentHitRate
+     */
+    @Deprecated
     public double getKeyCacheRecentHitRate();
+    @Deprecated
     public double getRowCacheRecentHitRate();
 
     public int getRowCacheSavePeriodInSeconds();
@@ -47,15 +62,31 @@ public interface CacheServiceMBean
     public void invalidateRowCache();
 
     public long getRowCacheCapacityInMB();
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#capacityInBytes
+     */
+    @Deprecated
     public long getRowCacheCapacityInBytes();
     public void setRowCacheCapacityInMB(long capacity);
 
     public long getKeyCacheCapacityInMB();
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#capacityInBytes
+     */
+    @Deprecated
     public long getKeyCacheCapacityInBytes();
     public void setKeyCacheCapacityInMB(long capacity);
 
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#size
+     */
+    @Deprecated
     public long getRowCacheSize();
 
+    /**
+     * @see org.apache.cassandra.metrics.CacheMetrics#size
+     */
+    @Deprecated
     public long getKeyCacheSize();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5b85a84..3863eed 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -58,18 +58,12 @@ import org.apache.cassandra.net.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.*;
 
-
 public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
     private static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
 
-    // mbean stuff
-    private static final LatencyTracker readStats = new LatencyTracker();
-    private static final LatencyTracker rangeStats = new LatencyTracker();
-    private static final LatencyTracker writeStats = new LatencyTracker();
-
     public static final String UNREACHABLE = "UNREACHABLE";
 
     private static final WritePerformer standardWritePerformer;
@@ -88,6 +82,9 @@ public class StorageProxy implements StorageProxyMBean
         }
     });
     private static final AtomicLong totalHints = new AtomicLong();
+    private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
+    private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
+    private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
 
     private StorageProxy() {}
 
@@ -200,6 +197,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         catch (TimedOutException ex)
         {
+            writeMetrics.timeouts.mark();
             ClientRequestMetrics.writeTimeouts.inc();
             if (logger.isDebugEnabled())
             {
@@ -212,6 +210,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         catch (UnavailableException e)
         {
+            writeMetrics.unavailables.mark();
             ClientRequestMetrics.writeUnavailables.inc();
             throw e;
         }
@@ -222,7 +221,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            writeStats.addNano(System.nanoTime() - startTime);
+            writeMetrics.addNano(System.nanoTime() - startTime);
         }
     }
 
@@ -602,6 +601,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         if (StorageService.instance.isBootstrapMode() && !systemTableQuery(commands))
         {
+            readMetrics.unavailables.mark();
             ClientRequestMetrics.readUnavailables.inc();
             throw new UnavailableException();
         }
@@ -613,17 +613,19 @@ public class StorageProxy implements StorageProxyMBean
         }
         catch (UnavailableException e)
         {
+            readMetrics.unavailables.mark();
             ClientRequestMetrics.readUnavailables.inc();
             throw e;
         }
         catch (TimeoutException e)
         {
+            readMetrics.timeouts.mark();
             ClientRequestMetrics.readTimeouts.inc();
             throw e;
         }
         finally
         {
-            readStats.addNano(System.nanoTime() - startTime);
+            readMetrics.addNano(System.nanoTime() - startTime);
         }
         return rows;
     }
@@ -940,7 +942,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            rangeStats.addNano(System.nanoTime() - startTime);
+            rangeMetrics.addNano(System.nanoTime() - startTime);
         }
         return trim(command, rows);
     }
@@ -1084,77 +1086,77 @@ public class StorageProxy implements StorageProxyMBean
 
     public long getReadOperations()
     {
-        return readStats.getOpCount();
+        return readMetrics.latency.count();
     }
 
     public long getTotalReadLatencyMicros()
     {
-        return readStats.getTotalLatencyMicros();
+        return readMetrics.totalLatency.count();
     }
 
     public double getRecentReadLatencyMicros()
     {
-        return readStats.getRecentLatencyMicros();
+        return readMetrics.getRecentLatency();
     }
 
     public long[] getTotalReadLatencyHistogramMicros()
     {
-        return readStats.getTotalLatencyHistogramMicros();
+        return readMetrics.totalLatencyHistogram.getBuckets(false);
     }
 
     public long[] getRecentReadLatencyHistogramMicros()
     {
-        return readStats.getRecentLatencyHistogramMicros();
+        return readMetrics.recentLatencyHistogram.getBuckets(true);
     }
 
     public long getRangeOperations()
     {
-        return rangeStats.getOpCount();
+        return rangeMetrics.latency.count();
     }
 
     public long getTotalRangeLatencyMicros()
     {
-        return rangeStats.getTotalLatencyMicros();
+        return rangeMetrics.totalLatency.count();
     }
 
     public double getRecentRangeLatencyMicros()
     {
-        return rangeStats.getRecentLatencyMicros();
+        return rangeMetrics.getRecentLatency();
     }
 
     public long[] getTotalRangeLatencyHistogramMicros()
     {
-        return rangeStats.getTotalLatencyHistogramMicros();
+        return rangeMetrics.totalLatencyHistogram.getBuckets(false);
     }
 
     public long[] getRecentRangeLatencyHistogramMicros()
     {
-        return rangeStats.getRecentLatencyHistogramMicros();
+        return rangeMetrics.recentLatencyHistogram.getBuckets(true);
     }
 
     public long getWriteOperations()
     {
-        return writeStats.getOpCount();
+        return writeMetrics.latency.count();
     }
 
     public long getTotalWriteLatencyMicros()
     {
-        return writeStats.getTotalLatencyMicros();
+        return writeMetrics.totalLatency.count();
     }
 
     public double getRecentWriteLatencyMicros()
     {
-        return writeStats.getRecentLatencyMicros();
+        return writeMetrics.getRecentLatency();
     }
 
     public long[] getTotalWriteLatencyHistogramMicros()
     {
-        return writeStats.getTotalLatencyHistogramMicros();
+        return writeMetrics.totalLatencyHistogram.getBuckets(false);
     }
 
     public long[] getRecentWriteLatencyHistogramMicros()
     {
-        return writeStats.getRecentLatencyHistogramMicros();
+        return writeMetrics.recentLatencyHistogram.getBuckets(true);
     }
 
     public boolean getHintedHandoffEnabled()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index dd1541a..c6d4b7c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -19,22 +19,52 @@ package org.apache.cassandra.service;
 
 public interface StorageProxyMBean
 {
+    /**
+     * @see org.apache.cassandra.metrics.LatencyMetrics#opCount
+     */
+    @Deprecated
     public long getReadOperations();
+    /**
+     * @see org.apache.cassandra.metrics.LatencyMetrics#totalLatency
+     */
+    @Deprecated
     public long getTotalReadLatencyMicros();
+    /**
+     * @see org.apache.cassandra.metrics.LatencyMetrics#recentLatencyMicro
+     */
+    @Deprecated
     public double getRecentReadLatencyMicros();
+    /**
+     * @see org.apache.cassandra.metrics.LatencyMetrics#totalLatencyHistogramMicro
+     */
+    @Deprecated
     public long[] getTotalReadLatencyHistogramMicros();
+    /**
+     * @see org.apache.cassandra.metrics.LatencyMetrics#recentLatencyHistogramMicro
+     */
+    @Deprecated
     public long[] getRecentReadLatencyHistogramMicros();
 
+    @Deprecated
     public long getRangeOperations();
+    @Deprecated
     public long getTotalRangeLatencyMicros();
+    @Deprecated
     public double getRecentRangeLatencyMicros();
+    @Deprecated
     public long[] getTotalRangeLatencyHistogramMicros();
+    @Deprecated
     public long[] getRecentRangeLatencyHistogramMicros();
 
+    @Deprecated
     public long getWriteOperations();
+    @Deprecated
     public long getTotalWriteLatencyMicros();
+    @Deprecated
     public double getRecentWriteLatencyMicros();
+    @Deprecated
     public long[] getTotalWriteLatencyHistogramMicros();
+    @Deprecated
     public long[] getRecentWriteLatencyHistogramMicros();
 
     public long getTotalHints();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f0714a..9d7b481 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -110,6 +111,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     public static final StorageService instance = new StorageService();
 
+    private static final StorageMetrics metrics = new StorageMetrics();
+
     public static IPartitioner getPartitioner()
     {
         return DatabaseDescriptor.getPartitioner();
@@ -395,13 +398,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             throw new AssertionError(e);
         }
 
-        if (!isClientMode)
-        {
-            // "Touch" metrics classes to trigger static initialization, such that all metrics become available
-            // on start-up even if they have not yet been used.
-            new ClientRequestMetrics();
-        }
-
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
             logger.info("Loading persisted ring state");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 350eff5..8ad047e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -171,7 +171,9 @@ public interface StorageServiceMBean
 
     /**
      * Numeric load value.
+     * @see org.apache.cassandra.metrics.StorageMetrics#load
      */
+    @Deprecated
     public double getLoad();
 
     /** Human-readable load value */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index ba7d9f3..5ecdfbf 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -29,6 +29,7 @@ import com.ning.compress.lzf.LZFOutputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -57,6 +58,7 @@ public class FileStreamTask extends WrappedRunnable
     // outbound global throughput limiter
     protected final Throttle throttle;
     private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
+    protected final StreamingMetrics metrics;
 
     public FileStreamTask(StreamHeader header, InetAddress to)
     {
@@ -73,9 +75,10 @@ public class FileStreamTask extends WrappedRunnable
                 // total throughput
                 int totalBytesPerMS = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
                 // per stream throughput (target bytes per MS)
-                return totalBytesPerMS / Math.max(1, MessagingService.instance().getActiveStreamsOutbound());
+                return totalBytesPerMS / Math.max(1, (int)StreamingMetrics.activeStreamsOutbound.count());
             }
         });
+        metrics = StreamingMetrics.get(to);
     }
 
     public void runMayThrow() throws IOException
@@ -141,9 +144,10 @@ public class FileStreamTask extends WrappedRunnable
         // setting up data compression stream
         compressedoutput = new LZFOutputStream(output);
 
-        MessagingService.instance().incrementActiveStreamsOutbound();
+        StreamingMetrics.activeStreamsOutbound.inc();
         try
         {
+            long totalBytesTransferred = 0;
             // stream each of the required sections of the file
             for (Pair<Long, Long> section : header.file.sections)
             {
@@ -159,6 +163,7 @@ public class FileStreamTask extends WrappedRunnable
                 {
                     long lastWrite = write(file, length, bytesTransferred);
                     bytesTransferred += lastWrite;
+                    totalBytesTransferred += lastWrite;
                     // store streaming progress
                     header.file.progress += lastWrite;
                 }
@@ -169,12 +174,14 @@ public class FileStreamTask extends WrappedRunnable
                 if (logger.isDebugEnabled())
                     logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
             }
+            StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
+            metrics.outgoingBytes.inc(totalBytesTransferred);
             // receive reply confirmation
             receiveReply();
         }
         finally
         {
-            MessagingService.instance().decrementActiveStreamsOutbound();
+            StreamingMetrics.activeStreamsOutbound.dec();
 
             // no matter what happens close file
             FileUtils.closeQuietly(file);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index f74f566..49cb35b 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,6 +53,7 @@ public class IncomingStreamReader
     protected final PendingFile remoteFile;
     protected final StreamInSession session;
     private final InputStream underliningStream;
+    private final StreamingMetrics metrics;
 
     public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
@@ -80,6 +82,7 @@ public class IncomingStreamReader
         {
             underliningStream = null;
         }
+        metrics = StreamingMetrics.get(socket.getInetAddress());
     }
 
     /**
@@ -127,6 +130,7 @@ public class IncomingStreamReader
         try
         {
             BytesReadTracker in = new BytesReadTracker(input);
+            long totalBytesRead = 0;
 
             for (Pair<Long, Long> section : localFile.sections)
             {
@@ -166,8 +170,11 @@ public class IncomingStreamReader
                     remoteFile.progress += remoteFile.compressionInfo != null
                                            ? ((CompressedInputStream) underliningStream).uncompressedBytes()
                                            : in.getBytesRead();
+                    totalBytesRead += in.getBytesRead();
                 }
             }
+            StreamingMetrics.totalIncomingBytes.inc(totalBytesRead);
+            metrics.incomingBytes.inc(totalBytesRead);
             return writer.closeAndOpenReader();
         }
         catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
index 1a281fc..398599f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
@@ -67,11 +68,12 @@ public class CompressedFileStreamTask extends FileStreamTask
         RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true);
         FileChannel fc = file.getChannel();
 
-        MessagingService.instance().incrementActiveStreamsOutbound();
+        StreamingMetrics.activeStreamsOutbound.inc();
         // calculate chunks to transfer. we want to send continuous chunks altogether.
         List<Pair<Long, Long>> sections = getTransferSections(header.file.compressionInfo.chunks);
         try
         {
+            long totalBytesTransferred = 0;
             // stream each of the required sections of the file
             for (Pair<Long, Long> section : sections)
             {
@@ -99,18 +101,21 @@ public class CompressedFileStreamTask extends FileStreamTask
                         throttle.throttleDelta(toTransfer);
                         lastWrite = toTransfer;
                     }
+                    totalBytesTransferred += lastWrite;
                     bytesTransferred += lastWrite;
                     header.file.progress += lastWrite;
                 }
 
                 logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
             }
+            StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
+            metrics.outgoingBytes.inc(totalBytesTransferred);
             // receive reply confirmation
             receiveReply();
         }
         finally
         {
-            MessagingService.instance().decrementActiveStreamsOutbound();
+            StreamingMetrics.activeStreamsOutbound.dec();
 
             // no matter what happens close file
             FileUtils.closeQuietly(file);


Mime
View raw message