accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [03/51] [abbrv] git commit: ACCUMULO-2846 Push down the DataInputStream and immediate re-queue'ing of a file with more work to AccumuloReplicaSystem.
Date Sat, 14 Jun 2014 04:55:03 GMT
ACCUMULO-2846 Push down the DataInputStream and immediate re-queue'ing of a file with more work to AccumuloReplicaSystem.

Handling the quick re-submission of a file with more data to replicate in the ReplicationProcessor
was a nice approach as it separated the business logic from the implementation; however, this caused
a noticeable performance decrease due to re-reading the prefix of already-replicated data from the source file.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f275353e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f275353e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f275353e

Branch: refs/heads/master
Commit: f275353ebd9052a219f9e14097c652d11810e8c9
Parents: 9d9b5ed
Author: Josh Elser <elserj@apache.org>
Authored: Tue May 27 14:05:39 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue May 27 14:05:39 2014 -0400

----------------------------------------------------------------------
 .../core/client/replication/ReplicaSystem.java  |   3 +-
 .../client/replication/ReplicationTable.java    |  37 +++
 .../replication/PrintReplicationRecords.java    |   2 +-
 .../core/replication/ReplicaSystemHelper.java   |  71 ++++
 .../server/replication/ReplicationTable.java    |  35 +-
 .../master/replication/FinishedWorkUpdater.java |   6 +-
 .../replication/SequentialWorkAssigner.java     |   2 +
 .../replication/AccumuloReplicaSystem.java      | 326 ++++++++++++++-----
 .../replication/ReplicationProcessor.java       |  98 ++----
 .../replication/AccumuloReplicaSystemTest.java  |  85 ++++-
 .../replication/ReplicationProcessorTest.java   |  43 +--
 .../test/replication/MockReplicaSystem.java     |  23 +-
 .../test/replication/ReplicationTest.java       |  62 +---
 13 files changed, 519 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
