Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0ACC0179F5 for ; Thu, 8 Oct 2015 00:07:03 +0000 (UTC) Received: (qmail 2230 invoked by uid 500); 8 Oct 2015 00:07:03 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 2184 invoked by uid 500); 8 Oct 2015 00:07:02 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 2175 invoked by uid 99); 8 Oct 2015 00:07:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Oct 2015 00:07:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B65BBE08CD; Thu, 8 Oct 2015 00:07:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ndimiduk@apache.org To: commits@hbase.apache.org Message-Id: <32807bd3c9194baca5aa20b9c94a2c41@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-12911 Client-side metrics Date: Thu, 8 Oct 2015 00:07:02 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/branch-1 c93639359 -> a048c32a9 HBASE-12911 Client-side metrics Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a048c32a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a048c32a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a048c32a Branch: refs/heads/branch-1 Commit: a048c32a996d6110aae116d9c231ec53c81dbf8d Parents: c936393 Author: Nick Dimiduk Authored: Mon Oct 5 10:19:40 2015 -0700 Committer: Nick Dimiduk Committed: Wed Oct 7 17:05:51 2015 -0700 ---------------------------------------------------------------------- hbase-client/pom.xml | 4 + .../hadoop/hbase/client/ClusterConnection.java | 5 + .../apache/hadoop/hbase/client/Connection.java | 1 - .../hadoop/hbase/client/ConnectionAdapter.java | 5 + .../hadoop/hbase/client/ConnectionManager.java | 21 +- .../apache/hadoop/hbase/client/MetaCache.java | 9 + .../hadoop/hbase/client/MetricsConnection.java | 324 +++++++++++++++++++ .../hadoop/hbase/ipc/AbstractRpcClient.java | 25 +- .../org/apache/hadoop/hbase/ipc/AsyncCall.java | 6 +- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 9 +- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 96 ++++-- .../hbase/ipc/AsyncServerResponseHandler.java | 5 +- .../java/org/apache/hadoop/hbase/ipc/Call.java | 11 +- .../hadoop/hbase/ipc/RpcClientFactory.java | 22 +- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 50 ++- .../hbase/client/TestMetricsConnection.java | 120 +++++++ .../hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/MetricsRegionServer.java | 3 +- .../hadoop/hbase/client/TestClientTimeouts.java | 7 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 18 +- .../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 14 +- .../hbase/ipc/TestGlobalEventLoopGroup.java | 6 +- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 4 +- .../hadoop/hbase/ipc/TestRpcClientLeaks.java | 6 +- .../hbase/ipc/TestRpcHandlerException.java | 3 +- 25 files changed, 669 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index bb6a02f..ef9f30b 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -189,6 +189,10 @@ log4j test + + com.yammer.metrics + metrics-core + http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index f3df010..c813bae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -301,4 +301,9 @@ public interface ClusterConnection extends HConnection { */ ClientBackoffPolicy getBackoffPolicy(); + /** + * @return the MetricsConnection instance associated with this connection. + */ + public MetricsConnection getConnectionMetrics(); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index cb4b0d8..bce0f91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -174,5 +174,4 @@ public interface Connection extends Abortable, Closeable { * @return true if this connection is closed */ boolean isClosed(); - } http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 99da1be..b42593e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -131,6 +131,11 @@ abstract class ConnectionAdapter implements ClusterConnection { } @Override + public MetricsConnection getConnectionMetrics() { + return wrappedConnection.getConnectionMetrics(); + } + + @Override public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { return wrappedConnection.isMasterRunning(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 491c547..0e4fb3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -600,14 +602,15 @@ class ConnectionManager { // Client rpc instance. private RpcClient rpcClient; - private MetaCache metaCache = new MetaCache(); + private final MetaCache metaCache; + private final MetricsConnection metrics; private int refCount; // indicates whether this connection's life cycle is managed (by us) private boolean managed; - private User user; + protected User user; private RpcRetryingCallerFactory rpcCallerFactory; @@ -670,6 +673,12 @@ class ConnectionManager { this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); this.asyncProcess = createAsyncProcess(this.conf); + if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { + this.metrics = new MetricsConnection(this); + } else { + this.metrics = null; + } + this.metaCache = new MetaCache(this.metrics); this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, @@ -777,6 +786,11 @@ class ConnectionManager { return new HBaseAdmin(this); } + @Override + public MetricsConnection getConnectionMetrics() { + return this.metrics; + } + private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { @@ -2420,6 +2434,9 @@ class ConnectionManager { } closeMaster(); shutdownPools(); + if (this.metrics != null) { + this.metrics.shutdown(); + } this.closed = true; closeZooKeeperWatcher(); this.stubs.clear(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 8daaac9..a9ce173 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -60,6 +60,12 @@ public class MetaCache { // The access to this attribute must be protected by a lock on cachedRegionLocations private final Set cachedServers = new ConcurrentSkipListSet(); + private final MetricsConnection metrics; + + public MetaCache(MetricsConnection metrics) { + this.metrics = metrics; + } + /** * Search the cache for a location that fits our table and row key. * Return null if no suitable region is located. @@ -75,6 +81,7 @@ public class MetaCache { Entry e = tableLocations.floorEntry(row); if (e == null) { + if (metrics!= null) metrics.incrMetaCacheMiss(); return null; } RegionLocations possibleRegion = e.getValue(); @@ -88,10 +95,12 @@ public class MetaCache { if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || getRowComparator(tableName).compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { + if (metrics != null) metrics.incrMetaCacheHit(); return possibleRegion; } // Passed all the way through, so we got nothing - complete cache miss + if (metrics != null) metrics.incrMetaCacheMiss(); return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java new file mode 100644 index 0000000..799edda --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.reporting.JmxReporter; +import com.yammer.metrics.util.RatioGauge; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * This class is for maintaining the various connection statistics and publishing them through + * the metrics interfaces. + * + * This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not + * conflict with other uses of Yammer Metrics within the client application. Instantiating + * this class implicitly creates and "starts" instances of these classes; be sure to call + * {@link #shutdown()} to terminate the thread pools they allocate. + */ +@InterfaceAudience.Private +public class MetricsConnection { + + /** Set this key to {@code true} to enable metrics collection of client requests. */ + public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; + + private static final String DRTN_BASE = "rpcCallDurationMs_"; + private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; + private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; + private static final String CLIENT_SVC = ClientService.getDescriptor().getName(); + + /** A container class for collecting details about the RPC call as it percolates. */ + public static class CallStats { + private long requestSizeBytes = 0; + private long responseSizeBytes = 0; + private long startTime = 0; + private long callTimeMs = 0; + + public long getRequestSizeBytes() { + return requestSizeBytes; + } + + public void setRequestSizeBytes(long requestSizeBytes) { + this.requestSizeBytes = requestSizeBytes; + } + + public long getResponseSizeBytes() { + return responseSizeBytes; + } + + public void setResponseSizeBytes(long responseSizeBytes) { + this.responseSizeBytes = responseSizeBytes; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getCallTimeMs() { + return callTimeMs; + } + + public void setCallTimeMs(long callTimeMs) { + this.callTimeMs = callTimeMs; + } + } + + @VisibleForTesting + protected final class CallTracker { + private final String name; + @VisibleForTesting final Timer callTimer; + @VisibleForTesting final Histogram reqHist; + @VisibleForTesting final Histogram respHist; + + private CallTracker(MetricsRegistry registry, String name, String subName, String scope) { + StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name); + if (subName != null) { + sb.append("(").append(subName).append(")"); + } + this.name = sb.toString(); + this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope); + this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope); + this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope); + } + + private CallTracker(MetricsRegistry registry, String name, String scope) { + this(registry, name, null, scope); + } + + public void updateRpc(CallStats stats) { + this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); + this.reqHist.update(stats.getRequestSizeBytes()); + this.respHist.update(stats.getResponseSizeBytes()); + } + + @Override + public String toString() { + return "CallTracker:" + name; + } + } + + /** A lambda for dispatching to the appropriate metric factory method */ + private static interface NewMetric { + T newMetric(Class clazz, String name, String scope); + } + + /** Anticipated number of metric entries */ + private static final int CAPACITY = 50; + /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */ + private static final float LOAD_FACTOR = 0.75f; + /** + * Anticipated number of concurrent accessor threads, from + * {@link ConnectionManager.HConnectionImplementation#getBatchPool()} + */ + private static final int CONCURRENCY_LEVEL = 256; + + private final MetricsRegistry registry; + private final JmxReporter reporter; + private final String scope; + + private final NewMetric timerFactory = new NewMetric() { + @Override public Timer newMetric(Class clazz, String name, String scope) { + return registry.newTimer(clazz, name, scope); + } + }; + + private final NewMetric histogramFactory = new NewMetric() { + @Override public Histogram newMetric(Class clazz, String name, String scope) { + return registry.newHistogram(clazz, name, scope); + } + }; + + // static metrics + + @VisibleForTesting protected final Counter metaCacheHits; + @VisibleForTesting protected final Counter metaCacheMisses; + @VisibleForTesting protected final CallTracker getTracker; + @VisibleForTesting protected final CallTracker scanTracker; + @VisibleForTesting protected final CallTracker appendTracker; + @VisibleForTesting protected final CallTracker deleteTracker; + @VisibleForTesting protected final CallTracker incrementTracker; + @VisibleForTesting protected final CallTracker putTracker; + @VisibleForTesting protected final CallTracker multiTracker; + + // dynamic metrics + + // These maps are used to cache references to the metric instances that are managed by the + // registry. I don't think their use perfectly removes redundant allocations, but it's + // a big improvement over calling registry.newMetric each time. + @VisibleForTesting protected final ConcurrentMap rpcTimers = + new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); + @VisibleForTesting protected final ConcurrentMap rpcHistograms = + new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */, + LOAD_FACTOR, CONCURRENCY_LEVEL); + + public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) { + this.scope = conn.toString(); + this.registry = new MetricsRegistry(); + final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool(); + final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool(); + + this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope, + new RatioGauge() { + @Override protected double getNumerator() { + return batchPool.getActiveCount(); + } + @Override protected double getDenominator() { + return batchPool.getMaximumPoolSize(); + } + }); + this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope, + new RatioGauge() { + @Override protected double getNumerator() { + return metaPool.getActiveCount(); + } + @Override protected double getDenominator() { + return metaPool.getMaximumPoolSize(); + } + }); + this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope); + this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope); + this.getTracker = new CallTracker(this.registry, "Get", scope); + this.scanTracker = new CallTracker(this.registry, "Scan", scope); + this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope); + this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope); + this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope); + this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope); + this.multiTracker = new CallTracker(this.registry, "Multi", scope); + this.reporter = new JmxReporter(this.registry); + this.reporter.start(); + } + + public void shutdown() { + this.reporter.shutdown(); + this.registry.shutdown(); + } + + /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */ + public static CallStats newCallStats() { + // TODO: instance pool to reduce GC? + return new CallStats(); + } + + /** Increment the number of meta cache hits. */ + public void incrMetaCacheHit() { + metaCacheHits.inc(); + } + + /** Increment the number of meta cache misses. */ + public void incrMetaCacheMiss() { + metaCacheMisses.inc(); + } + + /** + * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. + */ + private T getMetric(String key, ConcurrentMap map, NewMetric factory) { + T t = map.get(key); + if (t == null) { + t = factory.newMetric(this.getClass(), key, scope); + map.putIfAbsent(key, t); + } + return t; + } + + /** Update call stats for non-critical-path methods */ + private void updateRpcGeneric(MethodDescriptor method, CallStats stats) { + final String methodName = method.getService().getName() + "_" + method.getName(); + getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory) + .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); + getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) + .update(stats.getRequestSizeBytes()); + getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory) + .update(stats.getResponseSizeBytes()); + } + + /** Report RPC context to metrics system. */ + public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { + // this implementation is tied directly to protobuf implementation details. would be better + // if we could dispatch based on something static, ie, request Message type. + if (method.getService() == ClientService.getDescriptor()) { + switch(method.getIndex()) { + case 0: + assert "Get".equals(method.getName()); + getTracker.updateRpc(stats); + return; + case 1: + assert "Mutate".equals(method.getName()); + final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType(); + switch(mutationType) { + case APPEND: + appendTracker.updateRpc(stats); + return; + case DELETE: + deleteTracker.updateRpc(stats); + return; + case INCREMENT: + incrementTracker.updateRpc(stats); + return; + case PUT: + putTracker.updateRpc(stats); + return; + default: + throw new RuntimeException("Unrecognized mutation type " + mutationType); + } + case 2: + assert "Scan".equals(method.getName()); + scanTracker.updateRpc(stats); + return; + case 3: + assert "BulkLoadHFile".equals(method.getName()); + // use generic implementation + break; + case 4: + assert "ExecService".equals(method.getName()); + // use generic implementation + break; + case 5: + assert "ExecRegionServerService".equals(method.getName()); + // use generic implementation + break; + case 6: + assert "Multi".equals(method.getName()); + multiTracker.updateRpc(stats); + return; + default: + throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName()); + } + } + // Fallback to dynamic registry lookup for DDL methods. + updateRpcGeneric(method, stats); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 9be370d..6f5e78a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.security.User; @@ -55,6 +56,7 @@ public abstract class AbstractRpcClient implements RpcClient { protected final Configuration conf; protected String clusterId; protected final SocketAddress localAddr; + protected final MetricsConnection metrics; protected UserProvider userProvider; protected final IPCUtil ipcUtil; @@ -79,8 +81,10 @@ public abstract class AbstractRpcClient implements RpcClient { * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. + * @param metrics the connection metrics */ - public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { + public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { this.userProvider = UserProvider.instantiate(conf); this.localAddr = localAddr; this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); @@ -100,6 +104,7 @@ public abstract class AbstractRpcClient implements RpcClient { this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); + this.metrics = metrics; // login the server principal (if using secure Hadoop) if (LOG.isDebugEnabled()) { @@ -205,19 +210,20 @@ public abstract class AbstractRpcClient implements RpcClient { pcrc = new PayloadCarryingRpcController(); } - long startTime = 0; - if (LOG.isTraceEnabled()) { - startTime = EnvironmentEdgeManager.currentTime(); - } Pair val; try { - val = call(pcrc, md, param, returnType, ticket, isa); + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + val = call(pcrc, md, param, returnType, ticket, isa, cs); // Shove the results into controller so can be carried across the proxy/pb service void. pcrc.setCellScanner(val.getSecond()); + cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(md, param, cs); + } if (LOG.isTraceEnabled()) { - long callTime = EnvironmentEdgeManager.currentTime() - startTime; - LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms"); + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); } return val.getFirst(); } catch (Throwable e) { @@ -242,7 +248,8 @@ public abstract class AbstractRpcClient implements RpcClient { */ protected abstract Pair call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress isa) throws IOException, InterruptedException; + InetSocketAddress isa, MetricsConnection.CallStats callStats) + throws IOException, InterruptedException; @Override public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket, http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index 431c669..a5da0dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -49,6 +50,7 @@ public class AsyncCall extends DefaultPromise { final Message responseDefaultType; final long startTime; final long rpcTimeout; + final MetricsConnection.CallStats callStats; /** * Constructor @@ -61,7 +63,8 @@ public class AsyncCall extends DefaultPromise { * @param responseDefaultType the default response type */ public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message - param, PayloadCarryingRpcController controller, Message responseDefaultType) { + param, PayloadCarryingRpcController controller, Message responseDefaultType, + MetricsConnection.CallStats callStats) { super(eventLoop); this.id = connectId; @@ -73,6 +76,7 @@ public class AsyncCall extends DefaultPromise { this.startTime = EnvironmentEdgeManager.currentTime(); this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0; + this.callStats = callStats; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 43d75f9..44e8322 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; @@ -310,10 +311,10 @@ public class AsyncRpcChannel { */ public Promise callMethod(final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller, final Message request, - final Message responsePrototype) { + final Message responsePrototype, MetricsConnection.CallStats callStats) { final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, - controller, responsePrototype); + controller, responsePrototype, callStats); controller.notifyOnCancel(new RpcCallback() { @Override public void run(Object parameter) { @@ -433,7 +434,7 @@ public class AsyncRpcChannel { ByteBuf b = channel.alloc().directBuffer(4 + totalSize); try(ByteBufOutputStream out = new ByteBufOutputStream(b)) { - IPCUtil.write(out, rh, call.param, cellBlock); + call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); } channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); @@ -579,8 +580,6 @@ public class AsyncRpcChannel { /** * Clean up calls. - * - * @param cleanAll true if all calls should be cleaned, false for only the timed out calls */ private void cleanupCalls() { List toCleanup = new ArrayList(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index e1662f3..60e9add 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVM; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; @@ -146,12 +148,13 @@ public class AsyncRpcClient extends AbstractRpcClient { * @param configuration to HBase * @param clusterId for the cluster * @param localAddress local address to connect to + * @param metrics the connection metrics * @param channelInitializer for custom channel handlers */ - @VisibleForTesting - AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + protected AsyncRpcClient(Configuration configuration, String clusterId, + SocketAddress localAddress, MetricsConnection metrics, ChannelInitializer channelInitializer) { - super(configuration, clusterId, localAddress); + super(configuration, clusterId, localAddress, metrics); if (LOG.isDebugEnabled()) { LOG.debug("Starting async Hbase RPC client"); @@ -191,15 +194,28 @@ public class AsyncRpcClient extends AbstractRpcClient { } } + /** Used in test only. */ + AsyncRpcClient(Configuration configuration) { + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); + } + + /** Used in test only. */ + AsyncRpcClient(Configuration configuration, + ChannelInitializer channelInitializer) { + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer); + } + /** * Constructor * * @param configuration to HBase * @param clusterId for the cluster * @param localAddress local address to connect to + * @param metrics the connection metrics */ - public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) { - this(configuration, clusterId, localAddress, null); + public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics) { + this(configuration, clusterId, localAddress, metrics, null); } /** @@ -219,13 +235,14 @@ public class AsyncRpcClient extends AbstractRpcClient { @Override protected Pair call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress addr) throws IOException, InterruptedException { + InetSocketAddress addr, MetricsConnection.CallStats callStats) + throws IOException, InterruptedException { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); } final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - Promise promise = connection.callMethod(md, pcrc, param, returnType); + Promise promise = connection.callMethod(md, pcrc, param, returnType, callStats); long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; try { Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); @@ -244,40 +261,49 @@ public class AsyncRpcClient extends AbstractRpcClient { /** * Call method async */ - private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc, - Message param, Message returnType, User ticket, InetSocketAddress addr, - final RpcCallback done) { + private void callMethod(final Descriptors.MethodDescriptor md, + final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket, + InetSocketAddress addr, final RpcCallback done) { final AsyncRpcChannel connection; try { connection = createRpcChannel(md.getService().getName(), addr, ticket); - - connection.callMethod(md, pcrc, param, returnType).addListener( + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + GenericFutureListener> listener = new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - if(!future.isSuccess()){ - Throwable cause = future.cause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - }else{ - pcrc.setFailed(new IOException(cause)); - } - }else{ - try { - done.run(future.get()); - }catch (ExecutionException e){ - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - }else{ - pcrc.setFailed(new IOException(cause)); + @Override + public void operationComplete(Future future) throws Exception { + cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime()); + if (metrics != null) { + metrics.updateRpc(md, param, cs); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms"); + } + if (!future.isSuccess()) { + Throwable cause = future.cause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); + } + } else { + try { + done.run(future.get()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + pcrc.setFailed((IOException) cause); + } else { + pcrc.setFailed(new IOException(cause)); + } + } catch (InterruptedException e) { + pcrc.setFailed(new IOException(e)); + } } - }catch (InterruptedException e){ - pcrc.setFailed(new IOException(e)); } - } - } - }); + }; + cs.setStartTime(EnvironmentEdgeManager.currentTime()); + connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener); } catch (StoppedRpcClientException|FailedServerException e) { pcrc.setFailed(e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index f7aa8a9..8f6c85b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -24,8 +24,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -39,8 +37,6 @@ import com.google.protobuf.Message; */ @InterfaceAudience.Private public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { - private static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName()); - private final AsyncRpcChannel channel; /** @@ -102,6 +98,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { cellBlockScanner = channel.client.createCellScanner(cellBlock); } call.setSuccess(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); } } catch (IOException e) { // Treat this as a fatal condition and close this connection http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index df32730..5f90837 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -21,6 +21,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -41,16 +42,18 @@ public class Call { Message responseDefaultType; IOException error; // exception, null if value volatile boolean done; // true when call is done - long startTime; final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. + final MetricsConnection.CallStats callStats; protected Call(int id, final Descriptors.MethodDescriptor md, Message param, - final CellScanner cells, final Message responseDefaultType, int timeout) { + final CellScanner cells, final Message responseDefaultType, int timeout, + MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; - this.startTime = EnvironmentEdgeManager.currentTime(); + this.callStats = callStats; + this.callStats.setStartTime(EnvironmentEdgeManager.currentTime()); this.responseDefaultType = responseDefaultType; this.id = id; this.timeout = timeout; @@ -122,6 +125,6 @@ public class Call { } public long getStartTime() { - return this.startTime; + return this.callStats.getStartTime(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java index 2dbb776..8f45eb7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.ipc; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.util.ReflectionUtils; import java.net.SocketAddress; @@ -37,15 +39,23 @@ public final class RpcClientFactory { private RpcClientFactory() { } + /** Helper method for tests only. Creates an {@code RpcClient} without metrics. */ + @VisibleForTesting + public static RpcClient createClient(Configuration conf, String clusterId) { + return createClient(conf, clusterId, null); + } + /** * Creates a new RpcClient by the class defined in the configuration or falls back to * RpcClientImpl * @param conf configuration * @param clusterId the cluster id + * @param metrics the connection metrics * @return newly created RpcClient */ - public static RpcClient createClient(Configuration conf, String clusterId) { - return createClient(conf, clusterId, null); + public static RpcClient createClient(Configuration conf, String clusterId, + MetricsConnection metrics) { + return createClient(conf, clusterId, null, metrics); } /** @@ -54,17 +64,19 @@ public final class RpcClientFactory { * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. + * @param metrics the connection metrics * @return newly created RpcClient */ public static RpcClient createClient(Configuration conf, String clusterId, - SocketAddress localAddr) { + SocketAddress localAddr, MetricsConnection metrics) { String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class.getName()); return ReflectionUtils.instantiateWithCustomCtor( rpcClientClass, - new Class[] { Configuration.class, String.class, SocketAddress.class }, - new Object[] { conf, clusterId, localAddr } + new Class[] { Configuration.class, String.class, SocketAddress.class, + MetricsConnection.class }, + new Object[] { conf, clusterId, localAddr, metrics } ); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index cb18952..3fb7061 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.Message.Builder; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -911,7 +913,8 @@ public class RpcClientImpl extends AbstractRpcClient { checkIsOpen(); // Now we're checking that it didn't became idle in between. try { - IPCUtil.write(this.out, header, call.param, cellBlock); + call.callStats.setRequestSizeBytes( + IPCUtil.write(this.out, header, call.param, cellBlock)); } catch (IOException e) { // We set the value inside the synchronized block, this way the next in line // won't even try to write. Otherwise we might miss a call in the calls map? @@ -964,12 +967,20 @@ public class RpcClientImpl extends AbstractRpcClient { int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); int whatIsLeftToRead = totalSize - readSoFar; IOUtils.skipFully(in, whatIsLeftToRead); + if (call != null) { + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + } return; } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); RemoteException re = createRemoteException(exceptionResponse); call.setException(re); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); if (isFatalConnectionException(exceptionResponse)) { markClosed(re); } @@ -988,6 +999,9 @@ public class RpcClientImpl extends AbstractRpcClient { cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); } call.setResponse(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats.setCallTimeMs( + EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); } } catch (IOException e) { if (expectedCall) call.setException(e); @@ -1075,13 +1089,15 @@ public class RpcClientImpl extends AbstractRpcClient { } /** - * Construct an IPC cluster client whose values are of the {@link Message} class. + * Used in test only. Construct an IPC cluster client whose values are of the + * {@link Message} class. * @param conf configuration * @param clusterId the cluster id * @param factory socket factory */ + @VisibleForTesting RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) { - this(conf, clusterId, factory, null); + this(conf, clusterId, factory, null, null); } /** @@ -1090,10 +1106,11 @@ public class RpcClientImpl extends AbstractRpcClient { * @param clusterId the cluster id * @param factory socket factory * @param localAddr client socket bind address + * @param metrics the connection metrics */ RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory, - SocketAddress localAddr) { - super(conf, clusterId, localAddr); + SocketAddress localAddr, MetricsConnection metrics) { + super(conf, clusterId, localAddr, metrics); this.socketFactory = factory; this.connections = new PoolMap(getPoolType(conf), getPoolSize(conf)); @@ -1101,25 +1118,27 @@ public class RpcClientImpl extends AbstractRpcClient { } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory - * @param conf configuration - * @param clusterId the cluster id + * Used in test only. Construct an IPC client for the cluster {@code clusterId} with + * the default SocketFactory */ - public RpcClientImpl(Configuration conf, String clusterId) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null); + @VisibleForTesting + RpcClientImpl(Configuration conf, String clusterId) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null); } /** - * Construct an IPC client for the cluster clusterId with the default SocketFactory + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory * * This method is called with reflection by the RpcClientFactory to create an instance * * @param conf configuration * @param clusterId the cluster id * @param localAddr client socket bind address. + * @param metrics the connection metrics */ - public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) { - this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr); + public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics); } /** Stop all threads related to this client. No further calls may be made @@ -1182,7 +1201,8 @@ public class RpcClientImpl extends AbstractRpcClient { */ @Override protected Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, - Message param, Message returnType, User ticket, InetSocketAddress addr) + Message param, Message returnType, User ticket, InetSocketAddress addr, + MetricsConnection.CallStats callStats) throws IOException, InterruptedException { if (pcrc == null) { pcrc = new PayloadCarryingRpcController(); @@ -1190,7 +1210,7 @@ public class RpcClientImpl extends AbstractRpcClient { CellScanner cells = pcrc.cellScanner(); final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType, - pcrc.getCallTimeout()); + pcrc.getCallTimeout(), MetricsConnection.newCallStats()); final Connection connection = getConnection(ticket, call, addr); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java new file mode 100644 index 0000000..10a913e --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.testclassification.MetricsTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import java.io.IOException; + +@Category({MetricsTests.class, SmallTests.class}) +public class TestMetricsConnection { + + private static MetricsConnection METRICS; + + @BeforeClass + public static void beforeClass() { + HConnectionImplementation mocked = Mockito.mock(HConnectionImplementation.class); + Mockito.when(mocked.toString()).thenReturn("mocked-connection"); + METRICS = new MetricsConnection(Mockito.mock(HConnectionImplementation.class)); + } + + @AfterClass + public static void afterClass() { + METRICS.shutdown(); + } + + @Test + public void testStaticMetrics() throws IOException { + final byte[] foo = Bytes.toBytes("foo"); + final RegionSpecifier region = RegionSpecifier.newBuilder() + .setValue(ByteString.EMPTY) + .setType(RegionSpecifierType.REGION_NAME) + .build(); + final int loop = 5; + + for (int i = 0; i < loop; i++) { + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Get"), + GetRequest.getDefaultInstance(), + MetricsConnection.newCallStats()); + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Scan"), + ScanRequest.getDefaultInstance(), + MetricsConnection.newCallStats()); + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Multi"), + MultiRequest.getDefaultInstance(), + MetricsConnection.newCallStats()); + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Mutate"), + MutateRequest.newBuilder() + .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo))) + .setRegion(region) + .build(), + MetricsConnection.newCallStats()); + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Mutate"), + MutateRequest.newBuilder() + .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo))) + .setRegion(region) + .build(), + MetricsConnection.newCallStats()); + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Mutate"), + MutateRequest.newBuilder() + .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo))) + .setRegion(region) + .build(), + MetricsConnection.newCallStats()); + METRICS.updateRpc( + ClientService.getDescriptor().findMethodByName("Mutate"), + MutateRequest.newBuilder() + .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))) + .setRegion(region) + .build(), + MetricsConnection.newCallStats()); + } + for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { + METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker, + METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker + }) { + Assert.assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.count()); + Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count()); + Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7bd13dd..0b21a2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -842,7 +842,7 @@ public class HRegionServer extends HasThread implements // Setup RPC client for master communication rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( - rpcServices.isa.getAddress(), 0)); + rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); boolean onlyMetaRefresh = false; int storefileRefreshPeriod = conf.getInt( http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java index b2cb772..91f494a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; @@ -48,7 +49,7 @@ public class MetricsRegionServer { this.serverSource = serverSource; } - // for unit-test usage + @VisibleForTesting public MetricsRegionServerSource getMetricsSource() { return serverSource; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 4883ef4..4a3b776 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -129,9 +129,10 @@ public class TestClientTimeouts { /** * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel */ - public static class RandomTimeoutRpcClient extends RpcClientImpl{ - public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { - super(conf, clusterId, localAddr); + public static class RandomTimeoutRpcClient extends RpcClientImpl { + public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + super(conf, clusterId, localAddr, metrics); } // Return my own instance, one that does random timeouts http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 32eb9f6..d427419 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -163,7 +164,8 @@ public abstract class AbstractTestIPC { final String message = "hello"; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); Pair r = - client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address); + client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, + new MetricsConnection.CallStats()); assertTrue(r.getSecond() == null); // Silly assertion that the message is in the returned pb. assertTrue(r.getFirst().toString().contains(message)); @@ -205,7 +207,8 @@ public abstract class AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); Pair r = - client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address); + client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, + new MetricsConnection.CallStats()); int index = 0; while (r.getSecond().advance()) { assertTrue(CELL.equals(r.getSecond().current())); @@ -231,7 +234,8 @@ public abstract class AbstractTestIPC { InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - client.call(null, md, param, null, User.getCurrent(), address); + client.call(null, md, param, null, User.getCurrent(), address, + new MetricsConnection.CallStats()); fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); @@ -255,10 +259,10 @@ public abstract class AbstractTestIPC { MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); for (int i = 0; i < 10; i++) { - client.call( - new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, md - .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress()); + client.call(new PayloadCarryingRpcController( + CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, + md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), + new MetricsConnection.CallStats()); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index ca7c9a7..8921e89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -115,7 +116,7 @@ public class TestAsyncIPC extends AbstractTestIPC { @Override protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) { setConf(conf); - return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { + return new AsyncRpcClient(conf) { @Override Codec getCodec() { @@ -128,15 +129,13 @@ public class TestAsyncIPC extends AbstractTestIPC { @Override protected AsyncRpcClient createRpcClient(Configuration conf) { setConf(conf); - return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + return new AsyncRpcClient(conf); } @Override protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { setConf(conf); - return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null, - new ChannelInitializer() { - + return new AsyncRpcClient(conf, new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { @@ -247,7 +246,7 @@ public class TestAsyncIPC extends AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client = new AsyncRpcClient(conf); KeyValue kv = BIG_CELL; Put p = new Put(CellUtil.cloneRow(kv)); for (int i = 0; i < cellcount; i++) { @@ -281,7 +280,8 @@ public class TestAsyncIPC extends AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); // Pair response = - client.call(pcrc, md, builder.build(), param, user, address); + client.call(pcrc, md, builder.build(), param, user, address, + new MetricsConnection.CallStats()); /* * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * count); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java index 9476fcf..e294830 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java @@ -36,15 +36,15 @@ public class TestGlobalEventLoopGroup { public void test() { Configuration conf = HBaseConfiguration.create(); conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true); - AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client = new AsyncRpcClient(conf); assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP); - AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client1 = new AsyncRpcClient(conf); assertSame(client.bootstrap.group(), client1.bootstrap.group()); client1.close(); assertFalse(client.bootstrap.group().isShuttingDown()); conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false); - AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null); + AsyncRpcClient client2 = new AsyncRpcClient(conf); assertNotSame(client.bootstrap.group(), client2.bootstrap.group()); client2.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index af10058..227f91a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; @@ -148,7 +149,8 @@ public class TestIPC extends AbstractTestIPC { PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); // Pair response = - client.call(pcrc, md, builder.build(), param, user, address); + client.call(pcrc, md, builder.build(), param, user, address, + new MetricsConnection.CallStats()); /* * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), * count); http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java index 2965055..596b8ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.codec.Codec; @@ -58,8 +59,9 @@ public class TestRpcClientLeaks { super(conf, clusterId); } - public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address) { - super(conf, clusterId, address); + public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address, + MetricsConnection metrics) { + super(conf, clusterId, address, metrics); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a048c32a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index 82b1a1b..dbf119d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -180,7 +181,7 @@ public class TestRpcHandlerException { new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - rpcServer.getListenerAddress()); + rpcServer.getListenerAddress(), new MetricsConnection.CallStats()); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally {