accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [12/51] [abbrv] git commit: ACCUMULO-378 Try to get the tracing working for tservers sending data
Date Sat, 14 Jun 2014 04:55:12 GMT
ACCUMULO-378 Try to get the tracing working for tservers sending data


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/092e22ea
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/092e22ea
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/092e22ea

Branch: refs/heads/master
Commit: 092e22ea5509ab8961a80de682687da503bec30a
Parents: 3a619ff
Author: Josh Elser <elserj@apache.org>
Authored: Tue May 27 20:11:40 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue May 27 20:11:40 2014 -0400

----------------------------------------------------------------------
 .../replication/AccumuloReplicaSystem.java      | 155 ++++++++++---------
 1 file changed, 79 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/092e22ea/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 4051daf..f3a657c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -152,82 +152,84 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
     final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 
-    Trace.on("AccumuloReplicaSystem");
-
-    Instance peerInstance = getPeerInstance(target);
-    // Remote identifier is an integer (table id) in this case.
-    final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
-
-    // Attempt the replication of this status a number of times before giving up and
-    // trying to replicate it again later some other time.
-    int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
-    for (int i = 0; i < numAttempts; i++) {
-      String peerTserver;
-      Span span = Trace.start("Fetch peer tserver");
-      try {
-        // Ask the master on the remote what TServer we should talk with to replicate the data
-        peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>()
{
-
-          @Override
-          public String execute(ReplicationCoordinator.Client client) throws Exception {
-            return client.getServicerAddress(remoteTableId, tCredsForPeer);
-          }
-
-        });
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        // No progress is made
-        log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
-        continue;
-      } finally {
-        span.stop();
-      }
+    try {
+      Trace.on("AccumuloReplicaSystem");
+
+      Instance peerInstance = getPeerInstance(target);
+      // Remote identifier is an integer (table id) in this case.
+      final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+
+      // Attempt the replication of this status a number of times before giving up and
+      // trying to replicate it again later some other time.
+      int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
+      for (int i = 0; i < numAttempts; i++) {
+        String peerTserver;
+        Span span = Trace.start("Fetch peer tserver");
+        try {
+          // Ask the master on the remote what TServer we should talk with to replicate the data
+          peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new
ClientExecReturn<String,ReplicationCoordinator.Client>() {
+
+            @Override
+            public String execute(ReplicationCoordinator.Client client) throws Exception
{
+              return client.getServicerAddress(remoteTableId, tCredsForPeer);
+            }
+
+          });
+        } catch (AccumuloException | AccumuloSecurityException e) {
+          // No progress is made
+          log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
+          continue;
+        } finally {
+          span.stop();
+        }
 
-      if (null == peerTserver) {
-        // Something went wrong, and we didn't get a valid tserver from the remote for some reason
-        log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target);
-        continue;
-      }
+        if (null == peerTserver) {
+          // Something went wrong, and we didn't get a valid tserver from the remote for some reason
+          log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target);
+          continue;
+        }
 
-      // We have a tserver on the remote -- send the data its way.
-      Status finalStatus;
-      final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
-      try {
-        if (p.getName().endsWith(RFILE_SUFFIX)) {
-          span = Trace.start("RFile replication");
-          try {
-            finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit,
remoteTableId, tCredsForPeer, helper);
-          } finally {
-            span.stop();
-          }
-        } else {
-          span = Trace.start("WAL replication");
-          try {
-            finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit,
remoteTableId, tCredsForPeer, helper);
-          } finally {
-            span.stop();
+        // We have a tserver on the remote -- send the data its way.
+        Status finalStatus;
+        final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
+        try {
+          if (p.getName().endsWith(RFILE_SUFFIX)) {
+            span = Trace.start("RFile replication");
+            try {
+              finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status,
sizeLimit, remoteTableId, tCredsForPeer, helper);
+            } finally {
+              span.stop();
+            }
+          } else {
+            span = Trace.start("WAL replication");
+            try {
+              finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit,
remoteTableId, tCredsForPeer, helper);
+            } finally {
+              span.stop();
+            }
           }
-        }
 
-        log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
+          log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
 
-        return finalStatus;
-      } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
-        log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
-        UtilWaitThread.sleep(1000);
+          return finalStatus;
+        } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
+          log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
+          UtilWaitThread.sleep(1000);
+        }
       }
-    }
-
-    log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 
-    Trace.offNoFlush();
+      log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 
-    // We made no status, punt on it for now, and let it re-queue itself for work
-    return status;
+      // We made no status, punt on it for now, and let it re-queue itself for work
+      return status;
+    } finally {
+      Trace.offNoFlush();
+    }
   }
 
-  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver,
final ReplicationTarget target, final Path p,
-      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials
tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
-      AccumuloSecurityException {
+  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver,
final ReplicationTarget target, final Path p, final Status status,
+      final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper
helper) throws TTransportException,
+      AccumuloException, AccumuloSecurityException {
     DataInputStream input;
     try {
       input = getRFileInputStream(p);
@@ -239,8 +241,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     Status lastStatus = status, currentStatus = status;
     while (true) {
       // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance,
peerTserver,
-          new RFileClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId,
tcreds));
+      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance,
peerTserver, new RFileClientExecReturn(target, input, p,
+          currentStatus, sizeLimit, remoteTableId, tcreds));
 
       // Catch the overflow
       long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
@@ -271,9 +273,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     }
   }
 
-  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final
ReplicationTarget target, final Path p,
-      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials
tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
-      AccumuloSecurityException {
+  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final
ReplicationTarget target, final Path p, final Status status,
+      final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper
helper) throws TTransportException,
+      AccumuloException, AccumuloSecurityException {
 
     final Set<Integer> tids;
     final DataInputStream input;
@@ -315,8 +317,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       ReplicationStats replResult;
       try {
         // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-            new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId,
tcreds, tids));
+        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
+            remoteTableId, tcreds, tids));
       } finally {
         span.stop();
       }
@@ -371,7 +373,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     private TCredentials tcreds;
     private Set<Integer> tids;
 
-    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status
status, long sizeLimit, int remoteTableId, TCredentials tcreds, Set<Integer> tids) {
+    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status
status, long sizeLimit, int remoteTableId, TCredentials tcreds,
+        Set<Integer> tids) {
       this.target = target;
       this.input = input;
       this.p = p;
@@ -386,8 +389,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     public ReplicationStats execute(Client client) throws Exception {
       WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids);
 
-      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'",
-          (Long.MAX_VALUE == edits.entriesConsumed) ? "all" : edits.entriesConsumed, edits.sizeInBytes,
p);
+      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == edits.entriesConsumed) ? "all"
+          : edits.entriesConsumed, edits.sizeInBytes, p);
 
       // If we have some edits to send
       if (0 < edits.walEdits.getEditsSize()) {


Mime
View raw message