index 220d7bb..e20d35f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.client.replication;
 
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
@@ -32,7 +33,7 @@ public interface ReplicaSystem {
    * @param target The peer
    * @return A new Status for the progress that was made
    */
-  public Status replicate(Path p, Status status, ReplicationTarget target);
+  public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper);
 
   /**
    * Configure the implementation with necessary information from the system configuration

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
index 6bd34f9..0b2b9a8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
@@ -16,9 +16,46 @@
  */
 package org.apache.accumulo.core.client.replication;
 
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.security.Authorizations;
+
 /**
  * 
  */
 public class ReplicationTable {
   public static final String NAME = "replication";
+
+  public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException {
+    return conn.createScanner(NAME, auths);
+  }
+
+  public static Scanner getScanner(Connector conn) throws TableNotFoundException {
+    return getScanner(conn, new Authorizations());
+  }
+
+  public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException {
+    return getBatchWriter(conn, new BatchWriterConfig());
+  }
+
+  public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException {
+    return conn.createBatchWriter(NAME, config);
+  }
+
+  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException {
+    return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
+  }
+
+  public static boolean exists(Connector conn) {
+    return exists(conn.tableOperations());
+  }
+
+  public static boolean exists(TableOperations tops) {
+    return tops.exists(NAME);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
index bb98440..5104d39 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
@@ -77,7 +77,7 @@ public class PrintReplicationRecords implements Runnable {
     }
 
     out.println();
-    out.println(sdf.format(new Date()) + "Replication entries from replication table");
+    out.println(sdf.format(new Date()) + " Replication entries from replication table");
     out.println("--------------------------------------------------------------------");
 
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
new file mode 100644
index 0000000..660862c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.replication.ReplicationTable;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class ReplicaSystemHelper {
+  private static final Logger log = LoggerFactory.getLogger(ReplicaSystemHelper.class);
+
+  private Instance inst;
+  private Credentials creds;
+
+  public ReplicaSystemHelper(Instance inst, Credentials creds) {
+    this.inst = inst;
+    this.creds = creds;
+  }
+
+  /**
+   * Record the updated Status for this file and target
+   * 
+   * @param filePath
+   *          Path to file being replicated
+   * @param status
+   *          Updated Status after replication
+   * @param target
+   *          Peer that was replicated to
+   */
+  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    try {
+      log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString (status));
+      Mutation m = new Mutation(filePath.toString());
+      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
+      bw.addMutation(m);
+    } finally {
+      bw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
index 68651ab..11edbb1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
@@ -25,13 +25,9 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.IteratorSetting.Column;
-import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
@@ -41,7 +37,6 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.StatusFormatter;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
@@ -164,7 +159,7 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
     for (Entry<String,String> property : properties) {
       if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
         if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
-          log.info("Setting formatter for {} from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME);
+          log.info("Changing formatter for {} table from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME);
           try {
             tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME);
           } catch (AccumuloException | AccumuloSecurityException e) {
@@ -193,32 +188,4 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
 
     return true;
   }
-
-  public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException {
-    return conn.createScanner(NAME, auths);
-  }
-
-  public static Scanner getScanner(Connector conn) throws TableNotFoundException {
-    return getScanner(conn, new Authorizations());
-  }
-
-  public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException {
-    return getBatchWriter(conn, new BatchWriterConfig());
-  }
-
-  public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException {
-    return conn.createBatchWriter(NAME, config);
-  }
-
-  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException {
-    return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
-  }
-
-  public static boolean exists(Connector conn) {
-    return exists(conn.tableOperations());
-  }
-
-  public static boolean exists(TableOperations tops) {
-    return tops.exists(NAME);
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 5e0d726..3f26af9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -94,7 +94,7 @@ public class FinishedWorkUpdater implements Runnable {
           continue;
         }
 
-        log.debug("Processing work progress for {}", serializedRow.getKey().getRow());
+        log.debug("Processing work progress for {} with {} columns", serializedRow.getKey().getRow(), wholeRow.size());
 
         Map<String,Long> tableIdToProgress = new HashMap<>();
         boolean error = false;
@@ -122,7 +122,7 @@ public class FinishedWorkUpdater implements Runnable {
 
           // Find the minimum value for begin (everyone has replicated up to this offset in the file)
           tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
-        }
+        } 
 
         for (Entry<String,Long> progressByTable : tableIdToProgress.entrySet()) {
           log.debug("For {}, source table ID {} has replicated through {}", serializedRow.getKey().getRow(), progressByTable.getKey(), progressByTable.getValue());
@@ -162,6 +162,8 @@ public class FinishedWorkUpdater implements Runnable {
         }
       }
     } finally {
+      log.debug("Finished updating files with completed replication work");
+
       bs.close();
 
       try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index af43d7d..d168867 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -293,6 +293,8 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
           if (null == keyBeingReplicated) {
             // If there is no file, submit this one for replication
             newReplicationTasksSubmitted += queueWork(key, file, sourceTableId, queuedWorkForPeer);
+          } else if (keyBeingReplicated.startsWith(p.getName())) {
+            log.debug("Not re-queueing work for {} as it has already been queued for replication to {}", file, target);
           } else {
             log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
           }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index c6b266f..ce44eef 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ReplicationClient;
@@ -40,7 +41,10 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
@@ -140,7 +144,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   }
 
   @Override
-  public Status replicate(final Path p, final Status status, final ReplicationTarget target) {
+  public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
     final Instance localInstance = HdfsZooInstance.getInstance();
     final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
     Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
@@ -152,7 +156,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     // Attempt the replication of this status a number of times before giving up and
     // trying to replicate it again later some other time.
-    for (int i = 0; i < localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS); i++) {
+    int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
+    for (int i = 0; i < numAttempts; i++) {
       String peerTserver;
       try {
         // Ask the master on the remote what TServer we should talk with to replicate the data
@@ -177,76 +182,229 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
 
       // We have a tserver on the remote -- send the data its way.
-      ReplicationStats replResult;
-      // TODO should chunk up the given file into some configurable sizes instead of just sending the entire file all at once
-      // configuration should probably just be size based.
+      Status finalStatus;
       final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
       try {
-        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-            new ClientExecReturn<ReplicationStats,ReplicationServicer.Client>() {
-              @Override
-              public ReplicationStats execute(Client client) throws Exception {
-                // RFiles have an extension, call everything else a WAL
-                if (p.getName().endsWith(RFILE_SUFFIX)) {
-                  RFileReplication kvs = getKeyValues(target, p, status, sizeLimit);
-                  if (0 < kvs.keyValues.getKeyValuesSize()) {
-                    long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tCredsForPeer);
-                    if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
-                      log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(),
-                          entriesReplicated);
-                    }
-
-                    // Not as important to track as WALs because we don't skip any KVs in an RFile
-                    return kvs;
-                  }
-                } else {
-                  WalReplication edits = getWalEdits(target, getWalStream(p), p, status, sizeLimit);
-
-                  // If we have some edits to send
-                  if (0 < edits.walEdits.getEditsSize()) {
-                    long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tCredsForPeer);
-                    if (entriesReplicated != edits.numUpdates) {
-                      log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
-                    }
-
-                    // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
-                    // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
-                    return edits;
-                  } else if (edits.entriesConsumed > 0) {
-                    // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
-                    // log entries multiple times to determine if they should be sent
-                    return edits;
-                  }
-                }
-
-                // No data sent (bytes nor records) and no progress made
-                return new ReplicationStats(0l, 0l, 0l);
-              }
-            });
-
-        log.debug("Replicated {} entries from {} to {} which is a member of the peer '{}'", replResult.sizeInRecords, p, peerTserver,
-            peerInstance.getInstanceName());
-
-        // Catch the overflow
-        long newBegin = status.getBegin() + replResult.entriesConsumed;
-        if (newBegin < 0) {
-          newBegin = Long.MAX_VALUE;
+        if (p.getName().endsWith(RFILE_SUFFIX)) {
+          finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+        } else {
+          finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
         }
 
-        // Update the begin to account for what we replicated
-        Status updatedStatus = Status.newBuilder(status).setBegin(newBegin).build();
+        log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
 
-        return updatedStatus;
+        return finalStatus;
       } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
         log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
-        UtilWaitThread.sleep(250);
+        UtilWaitThread.sleep(1000);
       }
     }
 
+    log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
+
     // We made no status, punt on it for now, and let it re-queue itself for work
     return status;
   }
 
