Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A5A2F1117E for ; Sat, 14 Jun 2014 04:55:04 +0000 (UTC) Received: (qmail 49166 invoked by uid 500); 14 Jun 2014 04:55:04 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 49137 invoked by uid 500); 14 Jun 2014 04:55:04 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 49130 invoked by uid 99); 14 Jun 2014 04:55:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 14 Jun 2014 04:55:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 522079348F2; Sat, 14 Jun 2014 04:55:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Sat, 14 Jun 2014 04:55:11 -0000 Message-Id: <4680cfa1e97d428f92a458d277a3b6e4@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/51] [abbrv] git commit: ACCUMULO-378 Add tracing into the AccumuloReplicaSystem. ACCUMULO-378 Add tracing into the AccumuloReplicaSystem. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3a619ffe Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3a619ffe Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3a619ffe Branch: refs/heads/master Commit: 3a619ffe08d8d90432218a3138faa572017b1f06 Parents: 9dec25e Author: Josh Elser Authored: Tue May 27 18:20:31 2014 -0400 Committer: Josh Elser Committed: Tue May 27 18:20:31 2014 -0400 ---------------------------------------------------------------------- .../replication/AccumuloReplicaSystem.java | 53 ++++++++++++++++++-- 1 file changed, 48 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a619ffe/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index ce44eef..4051daf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -58,6 +58,8 @@ import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams; import org.apache.accumulo.tserver.logger.LogFileKey; @@ -150,6 +152,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { Credentials credentialsForPeer = getCredentialsForPeer(localConf, target); final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance); + Trace.on("AccumuloReplicaSystem"); + Instance peerInstance = getPeerInstance(target); // Remote identifier is an integer (table id) in this case. final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier()); @@ -159,6 +163,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS); for (int i = 0; i < numAttempts; i++) { String peerTserver; + Span span = Trace.start("Fetch peer tserver"); try { // Ask the master on the remote what TServer we should talk with to replicate the data peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn() { @@ -173,6 +178,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { // No progress is made log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e); continue; + } finally { + span.stop(); } if (null == peerTserver) { @@ -186,9 +193,19 @@ public class AccumuloReplicaSystem implements ReplicaSystem { final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE); try { if (p.getName().endsWith(RFILE_SUFFIX)) { - finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper); + span = Trace.start("RFile replication"); + try { + finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper); + } finally { + span.stop(); + } } else { - finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper); + span = Trace.start("WAL replication"); + try { + finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper); + } finally { + span.stop(); + } } log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus)); @@ -202,6 +219,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p); + Trace.offNoFlush(); + // We made no status, punt on it for now, and let it re-queue itself for work return status; } @@ -258,14 +277,20 @@ public class AccumuloReplicaSystem implements ReplicaSystem { final Set tids; final DataInputStream input; + Span span = Trace.start("Read WAL header"); + span.data("file", p.toString()); try { input = getWalStream(p); } catch (IOException e) { log.error("Could not create stream for WAL", e); // No data sent (bytes nor records) and no progress made return status; + } finally { + span.stop(); } + span = Trace.start("Consume WAL prefix"); + span.data("file", p.toString()); try { // We want to read all records in the WAL up to the "begin" offset contained in the Status message, // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations @@ -273,13 +298,28 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } catch (IOException e) { log.warn("Unexpected error consuming file."); return status; + } finally { + span.stop(); } Status lastStatus = status, currentStatus = status; while (true) { - // Read and send a batch of mutations - ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, - new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids)); + // Set some trace info + span = Trace.start("Replicate WAL batch"); + span.data("Size limit", Long.toString(sizeLimit)); + span.data("File", p.toString()); + span.data("Peer instance name", peerInstance.getInstanceName()); + span.data("Peer tserver", peerTserver); + span.data("Remote table ID", Integer.toString(remoteTableId)); + + ReplicationStats replResult; + try { + // Read and send a batch of mutations + replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, + new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids)); + } finally { + span.stop(); + } // Catch the overflow long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; @@ -293,11 +333,14 @@ public class AccumuloReplicaSystem implements ReplicaSystem { // If we got a different status if (!currentStatus.equals(lastStatus)) { + span = Trace.start("Update replication table"); try { helper.recordNewStatus(p, currentStatus, target); } catch (TableNotFoundException e) { log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e); throw new RuntimeException("Replication table did not exist, will retry", e); + } finally { + span.stop(); } // If we don't have any more work, just quit