Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 35ADD200BD8 for ; Tue, 22 Nov 2016 22:33:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 34804160B1C; Tue, 22 Nov 2016 21:33:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5B287160B0C for ; Tue, 22 Nov 2016 22:33:25 +0100 (CET) Received: (qmail 81894 invoked by uid 500); 22 Nov 2016 21:33:19 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 81212 invoked by uid 99); 22 Nov 2016 21:33:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Nov 2016 21:33:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CA58BF1707; Tue, 22 Nov 2016 21:33:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjlee@apache.org To: common-commits@hadoop.apache.org Date: Tue, 22 Nov 2016 21:33:40 -0000 Message-Id: <0553cdacbb3649da8ecd6dec94cf11a9@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] hadoop git commit: HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula. archived-at: Tue, 22 Nov 2016 21:33:26 -0000 HADOOP-13742. Expose NumOpenConnectionsPerUser as a metric. Brahma Reddy Battula. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bd373555 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bd373555 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bd373555 Branch: refs/heads/HADOOP-13070 Commit: bd3735554fa5c3bc064c57ec78f4308430b14b48 Parents: b2d4b7b Author: Kihwal Lee Authored: Thu Nov 17 12:16:38 2016 -0600 Committer: Kihwal Lee Committed: Thu Nov 17 12:16:38 2016 -0600 ---------------------------------------------------------------------- .../main/java/org/apache/hadoop/ipc/Server.java | 61 +++++++++++++++++++- .../apache/hadoop/ipc/metrics/RpcMetrics.java | 5 ++ .../java/org/apache/hadoop/ipc/TestRPC.java | 29 +++++++++- .../org/apache/hadoop/test/MetricsAsserts.java | 7 +++ 4 files changed, 98 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 1c7e76a..8f1956e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -122,6 +122,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; +import org.codehaus.jackson.map.ObjectMapper; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -2151,6 +2152,9 @@ public abstract class Server { authorizeConnection(); // don't set until after authz because connection isn't established connectionContextRead = true; + if (user != null) { + connectionManager.incrUserConnections(user.getShortUserName()); + } } /** @@ -3019,7 +3023,20 @@ public abstract class Server { public int getNumOpenConnections() { return connectionManager.size(); } - + + /** + * Get the NumOpenConnections/User. + */ + public String getNumOpenConnectionsPerUser() { + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper + .writeValueAsString(connectionManager.getUserToConnectionsMap()); + } catch (IOException ignored) { + } + return null; + } + /** * The number of rpc calls in the queue. * @return The number of rpc calls in the queue. @@ -3139,6 +3156,9 @@ public abstract class Server { private class ConnectionManager { final private AtomicInteger count = new AtomicInteger(); final private Set connections; + /* Map to maintain the statistics per User */ + final private Map userToConnectionsMap; + final private Object userToConnectionsMapLock = new Object(); final private Timer idleScanTimer; final private int idleScanThreshold; @@ -3170,6 +3190,7 @@ public abstract class Server { this.connections = Collections.newSetFromMap( new ConcurrentHashMap( maxQueueSize, 0.75f, readThreads+2)); + this.userToConnectionsMap = new ConcurrentHashMap<>(); } private boolean add(Connection connection) { @@ -3187,7 +3208,39 @@ public abstract class Server { } return removed; } - + + void incrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + count = 1; + } else { + count++; + } + userToConnectionsMap.put(user, count); + } + } + + void decrUserConnections(String user) { + synchronized (userToConnectionsMapLock) { + Integer count = userToConnectionsMap.get(user); + if (count == null) { + return; + } else { + count--; + } + if (count == 0) { + userToConnectionsMap.remove(user); + } else { + userToConnectionsMap.put(user, count); + } + } + } + + Map getUserToConnectionsMap() { + return userToConnectionsMap; + } + int size() { return count.get(); } @@ -3226,6 +3279,10 @@ public abstract class Server { // only close if actually removed to avoid double-closing due // to possible races connection.close(); + // Remove authorized users only + if (connection.user != null && connection.connectionContextRead) { + decrUserConnections(connection.user.getShortUserName()); + } } return exists; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 5373f95..ef43618 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -104,6 +104,11 @@ public class RpcMetrics { return server.getNumOpenConnections(); } + @Metric("Number of open connections per user") + public String numOpenConnectionsPerUser() { + return server.getNumOpenConnectionsPerUser(); + } + @Metric("Length of the call queue") public int callQueueLength() { return server.getCallQueueLen(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 72b603a..f0d883b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -64,6 +64,7 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -1015,7 +1016,7 @@ public class TestRPC extends TestRpcBase { @Test public void testRpcMetrics() throws Exception { - Server server; + final Server server; TestRpcService proxy = null; final int interval = 1; @@ -1025,7 +1026,21 @@ public class TestRPC extends TestRpcBase { RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); server = setupTestServer(conf, 5); - + String testUser = "testUser"; + UserGroupInformation anotherUser = + UserGroupInformation.createRemoteUser(testUser); + TestRpcService proxy2 = + anotherUser.doAs(new PrivilegedAction() { + public TestRpcService run() { + try { + return RPC.getProxy(TestRpcService.class, 0, + server.getListenerAddress(), conf); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + }); try { proxy = getClient(addr, conf); @@ -1033,6 +1048,7 @@ public class TestRPC extends TestRpcBase { proxy.ping(null, newEmptyRequest()); proxy.echo(null, newEchoRequest("" + i)); + proxy2.echo(null, newEchoRequest("" + i)); } MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); @@ -1044,7 +1060,16 @@ public class TestRPC extends TestRpcBase { rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", rpcMetrics); + String actualUserVsCon = MetricsAsserts + .getStringMetric("NumOpenConnectionsPerUser", rpcMetrics); + String proxyUser = + UserGroupInformation.getCurrentUser().getShortUserName(); + assertTrue(actualUserVsCon.contains("\"" + proxyUser + "\":1")); + assertTrue(actualUserVsCon.contains("\"" + testUser + "\":1")); } finally { + if (proxy2 != null) { + RPC.stopProxy(proxy2); + } stop(server, proxy); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd373555/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 982481e..5d87b07 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -236,6 +236,13 @@ public class MetricsAsserts { return captor.getValue(); } + public static String getStringMetric(String name, MetricsRecordBuilder rb) { + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(rb, atLeast(0)).tag(eqName(info(name, "")), captor.capture()); + checkCaptured(captor, name); + return captor.getValue(); + } + /** * Assert a float gauge metric as expected * @param name of the metric --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org