+  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p,
+      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
+      AccumuloSecurityException {
+    DataInputStream input;
+    try {
+      input = getRFileInputStream(p);
+    } catch (IOException e) {
+      log.error("Could not create input stream from RFile, will retry", e);
+      return status;
+    }
+
+    Status lastStatus = status, currentStatus = status;
+    while (true) {
+      // Read and send a batch of mutations
+      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
+          new RFileClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds));
+
+      // Catch the overflow
+      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+      if (newBegin < 0) {
+        newBegin = Long.MAX_VALUE;
+      }
+
+      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+
+      // If we got a different status
+      if (!currentStatus.equals(lastStatus)) {
+        // If we don't have any more work, just quit
+        if (!StatusUtil.isWorkRequired(currentStatus)) {
+          return currentStatus;
+        } else {
+          // Otherwise, let it loop and replicate some more data
+          lastStatus = currentStatus;
+        }
+      } else {
+        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+
+        // otherwise, we didn't actually replicate (likely because there was error sending the data)
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return status;
+      }
+    }
+  }
+
+  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p,
+      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
+      AccumuloSecurityException {
+
+    final Set<Integer> tids;
+    final DataInputStream input;
+    try {
+      input = getWalStream(p);
+    } catch (IOException e) {
+      log.error("Could not create stream for WAL", e);
+      // No data sent (bytes nor records) and no progress made
+      return status;
+    }
+
+    try {
+      // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
+      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
+      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
+    } catch (IOException e) {
+      log.warn("Unexpected error consuming file.");
+      return status;
+    }
+
+    Status lastStatus = status, currentStatus = status;
+    while (true) {
+      // Read and send a batch of mutations
+      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
+          new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+
+      // Catch the overflow
+      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+      if (newBegin < 0) {
+        newBegin = Long.MAX_VALUE;
+      }
+
+      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+
+      // If we got a different status
+      if (!currentStatus.equals(lastStatus)) {
+        try {
+          helper.recordNewStatus(p, currentStatus, target);
+        } catch (TableNotFoundException e) {
+          log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+          throw new RuntimeException("Replication table did not exist, will retry", e);
+        }
+
+        // If we don't have any more work, just quit
+        if (!StatusUtil.isWorkRequired(currentStatus)) {
+          return currentStatus;
+        } else {
+          // Otherwise, let it loop and replicate some more data
+          lastStatus = currentStatus;
+        }
+      } else {
+        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+
+        // otherwise, we didn't actually replicate (likely because there was error sending the data)
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return status;
+      }
+    }
+  }
+
+  protected class WalClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
+
+    private ReplicationTarget target;
+    private DataInputStream input;
+    private Path p;
+    private Status status;
+    private long sizeLimit;
+    private int remoteTableId;
+    private TCredentials tcreds;
+    private Set<Integer> tids;
+
+    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds, Set<Integer> tids) {
+      this.target = target;
+      this.input = input;
+      this.p = p;
+      this.status = status;
+      this.sizeLimit = sizeLimit;
+      this.remoteTableId = remoteTableId;
+      this.tcreds = tcreds;
+      this.tids = tids;
+    }
+
+    @Override
+    public ReplicationStats execute(Client client) throws Exception {
+      WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids);
+
+      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'",
+          (Long.MAX_VALUE == edits.entriesConsumed) ? "all" : edits.entriesConsumed, edits.sizeInBytes, p);
+
+      // If we have some edits to send
+      if (0 < edits.walEdits.getEditsSize()) {
+        long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
+        if (entriesReplicated != edits.numUpdates) {
+          log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
+        }
+
+        // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
+        // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
+        return edits;
+      } else if (edits.entriesConsumed > 0) {
+        // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
+        // log entries multiple times to determine if they should be sent
+        return edits;
+      }
+
+      // No data sent (bytes nor records) and no progress made
+      return new ReplicationStats(0l, 0l, 0l);
+    }
+  }
+
+  protected class RFileClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
+
+    private ReplicationTarget target;
+    private DataInputStream input;
+    private Path p;
+    private Status status;
+    private long sizeLimit;
+    private int remoteTableId;
+    private TCredentials tcreds;
+
+    public RFileClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds) {
+      this.target = target;
+      this.input = input;
+      this.p = p;
+      this.status = status;
+      this.sizeLimit = sizeLimit;
+      this.remoteTableId = remoteTableId;
+      this.tcreds = tcreds;
+    }
+
+    @Override
+    public ReplicationStats execute(Client client) throws Exception {
+      RFileReplication kvs = getKeyValues(target, input, p, status, sizeLimit);
+      if (0 < kvs.keyValues.getKeyValuesSize()) {
+        long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tcreds);
+        if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
+          log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(), entriesReplicated);
+        }
+
+        // Not as important to track as WALs because we don't skip any KVs in an RFile
+        return kvs;
+      }
+
+      // No data sent (bytes nor records) and no progress made
+      return new ReplicationStats(0l, 0l, 0l);
+    }
+  }
+
   protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) {
     Preconditions.checkNotNull(conf);
     Preconditions.checkNotNull(target);
@@ -269,12 +427,13 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return new ZooKeeperInstance(instanceName, zookeepers);
   }
 
