Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2143FF9EE for ; Sat, 5 Oct 2013 02:53:29 +0000 (UTC) Received: (qmail 61506 invoked by uid 500); 5 Oct 2013 02:53:25 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 61318 invoked by uid 500); 5 Oct 2013 02:53:20 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 61204 invoked by uid 99); 5 Oct 2013 02:53:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 05 Oct 2013 02:53:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 6F91991055A; Sat, 5 Oct 2013 02:53:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Sat, 05 Oct 2013 02:53:18 -0000 Message-Id: In-Reply-To: <67ee1d77564c4e7d91625a2f8db95446@git.apache.org> References: <67ee1d77564c4e7d91625a2f8db95446@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] git commit: Expose connected thrift + native client counts patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-5084 Expose connected thrift + native client counts patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-5084 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b7dd5e6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b7dd5e6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b7dd5e6 Branch: refs/heads/trunk Commit: 5b7dd5e62897d0ac13f96330e360d2262aeb0650 Parents: cf38e9e Author: Jonathan Ellis Authored: Fri Oct 4 21:51:39 2013 -0500 Committer: Jonathan Ellis Committed: Fri Oct 4 21:51:39 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/metrics/ClientMetrics.java | 35 ++++++++++++++++++ .../cassandra/thrift/CassandraServer.java | 19 +++++++++- .../cassandra/thrift/ThriftSessionManager.java | 5 +++ .../org/apache/cassandra/transport/Server.java | 37 ++++++++++++++------ 5 files changed, 86 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3dc5e77..5af4e2e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,7 @@ * Fix fat client schema pull NPE (CASSANDRA-6089) * Fix memtable flushing for indexed tables (CASSANDRA-6112) * Fix skipping columns with multiple slices (CASSANDRA-6119) + * Expose connected thrift + native client counts (CASSANDRA-5084) 1.2.10 http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/metrics/ClientMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java new file mode 100644 index 0000000..cb10ad5 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java @@ -0,0 +1,35 @@ +package org.apache.cassandra.metrics; + +import java.util.concurrent.Callable; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Gauge; + +public class ClientMetrics +{ + private static final MetricNameFactory factory = new DefaultNameFactory("Client"); + + public static final ClientMetrics instance = new ClientMetrics(); + + private ClientMetrics() + { + } + + public void addCounter(String name, final Callable provider) + { + Metrics.newGauge(factory.createMetricName(name), new Gauge() + { + public Integer value() + { + try + { + return provider.call(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 1959815..65ae177 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; import java.util.zip.DataFormatException; import java.util.zip.Inflater; @@ -53,8 +54,12 @@ import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.scheduler.IRequestScheduler; -import org.apache.cassandra.service.*; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; @@ -80,6 +85,7 @@ public class CassandraServer implements Cassandra.Iface public CassandraServer() { requestScheduler = DatabaseDescriptor.getRequestScheduler(); + registerMetrics(); } public ThriftClientState state() @@ -1877,5 +1883,16 @@ public class CassandraServer implements Cassandra.Iface return false; } + private void registerMetrics() + { + ClientMetrics.instance.addCounter("connectedThriftClients", new Callable() + { + @Override + public Integer call() throws Exception + { + return ThriftSessionManager.instance.getConnectedClients(); + } + }); + } // main method moved to CassandraDaemon } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java index bbc4bff..9a537e8 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java +++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java @@ -73,4 +73,9 @@ public class ThriftSessionManager if (logger.isTraceEnabled()) logger.trace("ClientState removed for socket addr {}", socket); } + + public int getConnectedClients() + { + return activeSocketSessions.size(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 7400a8b..0ffb92b 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.EnumMap; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; @@ -32,19 +33,12 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.security.SSLFactory; -import org.apache.cassandra.service.CassandraDaemon; -import org.apache.cassandra.service.IEndpointLifecycleSubscriber; -import org.apache.cassandra.service.IMigrationListener; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.*; import org.apache.cassandra.transport.messages.EventMessage; import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; @@ -76,6 +70,7 @@ public class Server implements CassandraDaemon.Server EventNotifier notifier = new EventNotifier(this); StorageService.instance.register(notifier); MigrationManager.instance.register(notifier); + registerMetrics(); } public Server(String hostname, int port) @@ -140,6 +135,18 @@ public class Server implements CassandraDaemon.Server connectionTracker.allChannels.add(channel); } + private void registerMetrics() + { + ClientMetrics.instance.addCounter("connectedNativeClients", new Callable() + { + @Override + public Integer call() throws Exception + { + return connectionTracker.getConnectedClients(); + } + }); + } + private void close() { // Close opened connections @@ -187,6 +194,16 @@ public class Server implements CassandraDaemon.Server { allChannels.close().awaitUninterruptibly(); } + + public int getConnectedClients() + { + /* + - When server is running: allChannels contains all clients' connections (channels) + plus one additional channel used for the server's own bootstrap. + - When server is stopped: the size is 0 + */ + return allChannels.size() != 0 ? allChannels.size() - 1 : 0; + } } private static class PipelineFactory implements ChannelPipelineFactory