Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B44C200CC2 for ; Wed, 5 Jul 2017 22:59:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 28BBA16478D; Wed, 5 Jul 2017 20:59:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F165C164790 for ; Wed, 5 Jul 2017 22:59:34 +0200 (CEST) Received: (qmail 83154 invoked by uid 500); 5 Jul 2017 20:59:34 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 83145 invoked by uid 99); 5 Jul 2017 20:59:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Jul 2017 20:59:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3EA22DFF8A; Wed, 5 Jul 2017 20:59:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: alexey@apache.org To: commits@kudu.apache.org Date: Wed, 05 Jul 2017 20:59:34 -0000 Message-Id: <8ea2ad55585b4b1e955f3c236c8dc648@git.apache.org> In-Reply-To: <6005855a2c134225be93a8acfe9fb8c4@git.apache.org> References: <6005855a2c134225be93a8acfe9fb8c4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] kudu git commit: [java] separating Connection archived-at: Wed, 05 Jul 2017 20:59:37 -0000 [java] separating Connection This patch separates lower-level, connection-related functionality from the TabletClient class into the new Connection class. The updated TabletClient has been renamed to RpcProxy.
Also, this patch contains other micro-updates on the related code. In addition, this patch addresses KUDU-1878. This work is done in the context of KUDU-2013. Change-Id: Id4ac81d9454631e7501c31576c24f85e968bb871 Reviewed-on: http://gerrit.cloudera.org:8080/7146 Tested-by: Alexey Serbin Reviewed-by: Alexey Serbin Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58248841 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58248841 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58248841 Branch: refs/heads/master Commit: 58248841f213a64683ee217f025f0a38a8450f74 Parents: 7e74527 Author: Alexey Serbin Authored: Thu Jun 1 18:39:13 2017 -0700 Committer: Alexey Serbin Committed: Wed Jul 5 20:58:46 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 277 +++---- .../apache/kudu/client/ConnectToCluster.java | 112 ++- .../java/org/apache/kudu/client/Connection.java | 634 +++++++++++++++ .../org/apache/kudu/client/ConnectionCache.java | 269 ++----- .../java/org/apache/kudu/client/KuduRpc.java | 6 +- .../java/org/apache/kudu/client/Negotiator.java | 5 +- .../org/apache/kudu/client/RemoteTablet.java | 53 +- .../java/org/apache/kudu/client/RpcProxy.java | 406 ++++++++++ .../java/org/apache/kudu/client/ServerInfo.java | 4 +- .../org/apache/kudu/client/TabletClient.java | 779 ------------------- .../org/apache/kudu/client/BaseKuduTest.java | 6 +- .../java/org/apache/kudu/client/ITClient.java | 7 +- .../kudu/client/ITFaultTolerantScanner.java | 11 +- .../kudu/client/ITNonFaultTolerantScanner.java | 4 +- .../kudu/client/ITScannerMultiTablet.java | 21 +- .../org/apache/kudu/client/MiniKuduCluster.java | 73 +- .../apache/kudu/client/TestAsyncKuduClient.java | 16 +- .../kudu/client/TestAsyncKuduSession.java | 13 +- .../apache/kudu/client/TestConnectionCache.java | 109 ++- .../apache/kudu/client/TestRemoteTablet.java | 31 +- 
.../org/apache/kudu/client/TestTimeouts.java | 1 + 21 files changed, 1504 insertions(+), 1333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 019a3f5..a304d05 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -27,8 +27,10 @@ package org.apache.kudu.client; import static java.util.concurrent.TimeUnit.MILLISECONDS; + import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED; +import java.net.InetAddress; import java.net.UnknownHostException; import java.security.cert.CertificateException; import java.util.ArrayList; @@ -43,6 +45,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.security.auth.Subject; @@ -67,6 +71,7 @@ import org.jboss.netty.util.TimerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.kudu.Common; import org.apache.kudu.Schema; import org.apache.kudu.master.Master; import org.apache.kudu.master.Master.GetTableLocationsResponsePB; @@ -126,7 +131,8 @@ public class AsyncKuduClient implements AutoCloseable { * a lookup of a single partition (e.g. for a write), or re-looking-up a tablet with * stale information. 
*/ - static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10; + private static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10; + /** * The number of tablets to fetch from the master when looking up a range of * tablets. @@ -142,16 +148,18 @@ public class AsyncKuduClient implements AutoCloseable { private final ConcurrentHashMap tableLocations = new ConcurrentHashMap<>(); + /** A cache to keep track of already opened connections to Kudu servers. */ private final ConnectionCache connectionCache; @GuardedBy("sessions") private final Set sessions = new HashSet<>(); - // Since the masters also go through TabletClient, we need to treat them as if they were a normal - // table. We'll use the following fake table name to identify places where we need special + // Since RPCs to the masters also go through RpcProxy, we need to treat them as if they were a + // normal table. We'll use the following fake table name to identify places where we need special // handling. + // TODO(aserbin) clean this up static final String MASTER_TABLE_NAME_PLACEHOLDER = "Kudu Master"; - final KuduTable masterTable; + private final KuduTable masterTable; private final List masterAddresses; private final HashedWheelTimer timer; @@ -212,7 +220,40 @@ public class AsyncKuduClient implements AutoCloseable { this.timer = b.timer; String clientId = UUID.randomUUID().toString().replace("-", ""); this.requestTracker = new RequestTracker(clientId); - this.connectionCache = new ConnectionCache(this); + this.connectionCache = new ConnectionCache( + securityContext, defaultSocketReadTimeoutMs, timer, channelFactory); + } + + /** + * Get a proxy to send RPC calls to the specified server. 
+ * + * @param serverInfo server's information + * @return the proxy object bound to the target server + */ + @Nonnull + RpcProxy newRpcProxy(final ServerInfo serverInfo) { + Preconditions.checkNotNull(serverInfo); + return new RpcProxy(this, connectionCache.getConnection(serverInfo)); + } + + /** + * Get a proxy to send RPC calls to Kudu master at the specified end-point. + * + * @param hostPort master end-point + * @return the proxy object bound to the target master + */ + @Nullable + RpcProxy newMasterRpcProxy(HostAndPort hostPort) { + // We should have a UUID to construct ServerInfo for the master, but we have a chicken + // and egg problem, we first need to communicate with the masters to find out about them, + // and that's what we're trying to do. The UUID is just used for logging and cache key, + // so instead we just use concatenation of master host and port, prefixed with "master-". + final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost()); + if (inetAddress == null) { + // TODO(todd): should we log the resolution failure? throw an exception? + return null; + } + return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress)); } /** @@ -374,7 +415,7 @@ public class AsyncKuduClient implements AutoCloseable { return sendRpcToTablet(rpc); } - Deferred getTableSchema(String name) { + private Deferred getTableSchema(String name) { GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name); rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs); return sendRpcToTablet(rpc); @@ -501,9 +542,9 @@ public class AsyncKuduClient implements AutoCloseable { } /** - * This callback will be repeatadly used when opening a table until it is done being created. + * This callback will be repeatedly used when opening a table until it is done being created. 
*/ - Callback, Master.IsCreateTableDoneResponsePB> getOpenTableCB( + private Callback, Master.IsCreateTableDoneResponsePB> getOpenTableCB( final KuduRpc rpc, final KuduTable table) { return new Callback, Master.IsCreateTableDoneResponsePB>() { @Override @@ -635,18 +676,6 @@ public class AsyncKuduClient implements AutoCloseable { return requestTracker; } - HashedWheelTimer getTimer() { - return timer; - } - - ClientSocketChannelFactory getChannelFactory() { - return channelFactory; - } - - SecurityContext getSecurityContext() { - return securityContext; - } - /** * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table. * @param table the name of the table you intend to scan. @@ -691,23 +720,20 @@ public class AsyncKuduClient implements AutoCloseable { */ Deferred scanNextRows(final AsyncKuduScanner scanner) { RemoteTablet tablet = scanner.currentTablet(); - assert (tablet != null); + Preconditions.checkNotNull(tablet); KuduRpc nextRequest = scanner.getNextRowsRequest(); - String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()); - TabletClient client = connectionCache.getClient(uuid); // Important to increment the attempts before the next if statement since // getSleepTimeForRpc() relies on it if the client is null or dead. nextRequest.attempt++; - if (client == null || !client.isAlive()) { - // A null client means we either don't know about this tablet anymore (unlikely) or we - // couldn't find a leader (which could be triggered by a read timeout). - // We'll first delay the RPC in case things take some time to settle down, then retry. 
- Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid + - " will retry after a delay"); - return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError)); + final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection()); + if (info == null) { + return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError( + String.format("No information on servers hosting tablet %s, will retry later", + tablet.getTabletId())))); } + Deferred d = nextRequest.getDeferred(); - client.sendRpc(nextRequest); + RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest); return d; } @@ -723,55 +749,19 @@ public class AsyncKuduClient implements AutoCloseable { if (tablet == null) { return Deferred.fromResult(null); } - - final KuduRpc closeRequest = scanner.getCloseRequest(); - final TabletClient client = connectionCache.getClient( - tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection())); - if (client == null || !client.isAlive()) { - // Oops, we couldn't find a tablet server that hosts this tablet. Our - // cache was probably invalidated while the client was scanning. So - // we can't close this scanner properly. - LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet); + final KuduRpc closeRequest = scanner.getCloseRequest(); + final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection()); + if (info == null) { return Deferred.fromResult(null); } final Deferred d = closeRequest.getDeferred(); closeRequest.attempt++; - client.sendRpc(closeRequest); + RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest); return d; } /** - * Forcefully shuts down the RemoteTablet connection and - * fails all outstanding RPCs. 
- * - * @param tablet the given tablet - * @param replicaSelection replica selection mechanism to use - */ - @VisibleForTesting - void shutdownConnection(RemoteTablet tablet, - ReplicaSelection replicaSelection) { - TabletClient client = connectionCache.getClient( - tablet.getReplicaSelectedUUID(replicaSelection)); - client.shutdown(); - } - - /** - * Forcefully disconnects the RemoteTablet connection and - * fails all outstanding RPCs. - * - * @param tablet the given tablet - * @param replicaSelection replica selection mechanism to use - */ - @VisibleForTesting - void disconnect(RemoteTablet tablet, - ReplicaSelection replicaSelection) { - TabletClient client = connectionCache.getClient( - tablet.getReplicaSelectedUUID(replicaSelection)); - client.disconnect(); - } - - /** * Sends the provided {@link KuduRpc} to the tablet server hosting the leader * of the tablet identified by the RPC's table and partition key. * @@ -812,15 +802,12 @@ public class AsyncKuduClient implements AutoCloseable { // If we found a tablet, we'll try to find the TS to talk to. if (entry != null) { RemoteTablet tablet = entry.getTablet(); - String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection()); - if (uuid != null) { + ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection()); + if (info != null) { Deferred d = request.getDeferred(); request.setTablet(tablet); - TabletClient client = connectionCache.getLiveClient(uuid); - if (client != null) { - client.sendRpc(request); - return d; - } + RpcProxy.sendRpc(this, connectionCache.getConnection(info), request); + return d; } } @@ -923,7 +910,8 @@ public class AsyncKuduClient implements AutoCloseable { * @param Request's return type. * @return An errback. 
*/ - Callback getDelayedIsCreateTableDoneErrback(final KuduRpc request) { + private Callback getDelayedIsCreateTableDoneErrback( + final KuduRpc request) { return new Callback() { @Override public Exception call(Exception e) throws Exception { @@ -944,26 +932,26 @@ public class AsyncKuduClient implements AutoCloseable { * @param errback the errback to call if something goes wrong when calling IsCreateTableDone * @return Deferred used to track the provided KuduRpc */ - Deferred delayedIsCreateTableDone(final KuduTable table, final KuduRpc rpc, - final Callback, - Master.IsCreateTableDoneResponsePB> retryCB, - final Callback errback) { + private Deferred delayedIsCreateTableDone( + final KuduTable table, + final KuduRpc rpc, + final Callback, + Master.IsCreateTableDoneResponsePB> retryCB, + final Callback errback) { final class RetryTimer implements TimerTask { public void run(final Timeout timeout) { String tableId = table.getTableId(); final boolean has_permit = acquireMasterLookupPermit(); - if (!has_permit) { + if (!has_permit && !tablesNotServed.contains(tableId)) { // If we failed to acquire a permit, it's worth checking if someone // looked up the tablet we're interested in. Every once in a while // this will save us a Master lookup. - if (!tablesNotServed.contains(tableId)) { - try { - retryCB.call(null); - return; - } catch (Exception e) { - // we're calling RetryRpcCB which doesn't throw exceptions, ignore - } + try { + retryCB.call(null); + return; + } catch (Exception e) { + // we're calling RetryRpcCB which doesn't throw exceptions, ignore } } IsCreateTableDoneRequest isCreateTableDoneRequest = @@ -1027,11 +1015,11 @@ public class AsyncKuduClient implements AutoCloseable { } } - long getSleepTimeForRpc(KuduRpc rpc) { - byte attemptCount = rpc.attempt; + private long getSleepTimeForRpc(KuduRpc rpc) { + int attemptCount = rpc.attempt; assert (attemptCount > 0); if (attemptCount == 0) { - LOG.warn("Possible bug: attempting to retry an RPC with no attempts. 
RPC: " + rpc, + LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: {}", rpc, new Exception("Exception created to collect stack trace")); attemptCount = 1; } @@ -1039,29 +1027,12 @@ public class AsyncKuduClient implements AutoCloseable { long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) * sleepRandomizer.nextDouble()); if (LOG.isTraceEnabled()) { - LOG.trace("Going to sleep for " + sleepTime + " at retry " + rpc.attempt); + LOG.trace("Going to sleep for {} at retry {}", sleepTime, rpc.attempt); } return sleepTime; } /** - * Modifying the list returned by this method won't change how AsyncKuduClient behaves, - * but calling certain methods on the returned TabletClients can. For example, - * it's possible to forcefully shutdown a connection to a tablet server by calling {@link - * TabletClient#shutdown()}. - * @return copy of the current TabletClients list - */ - @VisibleForTesting - List getTabletClients() { - return connectionCache.getImmutableTabletClientsList(); - } - - @VisibleForTesting - TabletClient getTabletClient(String uuid) { - return connectionCache.getClient(uuid); - } - - /** * Clears {@link #tableLocations} of the table's entries. * * This method makes the maps momentarily inconsistent, and should only be @@ -1080,7 +1051,7 @@ public class AsyncKuduClient implements AutoCloseable { * @return {@code true} if this RPC already had too many attempts, * {@code false} otherwise (in which case it's OK to retry once more) */ - static boolean cannotRetryRequest(final KuduRpc rpc) { + private static boolean cannotRetryRequest(final KuduRpc rpc) { return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS; } @@ -1091,8 +1062,8 @@ public class AsyncKuduClient implements AutoCloseable { * @param cause What was cause of the last failed attempt, if known. * You can pass {@code null} if the cause is unknown. 
*/ - static Deferred tooManyAttemptsOrTimeout(final KuduRpc request, - final KuduException cause) { + private static Deferred tooManyAttemptsOrTimeout(final KuduRpc request, + final KuduException cause) { String message; if (request.attempt > MAX_RPC_ATTEMPTS) { message = "Too many attempts: "; @@ -1127,7 +1098,7 @@ public class AsyncKuduClient implements AutoCloseable { // this will save us a Master lookup. TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey); if (entry != null && !entry.isNonCoveredRange() && - entry.getTablet().getLeaderUUID() != null) { + entry.getTablet().getLeaderServerInfo() != null) { return Deferred.fromResult(null); // Looks like no lookup needed. } } @@ -1164,9 +1135,8 @@ public class AsyncKuduClient implements AutoCloseable { */ Deferred getMasterTableLocationsPB(KuduRpc parentRpc) { // TODO(todd): stop using this 'masterTable' hack. - return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, connectionCache, - defaultAdminOperationTimeoutMs) - .addCallback( + return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, + defaultAdminOperationTimeoutMs).addCallback( new Callback() { @Override public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) { @@ -1306,8 +1276,8 @@ public class AsyncKuduClient implements AutoCloseable { * We're handling a tablet server that's telling us it doesn't have the tablet we're asking for. * We're in the context of decode() meaning we need to either callback or retry later. 
*/ - void handleTabletNotFound(final KuduRpc rpc, KuduException ex, TabletClient server) { - invalidateTabletCache(rpc.getTablet(), server); + void handleTabletNotFound(final KuduRpc rpc, KuduException ex, ServerInfo info) { + invalidateTabletCache(rpc.getTablet(), info); handleRetryableError(rpc, ex); } @@ -1315,8 +1285,8 @@ public class AsyncKuduClient implements AutoCloseable { * A tablet server is letting us know that it isn't the specified tablet's leader in response * a RPC, so we need to demote it and retry. */ - void handleNotLeader(final KuduRpc rpc, KuduException ex, TabletClient server) { - rpc.getTablet().demoteLeader(server.getServerInfo().getUuid()); + void handleNotLeader(final KuduRpc rpc, KuduException ex, ServerInfo info) { + rpc.getTablet().demoteLeader(info.getUuid()); handleRetryableError(rpc, ex); } @@ -1370,12 +1340,40 @@ public class AsyncKuduClient implements AutoCloseable { * Remove the tablet server from the RemoteTablet's locations. Right now nothing is removing * the tablet itself from the caches. */ - private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) { - String uuid = server.getServerInfo().getUuid(); + private void invalidateTabletCache(RemoteTablet tablet, ServerInfo info) { + final String uuid = info.getUuid(); LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId()); tablet.removeTabletClient(uuid); } + /** + * Translate master-provided information {@link Master.TSInfoPB} on a tablet server into internal + * {@link ServerInfo} representation. 
+ * + * @param tsInfoPB master-provided information for the tablet server + * @return an object that contains all the server's information + * @throws UnknownHostException if we cannot resolve the tablet server's IP address + */ + private ServerInfo resolveTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException { + final List addresses = tsInfoPB.getRpcAddressesList(); + final String uuid = tsInfoPB.getPermanentUuid().toStringUtf8(); + if (addresses.isEmpty()) { + LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid); + return null; + } + + // from meta_cache.cc + // TODO: if the TS advertises multiple host/ports, pick the right one + // based on some kind of policy. For now just use the first always. + final HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0)); + final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost()); + if (inetAddress == null) { + throw new UnknownHostException( + "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'"); + } + return new ServerInfo(uuid, hostPort, inetAddress); + } + /** Callback executed when a master lookup completes. */ private final class MasterLookupCB implements Callback { @@ -1418,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable { } } - boolean acquireMasterLookupPermit() { + private boolean acquireMasterLookupPermit() { try { // With such a low timeout, the JVM may chose to spin-wait instead of // de-scheduling the thread (and causing context switches and whatnot). @@ -1433,7 +1431,7 @@ public class AsyncKuduClient implements AutoCloseable { * Releases a master lookup permit that was acquired. 
* @see #acquireMasterLookupPermit */ - void releaseMasterLookupPermit() { + private void releaseMasterLookupPermit() { masterLookups.release(); } @@ -1477,7 +1475,7 @@ public class AsyncKuduClient implements AutoCloseable { List servers = new ArrayList<>(tabletPb.getReplicasCount()); for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) { try { - ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo()); + ServerInfo serverInfo = resolveTS(replica.getTsInfo()); if (serverInfo != null) { servers.add(serverInfo); } @@ -1508,7 +1506,7 @@ public class AsyncKuduClient implements AutoCloseable { // right away. If not, we throw an exception that RetryRpcErrback will understand as needing to // sleep before retrying. TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey); - if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) { + if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() == null) { throw new NoLeaderFoundException( Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader")); } @@ -1588,8 +1586,9 @@ public class AsyncKuduClient implements AutoCloseable { } /** - * Invokes {@link #shutdown()} and waits. This method returns - * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs. + * Invokes {@link #shutdown()} and waits. This method returns void, so consider invoking + * {@link #shutdown()} directly if there's a need to handle dangling RPCs. + * * @throws Exception if an error happens while closing the connections */ @Override @@ -1607,7 +1606,8 @@ public class AsyncKuduClient implements AutoCloseable { *
  • Releases all other resources.
  • * * Not calling this method before losing the last reference to this - * instance may result in data loss and other unwanted side effects + * instance may result in data loss and other unwanted side effects. + * * @return A {@link Deferred}, whose callback chain will be invoked once all * of the above have been done. If this callback chain doesn't fail, then * the clean shutdown will be successful, and all the data will be safe on @@ -1628,6 +1628,7 @@ public class AsyncKuduClient implements AutoCloseable { super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown"); } + @Override public void run() { // This terminates the Executor. channelFactory.releaseExternalResources(); @@ -1676,7 +1677,7 @@ public class AsyncKuduClient implements AutoCloseable { // concurrent modification during the iteration. Set copyOfSessions; synchronized (sessions) { - copyOfSessions = new HashSet(sessions); + copyOfSessions = new HashSet<>(sessions); } if (sessions.isEmpty()) { return Deferred.fromResult(null); @@ -1690,7 +1691,7 @@ public class AsyncKuduClient implements AutoCloseable { return Deferred.group(deferreds); } - private boolean isMasterTable(String tableId) { + private static boolean isMasterTable(String tableId) { // Checking that it's the same instance so there's absolutely no chance of confusing the master // 'table' for a user one. return MASTER_TABLE_NAME_PLACEHOLDER == tableId; @@ -1709,6 +1710,14 @@ public class AsyncKuduClient implements AutoCloseable { } /** + * @return copy of the current TabletClients list + */ + @VisibleForTesting + List getConnectionListCopy() { + return connectionCache.getConnectionListCopy(); + } + + /** * Builder class to use in order to connect to Kudu. * All the parameters beyond those in the constructors are optional. 
*/ http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java index 22b31f0..dfba55d 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Functions; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.HostAndPort; import com.stumbleupon.async.Callback; @@ -80,7 +81,8 @@ final class ConnectToCluster { private static Deferred connectToMaster( final KuduTable masterTable, - final TabletClient masterClient, KuduRpc parentRpc, + final RpcProxy masterProxy, + KuduRpc parentRpc, long defaultTimeoutMs) { // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters, // basically reuse in some way the master permits. @@ -93,7 +95,7 @@ final class ConnectToCluster { } Deferred d = rpc.getDeferred(); rpc.attempt++; - masterClient.sendRpc(rpc); + masterProxy.sendRpc(rpc); // If we are connecting to an older version of Kudu, we'll get an invalid request // error. In that case, we resend using the older version of the RPC. 
@@ -107,10 +109,10 @@ final class ConnectToCluster { rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) { AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() RPC to connect " + "to server running Kudu < 1.3."); - Deferred newAttempt = rpc.getDeferred(); - assert newAttempt != null; + final Deferred newAttempt = + Preconditions.checkNotNull(rpc.getDeferred()); rpc.setUseOldMethod(); - masterClient.sendRpc(rpc); + masterProxy.sendRpc(rpc); return newAttempt; } } @@ -128,8 +130,6 @@ final class ConnectToCluster { * @param masterTable the "placeholder" table used by AsyncKuduClient * @param masterAddresses the addresses of masters to fetch from * @param parentRpc RPC that prompted a master lookup, can be null - * @param connCache the client's connection cache, used for creating connections - * to masters * @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout * @return a Deferred object for the cluster connection status */ @@ -137,7 +137,6 @@ final class ConnectToCluster { KuduTable masterTable, List masterAddresses, KuduRpc parentRpc, - ConnectionCache connCache, long defaultTimeoutMs) { ConnectToCluster connector = new ConnectToCluster(masterAddresses); @@ -146,14 +145,14 @@ final class ConnectToCluster { // deferred. 
for (HostAndPort hostAndPort : masterAddresses) { Deferred d; - TabletClient client = connCache.newMasterClient(hostAndPort); - if (client == null) { + RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort); + if (proxy != null) { + d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs); + } else { String message = "Couldn't resolve this master's address " + hostAndPort.toString(); LOG.warn(message); Status statusIOE = Status.IOError(message); d = Deferred.fromError(new NonRecoverableException(statusIOE)); - } else { - d = connectToMaster(masterTable, client, parentRpc, defaultTimeoutMs); } d.addCallbacks(connector.callbackForNode(hostAndPort), connector.errbackForNode(hostAndPort)); @@ -192,56 +191,50 @@ final class ConnectToCluster { * to responseD. */ private void incrementCountAndCheckExhausted() { - if (countResponsesReceived.incrementAndGet() == numMasters) { - if (responseDCalled.compareAndSet(false, true)) { - - // We want `allUnrecoverable` to only be true if all the masters came back with - // NonRecoverableException so that we know for sure we can't retry anymore. Just one master - // that replies with RecoverableException or with an ok response but is a FOLLOWER is - // enough to keep us retrying. - boolean allUnrecoverable = true; - if (exceptionsReceived.size() == countResponsesReceived.get()) { - for (Exception ex : exceptionsReceived) { - if (!(ex instanceof NonRecoverableException)) { - allUnrecoverable = false; - break; - } + if (countResponsesReceived.incrementAndGet() == numMasters && + responseDCalled.compareAndSet(false, true)) { + // We want `allUnrecoverable` to only be true if all the masters came back with + // NonRecoverableException so that we know for sure we can't retry anymore. Just one master + // that replies with RecoverableException or with an ok response but is a FOLLOWER is + // enough to keep us retrying. 
+ boolean allUnrecoverable = true; + if (exceptionsReceived.size() == countResponsesReceived.get()) { + for (Exception ex : exceptionsReceived) { + if (!(ex instanceof NonRecoverableException)) { + allUnrecoverable = false; + break; } - } else { - allUnrecoverable = false; } + } else { + allUnrecoverable = false; + } - String allHosts = NetUtil.hostsAndPortsToString(masterAddrs); - if (allUnrecoverable) { - // This will stop retries. - String msg = String.format("Couldn't find a valid master in (%s). " + - "Exceptions received: %s", allHosts, - Joiner.on(",").join(Lists.transform( - exceptionsReceived, Functions.toStringFunction()))); - Status s = Status.ServiceUnavailable(msg); - responseD.callback(new NonRecoverableException(s)); + String allHosts = NetUtil.hostsAndPortsToString(masterAddrs); + if (allUnrecoverable) { + // This will stop retries. + String msg = String.format("Couldn't find a valid master in (%s). " + + "Exceptions received: %s", allHosts, + Joiner.on(",").join(Lists.transform( + exceptionsReceived, Functions.toStringFunction()))); + Status s = Status.ServiceUnavailable(msg); + responseD.callback(new NonRecoverableException(s)); + } else { + String message = String.format("Master config (%s) has no leader.", + allHosts); + Exception ex; + if (exceptionsReceived.isEmpty()) { + LOG.warn("None of the provided masters {} is a leader; will retry", allHosts); + ex = new NoLeaderFoundException(Status.ServiceUnavailable(message)); } else { - String message = String.format("Master config (%s) has no leader.", - allHosts); - Exception ex; - if (exceptionsReceived.isEmpty()) { - LOG.warn(String.format( - "None of the provided masters (%s) is a leader, will retry.", - allHosts)); - ex = new NoLeaderFoundException(Status.ServiceUnavailable(message)); - } else { - LOG.warn(String.format( - "Unable to find the leader master (%s), will retry", - allHosts)); - String joinedMsg = message + " Exceptions received: " + - Joiner.on(",").join(Lists.transform( - 
exceptionsReceived, Functions.toStringFunction())); - Status s = Status.ServiceUnavailable(joinedMsg); - ex = new NoLeaderFoundException(s, - exceptionsReceived.get(exceptionsReceived.size() - 1)); - } - responseD.callback(ex); + LOG.warn("Unable to find the leader master {}; will retry", allHosts); + String joinedMsg = message + " Exceptions received: " + + Joiner.on(",").join(Lists.transform( + exceptionsReceived, Functions.toStringFunction())); + Status s = Status.ServiceUnavailable(joinedMsg); + ex = new NoLeaderFoundException(s, + exceptionsReceived.get(exceptionsReceived.size() - 1)); } + responseD.callback(ex); } } } @@ -273,8 +266,7 @@ final class ConnectToCluster { // Someone else already found a leader. This is somewhat unexpected // because this means two nodes think they're the leader, but it's // not impossible. We'll just ignore it. - LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " + - hostAndPort.toString()); + LOG.debug("Callback already invoked, discarding response({}) from {}", r, hostAndPort); return null; } @@ -304,7 +296,7 @@ final class ConnectToCluster { @Override public Void call(Exception e) throws Exception { - LOG.warn("Error receiving a response from: " + hostAndPort, e); + LOG.warn("Error receiving response from {}", hostAndPort, e); exceptionsReceived.add(e); incrementCountAndCheckExhausted(); return null; http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java new file mode 100644 index 0000000..ea1e113 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java @@ -0,0 +1,634 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.client; + +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.net.ssl.SSLException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import 
org.jboss.netty.channel.DefaultChannelPipeline; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.channel.socket.SocketChannel; +import org.jboss.netty.channel.socket.SocketChannelConfig; +import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; +import org.jboss.netty.handler.timeout.ReadTimeoutException; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.client.Negotiator.Result; +import org.apache.kudu.rpc.RpcHeader; +import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag; + +/** + * Class representing a connection from the client to a Kudu server (master or tablet server): + * a high-level wrapper for the TCP connection between the client and the server. + *

    + * It's a stateful handler that manages a connection to a Kudu server. + *

    + * This handler manages the RPC IDs, and keeps track of the RPCs in flight for which + * a response is currently awaited, as well as temporarily buffered RPCs that are waiting + * to be sent to the server. + *

    + * Acquiring the monitor on an object of this class will prevent it from + * accepting write requests as well as buffering requests if the underlying + * channel isn't connected. + * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class Connection extends SimpleChannelUpstreamHandler { + /** Information on the target server. */ + private final ServerInfo serverInfo; + + /** Security context to use for connection negotiation. */ + private final SecurityContext securityContext; + + /** Read timeout for the connection (used by Netty's ReadTimeoutHandler) */ + private final long socketReadTimeoutMs; + + /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler) */ + private final HashedWheelTimer timer; + + /** The underlying Netty's socket channel. */ + private final SocketChannel channel; + + /** + * Set to true when disconnect initiated explicitly from the client side. The channelDisconnected + * event handler then knows not to log any warning about unexpected disconnection from the peer. + */ + private volatile boolean explicitlyDisconnected = false; + + /** Logger: a sink for the log messages originated from this class. */ + private static final Logger LOG = LoggerFactory.getLogger(Connection.class); + + private static final byte RPC_CURRENT_VERSION = 9; + + /** Initial header sent by the client upon connection establishment. */ + private static final byte[] CONNECTION_HEADER = new byte[]{'h', 'r', 'p', 'c', + RPC_CURRENT_VERSION, // RPC version. + 0, + 0 + }; + + /** Lock to guard access to some of the fields below. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** A state of this object. */ + @GuardedBy("lock") + private State state; + + /** + * A hash table to store { callId, statusReportCallback } pairs, representing messages which have + * already been sent and pending responses from the server side. 
Once the server responds to a + * message, the corresponding entry is removed from the container and the response callback + * is invoked with the results represented by {@link CallResponseInfo}. + */ + @GuardedBy("lock") + private HashMap> inflightMessages = new HashMap<>(); + + /** Messages enqueued while the connection was not ready to start sending them over the wire. */ + @GuardedBy("lock") + private ArrayList queuedMessages = Lists.newArrayList(); + + /** The result of the connection negotiation. */ + @GuardedBy("lock") + private Result negotiationResult = null; + + /** A monotonically increasing counter for RPC IDs. */ + @GuardedBy("lock") + private int nextCallId = 0; + + Connection(ServerInfo serverInfo, + SecurityContext securityContext, + long socketReadTimeoutMs, + HashedWheelTimer timer, + ClientSocketChannelFactory channelFactory) { + this.serverInfo = serverInfo; + this.securityContext = securityContext; + this.state = State.NEW; + this.socketReadTimeoutMs = socketReadTimeoutMs; + this.timer = timer; + + final ConnectionPipeline pipeline = new ConnectionPipeline(); + pipeline.init(); + + channel = channelFactory.newChannel(pipeline); + SocketChannelConfig config = channel.getConfig(); + config.setConnectTimeoutMillis(60000); + config.setTcpNoDelay(true); + // Unfortunately there is no way to override the keep-alive timeout in + // Java since the JRE doesn't expose any way to call setsockopt() with + // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh. 
+ config.setKeepAlive(true); + } + + /** {@inheritDoc} */ + @Override + public void channelConnected(final ChannelHandlerContext ctx, + final ChannelStateEvent e) { + lock.lock(); + try { + Preconditions.checkState(state == State.CONNECTING); + state = State.NEGOTIATING; + } finally { + lock.unlock(); + } + Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER)); + Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext); + ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator); + negotiator.sendHello(channel); + } + + /** {@inheritDoc} */ + @Override + public void handleUpstream(final ChannelHandlerContext ctx, + final ChannelEvent e) throws Exception { + if (LOG.isTraceEnabled()) { + LOG.trace("{} upstream event {}", getLogPrefix(), e); + } + super.handleUpstream(ctx, e); + } + + /** {@inheritDoc} */ + @Override + public void channelDisconnected(final ChannelHandlerContext ctx, + final ChannelStateEvent e) throws Exception { + // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the upstream + // pipeline after Connection itself. So, just handle the disconnection event ourselves. + cleanup("connection disconnected"); + } + + /** {@inheritDoc} */ + @Override + public void channelClosed(final ChannelHandlerContext ctx, + final ChannelStateEvent e) throws Exception { + // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream + // pipeline after Connection itself. So, just handle the close event ourselves.
+ cleanup("connection closed"); + } + + /** {@inheritDoc} */ + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception { + Object m = evt.getMessage(); + if (m instanceof Negotiator.Result) { + lock.lock(); + try { + Preconditions.checkState(state == State.NEGOTIATING); + Preconditions.checkNotNull(queuedMessages); + + state = State.READY; + negotiationResult = (Negotiator.Result) m; + List queued = queuedMessages; + // The queuedMessages should not be used anymore once the connection is negotiated. + queuedMessages = null; + // Send out all the enqueued messages. This is done while holding the lock to preserve + // the sequence of the already enqueued and being-enqueued-right-now messages and simplify + // the logic which checks for the consistency using Preconditions.checkXxx() methods. + for (final QueuedMessage qm : queued) { + sendCallToWire(qm.message, qm.cb); + } + } finally { + lock.unlock(); + } + return; + } + + // Some other event which the connection does not handle. + if (!(m instanceof CallResponse)) { + ctx.sendUpstream(evt); + return; + } + + final CallResponse response = (CallResponse) m; + final RpcHeader.ResponseHeader header = response.getHeader(); + if (!header.hasCallId()) { + final int size = response.getTotalResponseSize(); + final String msg = getLogPrefix() + + " RPC response (size: " + size + ") doesn't" + " have callID: " + header; + LOG.error(msg); + throw new NonRecoverableException(Status.Incomplete(msg)); + } + + final int callId = header.getCallId(); + Callback responseCbk; + lock.lock(); + try { + Preconditions.checkState(state == State.READY); + responseCbk = inflightMessages.remove(callId); + } finally { + lock.unlock(); + } + + if (responseCbk == null) { + final String msg = getLogPrefix() + " invalid callID: " + callId; + LOG.error(msg); + // If we get a bad RPC ID back, we are probably somehow misaligned from + // the server. So, we disconnect the connection. 
+ throw new NonRecoverableException(Status.IllegalState(msg)); + } + + if (!header.hasIsError() || !header.getIsError()) { + // The success case. + responseCbk.call(new CallResponseInfo(response, null)); + return; + } + + final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder(); + KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder); + final RpcHeader.ErrorStatusPB error = errorBuilder.build(); + if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) || + error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) { + responseCbk.call(new CallResponseInfo( + response, new RecoverableException(Status.ServiceUnavailable(error.getMessage())))); + return; + } + + final String message = getLogPrefix() + " server sent error " + error.getMessage(); + LOG.error(message); // can be useful + responseCbk.call(new CallResponseInfo( + response, new RpcRemoteException(Status.RemoteError(message), error))); + } + + /** {@inheritDoc} */ + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, + final ExceptionEvent event) { + final Throwable e = event.getCause(); + final Channel c = event.getChannel(); + + if (e instanceof RejectedExecutionException) { + LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e); + } else if (e instanceof ReadTimeoutException) { + LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix()); + } else if (e instanceof ClosedChannelException) { + if (!explicitlyDisconnected) { + LOG.info("{} lost connection to peer", getLogPrefix()); + } + } else if (e instanceof SSLException && explicitlyDisconnected) { + // There's a race in Netty where, when we call Channel.close(), it tries + // to send a TLS 'shutdown' message and enters a shutdown state. 
If another + // thread races to send actual data on the channel, then Netty will get a + // bit confused that we are trying to send data and misinterpret it as a + // renegotiation attempt, and throw an SSLException. So, we just ignore any + // SSLException if we've already attempted to close. + LOG.debug("{} ignoring SSLException: already disconnected", getLogPrefix()); + } else { + LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e); + } + if (c.isOpen()) { + Channels.close(c); // Will trigger channelClosed(), which will cleanup() + } else { // else: presumably a connection timeout. + cleanup(e.getMessage()); // => need to cleanup() from here directly. + } + } + + /** Getter for the peer's end-point information */ + public ServerInfo getServerInfo() { + return serverInfo; + } + + /** + * @return true iff the connection is in the DISCONNECTED state + */ + boolean isDisconnected() { + lock.lock(); + try { + return state == State.DISCONNECTED; + } finally { + lock.unlock(); + } + } + + /** + * TODO(aserbin) make it possible to avoid calling this when the server features are not known yet + * + * @return the set of server's features, if known; null otherwise + */ + @Nullable + Set getPeerFeatures() { + Set features = null; + lock.lock(); + try { + if (negotiationResult != null) { + features = negotiationResult.serverFeatures; + } + } finally { + lock.unlock(); + } + return features; + } + + /** + * @return string representation of the peer (i.e. the server) information suitable for logging + */ + String getLogPrefix() { + return "[peer " + serverInfo.getUuid() + "]"; + } + + /** + * Enqueue outbound message for sending to the remote server via Kudu RPC. The enqueueMessage() + * accepts messages even if the connection hasn't yet been established: the enqueued messages + * are sent out as soon as the connection to the server is ready. The connection is initiated upon + * enqueuing the very first outbound message. 
+ */ + void enqueueMessage(RpcOutboundMessage msg, Callback cb) + throws RecoverableException { + lock.lock(); + try { + if (state == State.DISCONNECTED) { + // The upper-level caller should handle the exception and retry using a new connection. + throw new RecoverableException(Status.IllegalState( + "connection in DISCONNECTED state; cannot enqueue a message")); + } + + if (state == State.NEW) { + // Schedule connecting to the server. + connect(); + } + + // Set the call identifier for the outgoing RPC. + final int callId = nextCallId++; + RpcHeader.RequestHeader.Builder headerBuilder = msg.getHeaderBuilder(); + headerBuilder.setCallId(callId); + + // Amend the timeout for the call, if necessary. + if (socketReadTimeoutMs > 0) { + final int timeoutMs = headerBuilder.getTimeoutMillis(); + if (timeoutMs > 0) { + headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs)); + } + } + + // If the connection hasn't been negotiated yet, add the message into the queuedMessages list. + // The elements of the queuedMessages list will be processed when the negotiation either + // succeeds or fails. + if (state != State.READY) { + queuedMessages.add(new QueuedMessage(msg, cb)); + return; + } + + // It's time to initiate sending the message over the wire. + sendCallToWire(msg, cb); + } finally { + lock.unlock(); + } + } + + /** + * Triggers the channel to be disconnected, which will asynchronously cause all + * queued and in-flight RPCs to be failed. This method is idempotent. + * + * @return future object to wait on the disconnect completion, if necessary + */ + ChannelFuture disconnect() { + explicitlyDisconnected = true; + return Channels.disconnect(channel); + } + + /** + * If open, forcefully shut down the connection to the server. This is the same as + * {@link #disconnect}, but it returns Deferred instead of ChannelFuture. 
+ * + * @return deferred object for tracking the shutting down of this connection + */ + Deferred shutdown() { + final ChannelFuture disconnectFuture = disconnect(); + final Deferred d = new Deferred<>(); + disconnectFuture.addListener(new ChannelFutureListener() { + public void operationComplete(final ChannelFuture future) { + if (future.isSuccess()) { + d.callback(null); + return; + } + final Throwable t = future.getCause(); + if (t instanceof Exception) { + d.callback(t); + } else { + d.callback(new NonRecoverableException( + Status.IllegalState("failed to shutdown: " + this), t)); + } + } + }); + return d; + } + + /** + * @return string representation of this object (suitable for printing into the logs, etc.) + */ + public String toString() { + final StringBuilder buf = new StringBuilder(); + buf.append("Connection@") + .append(hashCode()) + .append("(channel=") + .append(channel) + .append(", uuid=") + .append(serverInfo.getUuid()); + int queuedMessagesNum = 0; + int inflightMessagesNum = 0; + lock.lock(); + try { + queuedMessagesNum = queuedMessages == null ? 0 : queuedMessages.size(); + inflightMessagesNum = inflightMessages == null ? 0 : inflightMessages.size(); + } finally { + lock.unlock(); + } + buf.append(", #queued=").append(queuedMessagesNum) + .append(", #inflight=").append(inflightMessagesNum) + .append(")"); + return buf.toString(); + } + + /** + * This is test-only method. + * + * @return true iff the connection is in the READY state + */ + @VisibleForTesting + boolean isReady() { + lock.lock(); + try { + return state == State.READY; + } finally { + lock.unlock(); + } + } + + /** Start sending the message to the server over the wire. 
*/ + @GuardedBy("lock") + private void sendCallToWire(final RpcOutboundMessage msg, Callback cb) { + Preconditions.checkState(lock.isHeldByCurrentThread()); + Preconditions.checkState(state == State.READY); + Preconditions.checkNotNull(inflightMessages); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} sending {}", getLogPrefix(), msg); + } + final int callId = msg.getHeaderBuilder().getCallId(); + final Callback empty = inflightMessages.put(callId, cb); + Preconditions.checkArgument(empty == null); + Channels.write(channel, msg); + } + + /** + * Process the fact that the connection has been disconnected: update the state of this object and + * clean up any outstanding or lingering messages, notifying on the error via their status + * callbacks. The callee is supposed to handle the error and retry sending the messages, + * if needed. + * + * @param errorMessage a string to describe the cause of the cleanup + */ + private void cleanup(final String errorMessage) { + List queued; + Map> inflight; + + lock.lock(); + try { + if (state == State.DISCONNECTED) { + Preconditions.checkState(queuedMessages == null); + Preconditions.checkState(inflightMessages == null); + return; + } + + queued = queuedMessages; + queuedMessages = null; + + inflight = inflightMessages; + inflightMessages = null; + + state = State.DISCONNECTED; + } finally { + lock.unlock(); + } + final Status error = Status.NetworkError(getLogPrefix() + " " + + (errorMessage == null ? 
"connection reset" : errorMessage)); + final RecoverableException exception = new RecoverableException(error); + + for (Callback cb : inflight.values()) { + try { + cb.call(new CallResponseInfo(null, exception)); + } catch (Exception e) { + LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e); + } + } + + if (queued != null) { + for (QueuedMessage qm : queued) { + try { + qm.cb.call(new CallResponseInfo(null, exception)); + } catch (Exception e) { + LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e); + } + } + } + } + + /** Initiate opening TCP connection to the server. */ + private void connect() { + Preconditions.checkState(lock.isHeldByCurrentThread()); + Preconditions.checkState(state == State.NEW); + state = State.CONNECTING; + channel.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort())); + } + + /** State of the Connection object. */ + private enum State { + NEW, // The object has just been created. + CONNECTING, // The establishment of TCP connection to the server has started. + NEGOTIATING, // The connection negotiation has started. + READY, // The connection to the server has been opened, negotiated, and ready to use. + DISCONNECTED, // The TCP connection has been dropped off. + } + + /** + * The class to represent RPC response received from the remote server. + * If the {@code exception} is null, then it's a success case and the {@code response} contains + * the information on the response. Otherwise it's an error and the {@code exception} provides + * information on the error. For the recoverable error case, the {@code exception} is of + * {@link RecoverableException} type, otherwise it's of {@link NonRecoverableException} type. 
+ */ + static final class CallResponseInfo { + public final CallResponse response; + public final KuduException exception; + + CallResponseInfo(CallResponse response, KuduException exception) { + this.response = response; + this.exception = exception; + } + } + + /** Internal class representing an enqueued outgoing message. */ + private static final class QueuedMessage { + private final RpcOutboundMessage message; + private final Callback cb; + + QueuedMessage(RpcOutboundMessage message, Callback cb) { + this.message = message; + this.cb = cb; + } + } + + /** The helper class to build the Netty's connection pipeline. */ + private final class ConnectionPipeline extends DefaultChannelPipeline { + void init() { + super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder( + KuduRpc.MAX_RPC_SIZE, + 0, // length comes at offset 0 + 4, // length prefix is 4 bytes long + 0, // no "length adjustment" + 4 /* strip the length prefix */)); + super.addLast("decode-inbound", new CallResponse.Decoder()); + super.addLast("encode-outbound", new RpcOutboundMessage.Encoder()); + if (Connection.this.socketReadTimeoutMs > 0) { + super.addLast("timeout-handler", new ReadTimeoutHandler( + Connection.this.timer, Connection.this.socketReadTimeoutMs, TimeUnit.MILLISECONDS)); + } + super.addLast("kudu-handler", Connection.this); + } + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java index 83ae8f0..844e79f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java @@ -17,263 +17,126 @@ package org.apache.kudu.client; -import java.net.InetAddress; -import 
java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.net.HostAndPort; import com.stumbleupon.async.Deferred; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.jboss.netty.channel.DefaultChannelPipeline; -import org.jboss.netty.channel.socket.SocketChannel; -import org.jboss.netty.channel.socket.SocketChannelConfig; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.kudu.Common; -import org.apache.kudu.master.Master; -import org.apache.kudu.util.NetUtil; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.util.HashedWheelTimer; /** - * The ConnectionCache is responsible for managing connections to masters and tablet servers. - * There should only be one instance per Kudu client, and can not be shared between - * clients. - *

    - * {@link TabletClient}s are currently never removed from this cache. Since the map is keyed by - * UUID, it would require an ever-growing set of unique tablet servers to encounter memory issues. - * The reason for keeping disconnected connections in the cache is two-fold: 1) it makes - * reconnecting easier since only UUIDs are passed around so we can use the dead TabletClient's - * host and port to reconnect (see {@link #getLiveClient(String)}) and 2) having the dead - * connection prevents tight looping when hitting "Connection refused"-type of errors. + * The ConnectionCache is responsible for managing connections to Kudu masters and tablet servers. + * There should only be one instance of ConnectionCache per Kudu client, and it should not be + * shared between clients. *

    + * Disconnected instances of the {@link Connection} class are replaced in the cache with new instances + * when the {@link #getConnection(ServerInfo)} method is called with the same destination. Since the map + * is keyed by UUID of the server, it would require an ever-growing set of unique Kudu servers + * to encounter memory issues. + * * This class is thread-safe. */ @InterfaceAudience.Private @InterfaceStability.Unstable class ConnectionCache { + /** Security context to use for connection negotiation. */ + private final SecurityContext securityContext; - private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class); + /** Read timeout for connections (used by Netty's ReadTimeoutHandler) */ + private final long socketReadTimeoutMs; - /** - * Cache that maps UUIDs to the clients connected to them. - *

    - * This isn't a {@link ConcurrentHashMap} because we want to do an atomic get-and-put, - * {@code putIfAbsent} isn't a good fit for us since it requires creating - * an object that may be "wasted" in case another thread wins the insertion - * race, and we don't want to create unnecessary connections. - */ - @GuardedBy("lock") - private final HashMap uuid2client = new HashMap<>(); + /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler) */ + private final HashedWheelTimer timer; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** Netty's channel factory to be used by connections. */ + private final ClientSocketChannelFactory channelFactory; - private final Lock readLock = lock.readLock(); - private final Lock writeLock = lock.readLock(); + /** Synchronization primitive to guard access to the fields below. */ + private final ReentrantLock lock = new ReentrantLock(); - private final AsyncKuduClient kuduClient; + @GuardedBy("lock") + private final HashMap uuid2connection = new HashMap<>(); /** - * Create a new empty ConnectionCache that will used the passed client to create connections. - * @param client a client that contains the information we need to create connections + * Create a new empty ConnectionCache given the specified parameters. */ - ConnectionCache(AsyncKuduClient client) { - this.kuduClient = client; + ConnectionCache(SecurityContext securityContext, + long socketReadTimeoutMs, + HashedWheelTimer timer, + ClientSocketChannelFactory channelFactory) { + this.securityContext = securityContext; + this.socketReadTimeoutMs = socketReadTimeoutMs; + this.timer = timer; + this.channelFactory = channelFactory; } /** - * Create a connection to a tablet server based on information provided by the master.
- * @param tsInfoPB master-provided information for the tablet server - * @return an object that contains all the server's information - * @throws UnknownHostException if we cannot resolve the tablet server's IP address + * Get a connection to the specified server. If no connection exists or the existing connection + * is already disconnected, then create a new connection to the specified server. The newly + * created connection is not negotiated until enqueuing the first RPC to the target server. + * + * @param serverInfo the server end-point to connect to + * @return a connection to the specified destination */ - ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException { - List addresses = tsInfoPB.getRpcAddressesList(); - String uuid = tsInfoPB.getPermanentUuid().toStringUtf8(); - if (addresses.isEmpty()) { - LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid); - return null; - } - - // from meta_cache.cc - // TODO: if the TS advertises multiple host/ports, pick the right one - // based on some kind of policy. For now just use the first always. - HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0)); - InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost()); - if (inetAddress == null) { - throw new UnknownHostException( - "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'"); - } - return newClient(new ServerInfo(uuid, hostPort, inetAddress)).getServerInfo(); - } - - TabletClient newMasterClient(HostAndPort hostPort) { - // We should pass a UUID here but we have a chicken and egg problem, we first need to - // communicate with the masters to find out about them, and that's what we're trying to do. - // The UUID is just used for logging and cache key, so instead we just use a constructed - // string with the master host and port as.
- return newClient("master-" + hostPort.toString(), hostPort); - } + public Connection getConnection(final ServerInfo serverInfo) { + Connection connection; - TabletClient newClient(String uuid, HostAndPort hostPort) { - InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost()); - if (inetAddress == null) { - // TODO(todd): should we log the resolution failure? throw an exception? - return null; - } - - ServerInfo serverInfo = new ServerInfo(uuid, hostPort, inetAddress); - return newClient(serverInfo); - } - - TabletClient newClient(ServerInfo serverInfo) { - TabletClient client; - SocketChannel chan; - - writeLock.lock(); + lock.lock(); try { - client = uuid2client.get(serverInfo.getUuid()); - if (client != null && client.isAlive()) { - return client; + // First try to find an existing connection. + connection = uuid2connection.get(serverInfo.getUuid()); + if (connection == null || connection.isDisconnected()) { + // If no valid connection is found, create a new one. + connection = new Connection(serverInfo, securityContext, + socketReadTimeoutMs, timer, channelFactory); + uuid2connection.put(serverInfo.getUuid(), connection); } - final TabletClientPipeline pipeline = new TabletClientPipeline(); - client = pipeline.init(serverInfo); - chan = this.kuduClient.getChannelFactory().newChannel(pipeline); - uuid2client.put(serverInfo.getUuid(), client); - } finally { - writeLock.unlock(); - } - - final SocketChannelConfig config = chan.getConfig(); - config.setConnectTimeoutMillis(5000); - config.setTcpNoDelay(true); - // Unfortunately there is no way to override the keep-alive timeout in - // Java since the JRE doesn't expose any way to call setsockopt() with - // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh. - config.setKeepAlive(true); - chan.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), - serverInfo.getPort())); // Won't block. - return client; - } - - /** - * Get a connection to a server for the given UUID. 
The returned connection can be down and its - * state can be queried via {@link TabletClient#isAlive()}. To automatically get a client that's - * gonna be re-connected automatically, use {@link #getLiveClient(String)}. - * @param uuid server's identifier - * @return a connection to a server, or null if the passed UUID isn't known - */ - TabletClient getClient(String uuid) { - readLock.lock(); - try { - return uuid2client.get(uuid); } finally { - readLock.unlock(); + lock.unlock(); } - } - /** - * Get a connection to a server for the given UUID. This method will automatically call - * {@link #newClient(String, InetAddress, int)} if the cached connection is down. - * @param uuid server's identifier - * @return a connection to a server, or null if the passed UUID isn't known - */ - TabletClient getLiveClient(String uuid) { - TabletClient client = getClient(uuid); - - if (client == null) { - return null; - } else if (client.isAlive()) { - return client; - } else { - return newClient(client.getServerInfo()); - } + return connection; } /** - * Asynchronously closes every socket, which will also cancel all the RPCs in flight. + * Asynchronously terminate every connection. This also cancels all the pending and in-flight + * RPCs. */ Deferred> disconnectEverything() { - readLock.lock(); + lock.lock(); try { - ArrayList> deferreds = new ArrayList<>(uuid2client.size()); - for (TabletClient ts : uuid2client.values()) { - deferreds.add(ts.shutdown()); + ArrayList> deferreds = new ArrayList<>(uuid2connection.size()); + for (Connection c : uuid2connection.values()) { + deferreds.add(c.shutdown()); } return Deferred.group(deferreds); } finally { - readLock.unlock(); + lock.unlock(); } } /** - * The list returned by this method can't be modified, - * but calling certain methods on the returned TabletClients can have an effect. For example, - * it's possible to forcefully shutdown a connection to a tablet server by calling {@link - * TabletClient#shutdown()}. 
- * @return copy of the current TabletClients list - */ - List getImmutableTabletClientsList() { - readLock.lock(); - try { - return ImmutableList.copyOf(uuid2client.values()); - } finally { - readLock.unlock(); - } - } - - /** - * Queries all the cached connections if they are alive. - * @return true if all the connections are down, else false + * Return a copy of the list of all connections. This method is exposed only to allow + * {@link AsyncKuduClient} to forward it, so tests can get access to the underlying elements + * of the cache. + * + * @return a copy of the list of all connections in the connection cache */ @VisibleForTesting - boolean allConnectionsAreDead() { - readLock.lock(); + List getConnectionListCopy() { + lock.lock(); try { - for (TabletClient tserver : uuid2client.values()) { - if (tserver.isAlive()) { - return false; - } - } + return ImmutableList.copyOf(uuid2connection.values()); } finally { - readLock.unlock(); - } - return true; - } - - private final class TabletClientPipeline extends DefaultChannelPipeline { - TabletClient init(ServerInfo serverInfo) { - super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder( - KuduRpc.MAX_RPC_SIZE, - 0, // length comes at offset 0 - 4, // length prefix is 4 bytes long - 0, // no "length adjustment" - 4 /* strip the length prefix */)); - super.addLast("decode-inbound", new CallResponse.Decoder()); - super.addLast("encode-outbound", new RpcOutboundMessage.Encoder()); - AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient; - final TabletClient client = new TabletClient(kuduClient, serverInfo); - if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) { - super.addLast("timeout-handler", - new ReadTimeoutHandler(kuduClient.getTimer(), - kuduClient.getDefaultSocketReadTimeoutMs(), - TimeUnit.MILLISECONDS)); - } - super.addLast("kudu-handler", client); - - return client; + lock.unlock(); } } } 
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java index 30f1a9c..0de894f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java @@ -131,13 +131,13 @@ public abstract class KuduRpc { * that access this attribute will have a happens-before relationship with * the rest of the code, due to other existing synchronization. */ - byte attempt; // package-private for TabletClient and AsyncKuduClient only. + int attempt; // package-private for RpcProxy and AsyncKuduClient only. /** - * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of + * Set by RpcProxy when isRequestTracked returns true to identify this RPC in the sequence of * RPCs sent by this client. Once it is set it should never change unless the RPC is reused. 
*/ - long sequenceId = RequestTracker.NO_SEQ_NO; + private long sequenceId = RequestTracker.NO_SEQ_NO; KuduRpc(KuduTable table) { this.table = table; http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java index 46d99db..a917af2 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java @@ -147,9 +147,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler { */ private DecoderEmbedder sslEmbedder; - /** True if we have negotiated TLS with the server */ - private boolean negotiatedTls; - /** * The nonce sent from the server to the client, or null if negotiation has * not yet taken place, or the server does not send a nonce. @@ -290,7 +287,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler { // Store the supported features advertised by the server. serverFeatures = getFeatureFlags(response); // If the server supports TLS, we will always speak TLS to it. - negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS); + final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS); // Check the negotiated authentication type sent by the server. 
chosenAuthnType = chooseAuthenticationType(response); http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java index d4702f8..a6025f7 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import com.google.common.base.Objects; @@ -38,7 +39,7 @@ import org.apache.kudu.master.Master; * This class encapsulates the information regarding a tablet and its locations. *

    * RemoteTablet's main function is to keep track of where the leader for this - * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find + * tablet is. For example, an RPC might call {@link #getLeaderServerInfo()}, contact that TS, find * it's not the leader anymore, and then call {@link #demoteLeader(String)}. *

    * A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often, @@ -138,53 +139,61 @@ class RemoteTablet implements Comparable { } /** - * Gets the UUID of the tablet server that we think holds the leader replica for this tablet. - * @return a UUID of a tablet server that we think has the leader, else null + * Get the information on the tablet server that we think holds the leader replica for this + * tablet. + * + * @return information on a tablet server that we think has the leader, else null */ - String getLeaderUUID() { + @Nullable + ServerInfo getLeaderServerInfo() { synchronized (tabletServers) { - return leaderUuid; + return tabletServers.get(leaderUuid); } } /** - * Gets the UUID of the closest server. If none is closer than the others, returns a random - * server UUID. - * @return the UUID of the closest server, which might be any if none is closer, or null if this - * cache doesn't know of any servers + * Get the information on the closest server. If none is closer than the others, + * return the information on a randomly picked server. + * + * @return the information on the closest server, which might be any if none is closer, or null + * if this cache doesn't know any servers. */ - String getClosestUUID() { + @Nullable + ServerInfo getClosestServerInfo() { synchronized (tabletServers) { - String lastUuid = null; - for (ServerInfo serverInfo : tabletServers.values()) { - lastUuid = serverInfo.getUuid(); - if (serverInfo.isLocal()) { - return serverInfo.getUuid(); + ServerInfo last = null; + for (ServerInfo e : tabletServers.values()) { + last = e; + if (e.isLocal()) { + return e; } } - return lastUuid; + return last; } } /** * Helper function to centralize the calling of methods based on the passed replica selection * mechanism. 
+ * * @param replicaSelection replica selection mechanism to use - * @return a UUID for the server that matches the selection, can be null + * @return information on the server that matches the selection, can be null */ - String getReplicaSelectedUUID(ReplicaSelection replicaSelection) { + @Nullable + ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection) { switch (replicaSelection) { case LEADER_ONLY: - return getLeaderUUID(); + return getLeaderServerInfo(); case CLOSEST_REPLICA: - return getClosestUUID(); + return getClosestServerInfo(); default: - throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection); + throw new RuntimeException("unknown replica selection mechanism " + replicaSelection); } } /** - * Gets the replicas of this tablet. The returned list may not be mutated. + * Get replicas of this tablet. The returned list may not be mutated. + * * @return the replicas of the tablet */ List getReplicas() {