-  protected RFileReplication getKeyValues(ReplicationTarget target, Path p, Status status, long sizeLimit) {
-    // TODO Implement me
+  protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit) {
+    // TODO ACCUMULO-2580 Implement me
     throw new UnsupportedOperationException();
   }
 
-  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
+  protected Set<Integer> consumeWalPrefix(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
+    Set<Integer> tids = new HashSet<>();
     LogFileKey key = new LogFileKey();
     LogFileValue value = new LogFileValue();
 
@@ -284,13 +443,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     // We also need to track the tids that occurred earlier in the file as mutations
     // later on might use that tid
     for (long i = 0; i < status.getBegin(); i++) {
-      try {
-        key.readFields(wal);
-        value.readFields(wal);
-      } catch (EOFException e) {
-        log.warn("Unexpectedly reached the end of file.");
-        return new WalReplication(new WalEdits(), 0, 0, 0);
-      }
+      key.readFields(wal);
+      value.readFields(wal);
 
       switch (key.event) {
         case DEFINE_TABLET:
@@ -303,20 +457,16 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
     }
 
-    WalReplication repl = getEdits(wal, sizeLimit, target, status, p, desiredTids);
-
-    log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == repl.entriesConsumed) ? "all"
-        : repl.entriesConsumed, repl.sizeInBytes, p);
-
-    return repl;
+    return tids;
   }
 
-  protected DataInputStream getWalStream(Path p) throws IOException {
+  public DataInputStream getWalStream(Path p) throws IOException {
     DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
     return streams.getDecryptingInputStream();
   }
 
-  protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p, Set<Integer> desiredTids) throws IOException {
+  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids)
+      throws IOException {
     WalEdits edits = new WalEdits();
     edits.edits = new ArrayList<ByteBuffer>();
     long size = 0l;
@@ -413,6 +563,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return mutationsToSend;
   }
 
