accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/6] accumulo git commit: [ACCUMULO-4506] Add a timeout to a replication RPC call
Date Thu, 04 May 2017 01:24:55 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.7 771475ad5 -> 5f6882fc2
  refs/heads/1.8 cae07900b -> c093b561d
  refs/heads/master ac460d0c8 -> 6128f024d


[ACCUMULO-4506] Add a timeout to a replication RPC call

This addresses an issue where a replication task could get stuck for a
substantial amount of time.  The replication client now uses an RPC
timeout (replication.rpc.timeout, 2m by default), so a single RPC attempt
is abandoned once the timeout elapses instead of blocking indefinitely.
Note that this interacts with the replication.work.attempts property: a
timed-out attempt will be retried by the same task up to that many times
before the task is abandoned entirely.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f6882fc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f6882fc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f6882fc

Branch: refs/heads/1.7
Commit: 5f6882fc28bfea9cd48afef174dcf957eefde210
Parents: 771475a
Author: Adam J. Shook <adamjshook@gmail.com>
Authored: Wed May 3 13:19:07 2017 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Wed May 3 16:05:21 2017 -0400

----------------------------------------------------------------------
 .../core/client/impl/ReplicationClient.java      | 10 ++++++----
 .../org/apache/accumulo/core/conf/Property.java  |  2 ++
 .../org/apache/accumulo/core/rpc/ThriftUtil.java |  2 +-
 .../replication/AccumuloReplicaSystem.java       | 19 ++++++++++++-------
 4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index cc9b5c1..8265503 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -119,14 +119,16 @@ public class ReplicationClient {
    *          The client session for the peer replicant
    * @param server
    *          Server to connect to
+   * @param timeout
+   *          RPC timeout in milliseconds
    * @return A ReplicationServicer client to the given host in the given instance
    */
-  public static ReplicationServicer.Client getServicerConnection(ClientContext context, HostAndPort
server) throws TTransportException {
+  public static ReplicationServicer.Client getServicerConnection(ClientContext context, HostAndPort
server, long timeout) throws TTransportException {
     requireNonNull(context);
     requireNonNull(server);
 
     try {
-      return ThriftUtil.getClientNoTimeout(new ReplicationServicer.Client.Factory(), server,
context);
+      return ThriftUtil.getClient(new ReplicationServicer.Client.Factory(), server, context,
timeout);
     } catch (TTransportException tte) {
       log.debug("Failed to connect to servicer ({}), will retry...", server, tte);
       throw tte;
@@ -180,12 +182,12 @@ public class ReplicationClient {
     throw new AccumuloException("Could not connect to ReplicationCoordinator at " + context.getInstance().getInstanceName());
   }
 
-  public static <T> T executeServicerWithReturn(ClientContext context, HostAndPort
tserver, ClientExecReturn<T,ReplicationServicer.Client> exec)
+  public static <T> T executeServicerWithReturn(ClientContext context, HostAndPort
tserver, ClientExecReturn<T,ReplicationServicer.Client> exec, long timeout)
       throws AccumuloException, AccumuloSecurityException, TTransportException {
     ReplicationServicer.Client client = null;
     while (true) {
       try {
-        client = getServicerConnection(context, tserver);
+        client = getServicerConnection(context, tserver, timeout);
         return exec.execute(client);
       } catch (ThriftSecurityException e) {
         throw new AccumuloSecurityException(e.user, e.code, e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index db9d6a6..0e7026f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -584,6 +584,8 @@ public enum Property {
   REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION,
       "Amount of time to wait before re-checking for replication work, not useful outside
of tests"),
   REPLICATION_TRACE_PERCENT("replication.trace.percent", "0.1", PropertyType.FRACTION, "The
sampling percentage to use for replication traces"),
+  REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION,
+      "Amount of time for a single replication RPC call to last before failing the attempt.
See replication.work.attempts."),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 49e4349..deee3fe 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -134,7 +134,7 @@ public class ThriftUtil {
    * @param timeout
    *          Socket timeout which overrides the ClientContext timeout
    */
-  private static <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, HostAndPort address, ClientContext context, long timeout)
+  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, HostAndPort address, ClientContext context, long timeout)
       throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout,
context);
     return createClient(factory, transport);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f6882fc/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 411ae87..9709e50 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -163,6 +163,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     final Instance localInstance = HdfsZooInstance.getInstance();
     final AccumuloConfiguration localConf = new ServerConfigurationFactory(localInstance).getConfiguration();
 
+    log.debug("Replication RPC timeout is {}", localConf.get(Property.REPLICATION_RPC_TIMEOUT.getKey()));
+
     final String principal = getPrincipal(localConf, target);
     final File keytab;
     final String password;
@@ -275,6 +277,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
         final HostAndPort peerTserver = HostAndPort.fromString(peerTserverStr);
 
+        final long timeout = localConf.getTimeInMillis(Property.REPLICATION_RPC_TIMEOUT);
+
         // We have a tserver on the remote -- send the data its way.
         Status finalStatus;
         final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
@@ -282,14 +286,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
           if (p.getName().endsWith(RFILE_SUFFIX)) {
             span = Trace.start("RFile replication");
             try {
-              finalStatus = replicateRFiles(peerContext, peerTserver, target, p, status,
sizeLimit, remoteTableId, peerContext.rpcCreds(), helper);
+              finalStatus = replicateRFiles(peerContext, peerTserver, target, p, status,
sizeLimit, remoteTableId, peerContext.rpcCreds(), helper, timeout);
             } finally {
               span.stop();
             }
           } else {
             span = Trace.start("WAL replication");
             try {
-              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit,
remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi);
+              finalStatus = replicateLogs(peerContext, peerTserver, target, p, status, sizeLimit,
remoteTableId, peerContext.rpcCreds(), helper, accumuloUgi,
+                  timeout);
             } finally {
               span.stop();
             }
@@ -314,7 +319,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   }
 
   protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver,
final ReplicationTarget target, final Path p, final Status status,
-      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final
ReplicaSystemHelper helper) throws TTransportException,
+      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final
ReplicaSystemHelper helper, long timeout) throws TTransportException,
       AccumuloException, AccumuloSecurityException {
     DataInputStream input;
     try {
@@ -328,7 +333,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     while (true) {
       // Read and send a batch of mutations
       ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext,
peerTserver, new RFileClientExecReturn(target, input, p,
-          currentStatus, sizeLimit, remoteTableId, tcreds));
+          currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
       // Catch the overflow
       long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
@@ -360,8 +365,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   }
 
   protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver,
final ReplicationTarget target, final Path p, final Status status,
-      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final
ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi)
-      throws TTransportException, AccumuloException, AccumuloSecurityException {
+      final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final
ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi,
+      long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException
{
 
     log.debug("Replication WAL to peer tserver");
     final Set<Integer> tids;
@@ -428,7 +433,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       try {
         // Read and send a batch of mutations
         replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver,
new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids));
+            remoteTableId, tcreds, tids), timeout);
       } catch (Exception e) {
         log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(),
peerTserver, e);
         throw e;


Mime
View raw message