+  protected DataInputStream getRFileInputStream(Path p) throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
   public static class ReplicationStats {
     /**
      * The size, in bytes, of the data sent
@@ -434,6 +588,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       this.sizeInRecords = sizeInRecords;
       this.entriesConsumed = entriesConsumed;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (ReplicationStats.class.isAssignableFrom(o.getClass())) {
+        ReplicationStats other = (ReplicationStats) o;
+        return sizeInBytes == other.sizeInBytes && sizeInRecords == other.sizeInRecords && entriesConsumed == other.entriesConsumed;
+      }
+      return false;
+    }
   }
 
   public static class RFileReplication extends ReplicationStats {
@@ -467,5 +630,16 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       this.walEdits = edits;
       this.numUpdates = numMutations;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof WalReplication) {
+        WalReplication other = (WalReplication) o;
+
+        return super.equals(other) && walEdits.equals(other.walEdits) && numUpdates == other.numUpdates;
+      }
+
+      return false;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 50c79d6..3ebcda9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -22,8 +22,6 @@ import java.util.NoSuchElementException;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -31,9 +29,9 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
@@ -50,7 +48,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
 
 /**
  * Transmit the given data to a peer
@@ -62,12 +59,14 @@ public class ReplicationProcessor implements Processor {
   private final AccumuloConfiguration conf;
   private final VolumeManager fs;
   private final Credentials creds;
+  private final ReplicaSystemHelper helper;
 
   public ReplicationProcessor(Instance inst, AccumuloConfiguration conf, VolumeManager fs, Credentials creds) {
     this.inst = inst;
     this.conf = conf;
     this.fs = fs;
     this.creds = creds;
+    this.helper = new ReplicaSystemHelper(inst, creds);
   }
 
   @Override
@@ -82,12 +81,8 @@ public class ReplicationProcessor implements Processor {
 
     log.debug("Received replication work for {} to {}", file, target);
 
-    // Find the configured replication peer so we know how to replicate to it
-    // Classname,Configuration
-    String peerType = getPeerType(target.getPeerName());
+    ReplicaSystem replica = getReplicaSystem(target);
 
-    // Get the peer that we're replicating to
-    ReplicaSystem replica = ReplicaSystemFactory.get(peerType);
     Status status;
     try {
       status = getStatus(file, target);
@@ -113,8 +108,7 @@ public class ReplicationProcessor implements Processor {
     // Sanity check that nothing bad happened and our replication source still exists
     Path filePath = new Path(file);
     try {
-      if (!fs.exists(filePath)) {
-        log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
+      if (!doesFileExist(filePath, target)) {
         return;
       }
     } catch (IOException e) {
@@ -124,7 +118,20 @@ public class ReplicationProcessor implements Processor {
 
     log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
 
-    replicate(replica, filePath, status, target);
+    replica.replicate(filePath, status, target, getHelper());
+  }
+
+  protected ReplicaSystemHelper getHelper() {
+    return helper;
+  }
+
+  protected ReplicaSystem getReplicaSystem(ReplicationTarget target) {
+    // Find the configured replication peer so we know how to replicate to it
+    // Classname,Configuration
+    String peerType = getPeerType(target.getPeerName());
+
+    // Get the peer that we're replicating to
+    return ReplicaSystemFactory.get(peerType);
   }
 
   protected String getPeerType(String peerName) {
@@ -140,6 +147,15 @@ public class ReplicationProcessor implements Processor {
     return peerType;
   }
 
+  protected boolean doesFileExist(Path filePath, ReplicationTarget target) throws IOException {
+    if (!fs.exists(filePath)) {
+      log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
+      return false;
+    }
+
+    return true;
+  }
+
   protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
     Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
@@ -148,62 +164,4 @@ public class ReplicationProcessor implements Processor {
 
     return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
   }
-
-  protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
-    Status lastStatus = status;
-    while (true) {
-      // Replicate that sucker
-      Status replicatedStatus = replica.replicate(filePath, status, target);
-  
-      log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-  
-      // If we got a different status
-      if (!replicatedStatus.equals(lastStatus)) {
-        // We actually did some work!
-        recordNewStatus(filePath, replicatedStatus, target);
-
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(replicatedStatus)) {
-          return;
-        } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = status;
-          status = replicatedStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
-            TextFormat.shortDebugString(replicatedStatus));
-  
-        // otherwise, we didn't actually replicate because there was error sending the data
-        // we can just not record any updates, and it will be picked up again by the work assigner      
-        return;
-      }
-    }
-  }
-
-  /**
-   * Record the updated Status for this file and target
-   * 
-   * @param filePath
-   *          Path to file being replicated
-   * @param status
-   *          Updated Status after replication
-   * @param target
-   *          Peer that was replicated to
-   */
-  protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
-    try {
-      Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-      BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-      log.debug("Recording new status for {}, {}", filePath.toString(), TextFormat.shortDebugString(status));
-      Mutation m = new Mutation(filePath.toString());
-      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
-      bw.addMutation(m);
-      bw.close();
-    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
-      log.error("Error recording updated Status for {}", filePath, e);
-      throw new RuntimeException(e);
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 85204e3..4e2901d 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -16,16 +16,23 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -36,10 +43,15 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
+import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem.ReplicationStats;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem.WalClientExecReturn;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem.WalReplication;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -147,7 +159,7 @@ public class AccumuloReplicaSystemTest {
 
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(9, repl.entriesConsumed);
@@ -254,7 +266,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -319,7 +331,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -341,7 +353,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(0, repl.entriesConsumed);
@@ -401,8 +413,10 @@ public class AccumuloReplicaSystemTest {
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
 
+    HashSet<Integer> tids = new HashSet<>();
+
     // Only consume the first mutation, not the second
-    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1l, tids);
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(2, repl.entriesConsumed);
@@ -411,10 +425,9 @@ public class AccumuloReplicaSystemTest {
     Assert.assertNotEquals(0, repl.sizeInBytes);
 
     status = Status.newBuilder(status).setBegin(2).build();
-    dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
 
     // Consume the rest of the mutations
-    repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+    repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1l, tids);
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(1, repl.entriesConsumed);
@@ -422,4 +435,62 @@ public class AccumuloReplicaSystemTest {
     Assert.assertEquals(1, repl.sizeInRecords);
     Assert.assertNotEquals(0, repl.sizeInBytes);
   }
+
+  @Test
+  public void dontSendEmptyDataToPeer() throws Exception {
+    Client replClient = createMock(Client.class);
+    AccumuloReplicaSystem ars = createMock(AccumuloReplicaSystem.class);
+    WalEdits edits = new WalEdits(Collections.<ByteBuffer> emptyList());
+    WalReplication walReplication = new WalReplication(edits, 0, 0, 0);
+
+    ReplicationTarget target = new ReplicationTarget("peer", "2", "1");
+    DataInputStream input = null;
+    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+    Status status = null;
+    long sizeLimit = Long.MAX_VALUE;
+    int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+    TCredentials tcreds = null;
+    Set<Integer> tids = new HashSet<>();
+
+    WalClientExecReturn walClientExec = ars.new WalClientExecReturn(target, input, p, status, sizeLimit, remoteTableId, tcreds, tids);
+
+    expect(ars.getWalEdits(target, input, p, status, sizeLimit, tids)).andReturn(walReplication);
+
+    replay(replClient, ars);
+
+    ReplicationStats stats = walClientExec.execute(replClient);
+
+    verify(replClient, ars);
+
+    Assert.assertEquals(new ReplicationStats(0l, 0l, 0l), stats);
+  }
+
+  @Test
+  public void consumedButNotSentDataShouldBeRecorded() throws Exception {
+    Client replClient = createMock(Client.class);
+    AccumuloReplicaSystem ars = createMock(AccumuloReplicaSystem.class);
+    WalEdits edits = new WalEdits(Collections.<ByteBuffer> emptyList());
+    WalReplication walReplication = new WalReplication(edits, 0, 5, 0);
+
+    ReplicationTarget target = new ReplicationTarget("peer", "2", "1");
+    DataInputStream input = null;
+    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+    Status status = null;
+    long sizeLimit = Long.MAX_VALUE;
+    int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+    TCredentials tcreds = null;
+    Set<Integer> tids = new HashSet<>();
+
+    WalClientExecReturn walClientExec = ars.new WalClientExecReturn(target, input, p, status, sizeLimit, remoteTableId, tcreds, tids);
+
+    expect(ars.getWalEdits(target, input, p, status, sizeLimit, tids)).andReturn(walReplication);
+
+    replay(replClient, ars);
+
+    ReplicationStats stats = walClientExec.execute(replClient);
+
+    verify(replClient, ars);
+
+    Assert.assertEquals(new ReplicationStats(0l, 0l, 5l), stats);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index c88e091..17d5309 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -24,10 +25,12 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -71,46 +74,26 @@ public class ReplicationProcessorTest {
   }
 
   @Test
-  public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
     ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
-    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+    ReplicaSystemHelper helper = EasyMock.createMock(ReplicaSystemHelper.class);
+    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethods("getReplicaSystem", "doesFileExist", "getStatus", "getHelper").createMock();
 
     ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     Path path = new Path("/accumulo");
 
-    Status firstStatus = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
-    Status secondStatus = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
-    
-    EasyMock.expect(replica.replicate(path, status, target)).andReturn(firstStatus);
-    proc.recordNewStatus(path, firstStatus, target);
-    EasyMock.expectLastCall().once();
+    String queueKey = AbstractWorkAssigner.getQueueKey(path.toString(), target);
 
-    EasyMock.expect(replica.replicate(path, firstStatus, target)).andReturn(secondStatus);
-    proc.recordNewStatus(path, secondStatus, target);
-    EasyMock.expectLastCall().once();
+    EasyMock.expect(proc.getReplicaSystem(target)).andReturn(replica);
+    EasyMock.expect(proc.getStatus(path.toString(), target)).andReturn(status);
+    EasyMock.expect(proc.doesFileExist(path, target)).andReturn(true);
+    EasyMock.expect(proc.getHelper()).andReturn(helper);
+    EasyMock.expect(replica.replicate(path, status, target, helper)).andReturn(status);
 
     EasyMock.replay(replica, proc);
-    
-    proc.replicate(replica, path, status, target);
-
-    EasyMock.verify(replica, proc);
-  }
-
-  @Test
-  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
-    ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
-    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
 
-    ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
-    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
-    Path path = new Path("/accumulo");
-
-    EasyMock.expect(replica.replicate(path, status, target)).andReturn(status);
-
-    EasyMock.replay(replica, proc);
-    
-    proc.replicate(replica, path, status, target);
+    proc.process(queueKey, path.toString().getBytes(StandardCharsets.UTF_8));
 
     EasyMock.verify(replica, proc);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
index ac44f97..91e1a4b 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -16,7 +16,12 @@
  */
 package org.apache.accumulo.test.replication;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.commons.lang.StringUtils;
@@ -24,8 +29,6 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.TextFormat;
-
 /**
  * Fake ReplicaSystem which returns that the data was fully replicated after some sleep period (in milliseconds)
  * <p>
@@ -37,7 +40,7 @@ public class MockReplicaSystem implements ReplicaSystem {
   private long sleep = 0;
 
   @Override
-  public Status replicate(Path p, Status status, ReplicationTarget target) {
+  public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper) {
     Status newStatus;
     if (status.getClosed() && status.getInfiniteEnd()) {
       Status.Builder builder = Status.newBuilder(status);
@@ -48,10 +51,11 @@ public class MockReplicaSystem implements ReplicaSystem {
       }
       newStatus = builder.build();
     } else {
-      log.info("{} with status {} is not closed and with infinite length, ignoring");
+      log.info("{} with status {} is not closed and with infinite length, ignoring", p, status);
       newStatus = status;
     }
 
+    log.debug("Sleeping for {}ms before finishing replication on {}", sleep, p);
     try {
       Thread.sleep(sleep);
     } catch (InterruptedException e) {
@@ -60,7 +64,16 @@ public class MockReplicaSystem implements ReplicaSystem {
       return status;
     }
 
-    log.info("Received {}, returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
+    log.info("For {}, received {}, returned {}", p, ProtobufUtil.toString(status), ProtobufUtil.toString(newStatus));
+    try {
+      helper.recordNewStatus(p, newStatus, target);
+    } catch (TableNotFoundException e) {
+      log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(newStatus), e);
+      return status;
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.error("Tried to record new status in replication table for {} as {}, but got an error", p, ProtobufUtil.toString(newStatus), e);
+      return status;
+    }
 
     return newStatus;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index f42c5ad..b59f8da 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -148,7 +148,7 @@ public class ReplicationTest extends ConfigurableMacIT {
     int attempts = 5;
     do {
       if (!exists) {
-        UtilWaitThread.sleep(200);
+        UtilWaitThread.sleep(500);
         exists = conn.tableOperations().exists(ReplicationTable.NAME);
         attempts--;
       }
@@ -1079,41 +1079,6 @@ public class ReplicationTest extends ConfigurableMacIT {
       Assert.assertFalse(t.isAlive());
     }
 
-    // write a Long.MAX_VALUE into each repl entry
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    Status finishedReplStatus = StatusUtil.replicated(Long.MAX_VALUE);
-    Set<String> filesToWatch = new HashSet<>();
-    Text buff = new Text();
-    for (Entry<Key,Value> entry : s) {
-      StatusSection.getFile(entry.getKey(), buff);
-      filesToWatch.add(buff.toString());
-      Status status = Status.parseFrom(entry.getValue().get());
-      Assert.assertFalse(status.getClosed());
-
-      // Fake that each one is fully replicated
-      Mutation m = new Mutation(entry.getKey().getRow());
-      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), new Value(finishedReplStatus.toByteArray()));
-      bw.addMutation(m);
-    }
-    bw.close();
-
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    for (Entry<Key,Value> entry : s) {
-      Status status = Status.parseFrom(entry.getValue().get());
-      Assert.assertFalse(status.getClosed());
-
-      // Fake that each one is fully replicated
-      Mutation m = new Mutation(entry.getKey().getRow());
-      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
-          StatusUtil.fileCreatedValue(System.currentTimeMillis()));
-      bw.addMutation(m);
-    }
-    bw.close();
-
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
@@ -1124,7 +1089,7 @@ public class ReplicationTest extends ConfigurableMacIT {
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : Arrays.asList(table1, table2, table3)) {
-      s = conn.createScanner(table, new Authorizations());
+      Scanner s = conn.createScanner(table, new Authorizations());
       for (@SuppressWarnings("unused")
       Entry<Key,Value> entry : s) {}
     }
@@ -1139,7 +1104,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       // We should either find all closed records or no records
       // After they're closed, they are candidates for deletion
       for (int i = 0; i < 10; i++) {
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
         s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
         Iterator<Entry<Key,Value>> iter = s.iterator();
 
@@ -1162,7 +1127,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       }
 
       if (!allClosed) {
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
         s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
         for (Entry<Key,Value> entry : s) {
           log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
@@ -1173,7 +1138,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       for (int i = 0; i < 10; i++) {
         allClosed = true;
 
-        s = ReplicationTable.getScanner(conn);
+        Scanner s = ReplicationTable.getScanner(conn);
         Iterator<Entry<Key,Value>> iter = s.iterator();
 
         long recordsFound = 0l;
@@ -1197,7 +1162,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       }
 
       if (!allClosed) {
-        s = ReplicationTable.getScanner(conn);
+        Scanner s = ReplicationTable.getScanner(conn);
         StatusSection.limit(s);
         for (Entry<Key,Value> entry : s) {
           log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
@@ -1226,8 +1191,8 @@ public class ReplicationTest extends ConfigurableMacIT {
     // replication shouldn't exist when we begin
     Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
-    thread.start();
+//    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
+//    thread.start();
 
     try {
       // Create two tables
@@ -1242,7 +1207,7 @@ public class ReplicationTest extends ConfigurableMacIT {
           conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
           // Use the MockReplicaSystem impl and sleep for 5seconds
           conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
+              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000"));
           attempts = 0;
         } catch (Exception e) {
           attempts--;
@@ -1313,6 +1278,7 @@ public class ReplicationTest extends ConfigurableMacIT {
               case PERMISSION_DENIED:
                 // retry -- the grant didn't happen yet
                 log.warn("Sleeping because permission was denied");
+                break;
               default:
                 throw e;
             }
@@ -1416,7 +1382,7 @@ public class ReplicationTest extends ConfigurableMacIT {
         recordsFound = 0;
         for (Entry<Key,Value> entry : s) {
           recordsFound++;
-          log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
+          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
         }
 
         if (0 == recordsFound) {
@@ -1429,8 +1395,8 @@ public class ReplicationTest extends ConfigurableMacIT {
 
       Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
     } finally {
-      thread.interrupt();
-      thread.join(5000);
+//      thread.interrupt();
+//      thread.join(5000);
     }
   }
 }


Mime
View raw message