accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/4] ACCUMULO-2914 Move ReplicationTest into ReplicationIT and ReplicationIT into MultiInstanceReplicationIT
Date Tue, 17 Jun 2014 00:06:47 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/51ec0a27/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 2af81f6..cf9f3bf 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -16,450 +16,794 @@
  */
 package org.apache.accumulo.test.replication;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.After;
+import org.apache.hadoop.io.Text;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Replication tests which verify expected functionality using a single MAC instance. A MockReplicaSystem
+ * is used to "fake" the peer instance that we're replicating to. This lets us test replication in a functional
+ * way without having to worry about two real systems.
+ */
 public class ReplicationIT extends ConfigurableMacIT {
   private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
 
-  private ExecutorService executor;
-
-  @Before
-  public void createExecutor() {
-    executor = Executors.newSingleThreadExecutor();
-  }
-
-  @After
-  public void stopExecutor() {
-    if (null != executor) {
-      executor.shutdownNow();
-    }
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 30;
   }
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
-    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    // Run the master replication loop frequently
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
+    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
     cfg.setProperty(Property.REPLICATION_NAME, "master");
-    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
+    cfg.setNumTservers(1);
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  @Test(timeout = 60 * 5000)
-  public void dataWasReplicatedToThePeer() throws Exception {
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
-    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
-
-    peerCluster.start();
-
-    try {
-      final Connector connMaster = getConnector();
-      final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-  
-      ReplicationTable.create(connMaster);
-
-      String peerUserName = "peer", peerPassword = "foo";
-  
-      String peerClusterName = "peer";
-
-      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-      
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-  
-      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-      connMaster.instanceOperations().setProperty(
-          Property.REPLICATION_PEERS.getKey() + peerClusterName,
-          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-              AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-  
-      final String masterTable = "master", peerTable = "peer";
-  
-      connMaster.tableOperations().create(masterTable);
-      String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
-      Assert.assertNotNull(masterTableId);
-  
-      connPeer.tableOperations().create(peerTable);
-      String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
-      Assert.assertNotNull(peerTableId);
-
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-  
-      // Replicate this table to the peerClusterName in a table with the peerTableId table id
-      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
-  
-      // Write some data to table1
-      BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
-      for (int rows = 0; rows < 5000; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 100; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
+    Multimap<String,String> logs = HashMultimap.create();
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(LogColumnFamily.NAME);
+    scanner.setRange(new Range());
+    for (Entry<Key,Value> entry : scanner) {
+      if (Thread.interrupted()) {
+        return logs;
       }
-  
-      bw.close();
-  
-      log.info("Wrote all data to master cluster");
-  
-//      log.debug("");
-//      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-//        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-//          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-//        } else {
-//          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-//        }
-//      }
-  
-      final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-  
-      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-        cluster.killProcess(ServerType.TABLET_SERVER, proc);
-      }
-      cluster.exec(TabletServer.class);
-  
-      log.info("TabletServer restarted");
-      for (@SuppressWarnings("unused")
-      Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
-      log.info("TabletServer is online");
-  
-      log.info("");
-      log.info("Fetching metadata records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-        } else {
-          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-        }
-      }
-  
-      log.info("");
-      log.info("Fetching replication records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
-        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-      }
-  
-      Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-  
-        @Override
-        public Boolean call() throws Exception {
-          connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
-          log.info("Drain completed");
-          return true;
-        }
-  
-      });
-  
-      try {
-        future.get(30, TimeUnit.SECONDS);
-      } catch (TimeoutException e) {
-        future.cancel(true);
-        Assert.fail("Drain did not finish within 30 seconds");
-      }
-  
-      log.info("drain completed");
-  
-      log.info("");
-      log.info("Fetching metadata records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-        } else {
-          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-        }
+
+      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+
+      for (String log : logEntry.logSet) {
+        // Need to normalize the log file from LogEntry
+        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
       }
-  
-      log.info("");
-      log.info("Fetching replication records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
-        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-      }
-  
-      Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
-      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
-      Entry<Key,Value> masterEntry = null, peerEntry = null;
-      while (masterIter.hasNext() && peerIter.hasNext()) {
-        masterEntry = masterIter.next();
-        peerEntry = peerIter.next();
-        Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
-            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
-        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
-      }
-  
-      log.info("Last master entry: " + masterEntry);
-      log.info("Last peer entry: " + peerEntry);
-  
-      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
-      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
-    } finally {
-      peerCluster.stop();
     }
+    return logs;
   }
 
-  @Test(timeout = 60 * 5000)
-  public void dataReplicatedToCorrectTable() throws Exception {
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
-    MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
-
-    peer1Cluster.start();
+  @Test(timeout = 1000 * 60 * 5)
+  public void correctRecordsCompleteFile() throws Exception {
+    Connector conn = getConnector();
+    String table = "table1";
+    conn.tableOperations().create(table);
+    // If we have more than one tserver, this is subject to a race condition.
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    for (int i = 0; i < 10; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(new byte[0], new byte[0], new byte[0]);
+      bw.addMutation(m);
+    }
 
-    try {
-      Connector connMaster = getConnector();
-      Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
+    bw.close();
 
-      String peerClusterName = "peer";
-      String peerUserName = "peer", peerPassword = "foo";
+    // After writing data, we'll get a replication table
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    int attempts = 5;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(500);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", exists);
 
-      // Create local user
-      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+    for (int i = 0; i < 5; i++) {
+      if (conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)) {
+        break;
+      }
+      log.info("Could not read replication table, waiting and will retry");
+      Thread.sleep(1000);
+    }
 
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+    Assert.assertTrue("'root' user could not read the replication table",
+        conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
+
+    Set<String> replRows = Sets.newHashSet();
+    Scanner scanner;
+    attempts = 5;
+    while (replRows.isEmpty() && attempts > 0) {
+      scanner = ReplicationTable.getScanner(conn);
+      StatusSection.limit(scanner);
+      for (Entry<Key,Value> entry : scanner) {
+        Key k = entry.getKey();
+
+        String fileUri = k.getRow().toString();
+        try {
+          new URI(fileUri);
+        } catch (URISyntaxException e) {
+          Assert.fail("Expected a valid URI: " + fileUri);
+        }
 
-      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-      connMaster.instanceOperations().setProperty(
-          Property.REPLICATION_PEERS.getKey() + peerClusterName,
-          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+        replRows.add(fileUri);
+      }
+    }
 
-      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+    Set<String> wals = Sets.newHashSet();
+    Scanner s;
+    attempts = 5;
+    while (wals.isEmpty() && attempts > 0) {
+      s = conn.createScanner(MetadataTable.NAME, new Authorizations());
+      s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+      for (Entry<Key,Value> entry : s) {
+        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+        wals.add(new Path(logEntry.filename).toString());
+      }
+      attempts--;
+    }
 
-      // Create tables
-      connMaster.tableOperations().create(masterTable1);
-      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
-      Assert.assertNotNull(masterTableId1);
+    // We only have one file that should need replication (no trace table)
+    // We should find an entry in tablet and in the repl row
+    Assert.assertEquals("Rows found: " + replRows, 1, replRows.size());
 
-      connMaster.tableOperations().create(masterTable2);
-      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
-      Assert.assertNotNull(masterTableId2);
+    // This should be the same set of WALs that we also are using
+    Assert.assertEquals(replRows, wals);
+  }
 
-      connPeer.tableOperations().create(peerTable1);
-      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
-      Assert.assertNotNull(peerTableId1);
+  @Test(timeout = 1000 * 60 * 5)
+  public void noRecordsWithoutReplication() throws Exception {
+    Connector conn = getConnector();
+    List<String> tables = new ArrayList<>();
 
-      connPeer.tableOperations().create(peerTable2);
-      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
-      Assert.assertNotNull(peerTableId2);
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-      // Grant write permission
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+    for (int i = 0; i < 5; i++) {
+      String name = "table" + i;
+      tables.add(name);
+      conn.tableOperations().create(name);
+    }
 
-      // Replicate this table to the peerClusterName in a table with the peerTableId table id
-      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
+    // nor after we create some tables (that aren't being replicated)
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
+    for (String table : tables) {
+      BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 
-      // Write some data to table1
-      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
-      long masterTable1Records = 0l;
-      for (int rows = 0; rows < 2500; rows++) {
-        Mutation m = new Mutation(masterTable1 + rows);
-        for (int cols = 0; cols < 100; cols++) {
-          String value = Integer.toString(cols);
+      for (int j = 0; j < 5; j++) {
+        Mutation m = new Mutation(Integer.toString(j));
+        for (int k = 0; k < 5; k++) {
+          String value = Integer.toString(k);
           m.put(value, "", value);
-          masterTable1Records++;
         }
         bw.addMutation(m);
       }
 
       bw.close();
+    }
 
-      // Write some data to table2
-      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
-      long masterTable2Records = 0l;
-      for (int rows = 0; rows < 2500; rows++) {
-        Mutation m = new Mutation(masterTable2 + rows);
-        for (int cols = 0; cols < 100; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-          masterTable2Records++;
+    // After writing data, still no replication table
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    for (String table : tables) {
+      conn.tableOperations().compact(table, null, null, true, true);
+    }
+
+    // After compacting data, still no replication table
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    for (String table : tables) {
+      conn.tableOperations().delete(table);
+    }
+
+    // After deleting tables, still no replication table
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+  }
+
+  @Test(timeout = 1000 * 60 * 5)
+  public void twoEntriesForTwoTables() throws Exception {
+    Connector conn = getConnector();
+    String table1 = "table1", table2 = "table2";
+
+    // replication shouldn't exist when we begin
+    Assert.assertFalse("Replication table already existed at the beginning of the test", conn.tableOperations().exists(ReplicationTable.NAME));
+
+    // Create two tables
+    conn.tableOperations().create(table1);
+    conn.tableOperations().create(table2);
+
+    // Enable replication on table1
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+
+    // Despite having replication on, we shouldn't have any need to write a record to it (and create it)
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+
+    for (int rows = 0; rows < 50; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Compact table1
+    conn.tableOperations().compact(table1, null, null, true, true);
+
+    // After writing data, we'll get a replication table
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    int attempts = 5;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(1000);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", exists);
+
+    Assert.assertTrue(conn.tableOperations().exists(ReplicationTable.NAME));
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
+
+    // Verify that we found a single replication record that's for table1
+    Scanner s = ReplicationTable.getScanner(conn, new Authorizations());
+    StatusSection.limit(s);
+    Iterator<Entry<Key,Value>> iter = s.iterator();
+    attempts = 5;
+    while (attempts > 0) {
+      if (!iter.hasNext()) {
+        s.close();
+        Thread.sleep(1000);
+        s = ReplicationTable.getScanner(conn, new Authorizations());
+        iter = s.iterator();
+        attempts--;
+      } else {
+        break;
+      }
+    }
+    Assert.assertTrue(iter.hasNext());
+    Entry<Key,Value> entry = iter.next();
+    // We should find at least one status record for this table; we might find a second if another log was started while ingesting the data
+    Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey()
+        .getColumnQualifier().toString());
+    s.close();
+
+    // Enable replication on table2
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+
+    for (int rows = 0; rows < 50; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Compact table2
+    conn.tableOperations().compact(table2, null, null, true, true);
+
+    // After writing data, we'll get a replication table
+    Assert.assertTrue(conn.tableOperations().exists(ReplicationTable.NAME));
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
+
+    Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2));
+    Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds);
+
+    Thread.sleep(2000);
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.ReplicationSection.getRange());
+
+    List<Entry<Key,Value>> records = new ArrayList<>();
+    for (Entry<Key,Value> metadata : s) {
+      records.add(metadata);
+    }
+
+    Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size());
+
+    for (Entry<Key,Value> metadata : records) {
+      Assert.assertTrue("Expected record to be in metadata but wasn't " + metadata.getKey().toStringNoTruncate() + ", tableIds remaining "
+          + tableIdsForMetadata, tableIdsForMetadata.remove(metadata.getKey().getColumnQualifier().toString()));
+    }
+
+    Assert.assertTrue("Expected that we had removed all metadata entries " + tableIdsForMetadata, tableIdsForMetadata.isEmpty());
+
+    // Should be creating these records in replication table from metadata table every second
+    Thread.sleep(5000);
+
+    // Verify that we found two replication records: one for table1 and one for table2
+    s = ReplicationTable.getScanner(conn, new Authorizations());
+    StatusSection.limit(s);
+    iter = s.iterator();
+    Assert.assertTrue("Found no records in replication table", iter.hasNext());
+    entry = iter.next();
+    Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));
+    Assert.assertTrue("Expected to find two elements in replication table, only found one ", iter.hasNext());
+    entry = iter.next();
+    Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));
+    Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
+  }
+
+  @Test
+  public void replicationEntriesPrecludeWalDeletion() throws Exception {
+    final Connector conn = getConnector();
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+    final Multimap<String,String> logs = HashMultimap.create();
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            logs.putAll(getLogs(conn));
+          } catch (TableNotFoundException e) {
+            log.error("Metadata table doesn't exist");
+          }
         }
-        bw.addMutation(m);
       }
 
-      bw.close();
+    });
 
-      log.info("Wrote all data to master cluster");
+    t.start();
 
-      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
-          masterTable2);
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    Thread.sleep(1000);
 
-      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
-        Thread.sleep(500);
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
-      // Restart the tserver to force a close on the WAL
-      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    Thread.sleep(1000);
+
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
-      cluster.exec(TabletServer.class);
+      bw.addMutation(m);
+    }
 
-      log.info("Restarted the tserver");
+    bw.close();
 
-      // Read the data -- the tserver is back up and running
-      for (@SuppressWarnings("unused")
-      Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    Thread.sleep(1000);
+
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Force a write to metadata for the data written
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
+    }
+
+    keepRunning.set(false);
+    t.join(5000);
 
-      // Wait for both tables to be replicated
-      log.info("Waiting for {} for {}", filesFor1, masterTable1);
-      connMaster.replicationOperations().drain(masterTable1, filesFor1);
+    // The master is only running every second to create records in the replication table from the metadata table
+    // Sleep a sufficient amount of time to ensure that we get the straggling WALs that might have been created at the end
+    Thread.sleep(5000);
 
-      log.info("Waiting for {} for {}", filesFor2, masterTable2);
-      connMaster.replicationOperations().drain(masterTable2, filesFor2);
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Set<String> replFiles = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      replFiles.add(entry.getKey().getRow().toString());
+    }
 
-      long countTable = 0l;
-      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
-        countTable++;
-        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
-            .startsWith(masterTable1));
+    // We might have a WAL that was used solely for the replication table
+    // We want to remove that from our list as it should not appear in the replication table
+    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
+    Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
+    while (observedLogs.hasNext()) {
+      Entry<String,String> observedLog = observedLogs.next();
+      if (replicationTableId.equals(observedLog.getValue())) {
+        observedLogs.remove();
       }
+    }
 
-      log.info("Found {} records in {}", countTable, peerTable1);
-      Assert.assertEquals(masterTable1Records, countTable);
+    // We should have *some* reference to each log that was seen in the metadata table
+    // They might not yet all be closed though (might be newfile)
+    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 
-      countTable = 0l;
-      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
-        countTable++;
-        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
-            .startsWith(masterTable2));
+    for (String replFile : replFiles) {
+      Path p = new Path(replFile);
+      FileSystem fs = p.getFileSystem(new Configuration());
+      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
+    }
+  }
+
+  @Test
+  public void combinerWorksOnMetadata() throws Exception {
+    final Connector connector = getConnector();
+    final String walRow = ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid";
+
+    // Grant root WRITE on the metadata table before writing replication records to it
+    connector.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    ReplicationTableUtil.configureMetadataTable(connector, MetadataTable.NAME);
+    final Status created = StatusUtil.fileCreated(100);
+    final Status closed = StatusUtil.fileClosed();
+
+    // First write: a Status for a file created at time 100
+    BatchWriter writer = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation mutation = new Mutation(walRow);
+    mutation.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(created));
+    writer.addMutation(mutation);
+    writer.close();
+
+    Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(ReplicationSection.getRange());
+    Status observed = Status.parseFrom(Iterables.getOnlyElement(scanner).getValue().get());
+    Assert.assertEquals(created, observed);
+
+    // Second write to the same row and column: a closed-file Status
+    writer = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    mutation = new Mutation(walRow);
+    mutation.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(closed));
+    writer.addMutation(mutation);
+    writer.close();
+
+    // Exactly one entry must come back, carrying both the created time and the closed flag
+    scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(ReplicationSection.getRange());
+    observed = Status.parseFrom(Iterables.getOnlyElement(scanner).getValue().get());
+    Status combined = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();
+    Assert.assertEquals(combined, observed);
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void noDeadlock() throws Exception {
+    final Connector conn = getConnector();
+
+    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
+      conn.tableOperations().delete(ReplicationTable.NAME);
+    }
+
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    final Set<String> metadataWals = new HashSet<>();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            metadataWals.addAll(getLogs(conn).keySet());
+          } catch (Exception e) {
+            log.error("Metadata table doesn't exist");
+          }
+        }
       }
 
-      log.info("Found {} records in {}", countTable, peerTable2);
-      Assert.assertEquals(masterTable2Records, countTable);
+    });
 
-    } finally {
-      peer1Cluster.stop();
+    t.start();
+
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Flush everything to try to make the replication records
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
+    }
+
+    keepRunning.set(false);
+    t.join(5000);
+
+    for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
+      Scanner s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
     }
   }
 
-  @Test(timeout = 10 * 60 * 1000)
-  public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+  @Test(timeout = 60000)
+  public void filesClosedAfterUnused() throws Exception {
+    Connector conn = getConnector();
 
-    peerCluster.start();
+    String table = "table";
+    conn.tableOperations().create(table);
+    String tableId = conn.tableOperations().tableIdMap().get(table);
 
-    Connector connMaster = getConnector();
-    Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+    Assert.assertNotNull(tableId);
 
-    String peerUserName = "repl";
-    String peerPassword = "passwd";
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    // just sleep
+    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
 
-    // Create a user on the peer for replication to use
-    connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+    // Write a mutation to make a log file
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    Mutation m = new Mutation("one");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
 
-    String peerClusterName = "peer";
+    // Write another to make sure the logger rolls itself?
+    bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    m = new Mutation("three");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
 
-    // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-    connMaster.instanceOperations().setProperty(
-        Property.REPLICATION_PEERS.getKey() + peerClusterName,
-        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-            AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    s.setRange(TabletsSection.getRange(tableId));
+    Set<String> wals = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+      for (String file : logEntry.logSet) {
+        Path p = new Path(file);
+        wals.add(p.toString());
+      }
+    }
 
-    // Configure the credentials we should use to authenticate ourselves to the peer for replication
-    connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
-    connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+    log.warn("Found wals {}", wals);
+
+    // Write five 1MB values in a single mutation
+    bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    m = new Mutation("three");
+    byte[] bytes = new byte[1024 * 1024];
+    m.put("1".getBytes(), new byte[0], bytes);
+    m.put("2".getBytes(), new byte[0], bytes);
+    m.put("3".getBytes(), new byte[0], bytes);
+    m.put("4".getBytes(), new byte[0], bytes);
+    m.put("5".getBytes(), new byte[0], bytes);
+    bw.addMutation(m);
+    bw.close();
 
-    String masterTable = "master", peerTable = "peer";
+    conn.tableOperations().flush(table, null, null, true);
 
-    connMaster.tableOperations().create(masterTable);
-    String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
-    Assert.assertNotNull(masterTableId);
+    while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
+      UtilWaitThread.sleep(500);
+    }
+    // Poll up to five times until every WAL collected above is marked closed in the replication table
+    for (int i = 0; i < 5; i++) {
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      s.fetchColumnFamily(LogColumnFamily.NAME);
+      s.setRange(TabletsSection.getRange(tableId));
+      for (Entry<Key,Value> entry : s) {
+        log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
+      }
 
-    connPeer.tableOperations().create(peerTable);
-    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
-    Assert.assertNotNull(peerTableId);
+      try {
+        s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        Text buff = new Text();
+        boolean allReferencedLogsClosed = true;
+        int recordsFound = 0;
+        for (Entry<Key,Value> e : s) {
+          recordsFound++;
+          // Never reset the flag here: a single unclosed WAL must fail this whole pass
+          StatusSection.getFile(e.getKey(), buff);
+          String file = buff.toString();
+          if (wals.contains(file)) {
+            Status stat = Status.parseFrom(e.getValue().get());
+            if (!stat.getClosed()) {
+              log.info("{} wasn't closed", file);
+              allReferencedLogsClosed = false;
+            }
+          }
+        }
 
-    // Give our replication user the ability to write to the table
-    connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+        if (recordsFound > 0 && allReferencedLogsClosed) {
+          return;
+        }
+        Thread.sleep(1000);
+      } catch (RuntimeException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof AccumuloSecurityException) {
+          AccumuloSecurityException ase = (AccumuloSecurityException) cause;
+          switch (ase.getSecurityErrorCode()) {
+            case PERMISSION_DENIED:
+              // We tried to read the replication table before the GRANT went through
+              Thread.sleep(1000);
+              break;
+            default:
+              throw e;
+          }
+        }
+      }
+    }
 
-    // Replicate this table to the peerClusterName in a table with the peerTableId table id
-    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
-    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
+    Assert.fail("We had a file that was referenced but didn't get closed");
+  }
+
+  @Test(timeout = 1000 * 60 * 5)
+  public void singleTableWithSingleTarget() throws Exception {
+    // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
+    // against expected Status messages.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+
+    Connector conn = getConnector();
+    String table1 = "table1";
+
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    // Create a table
+    conn.tableOperations().create(table1);
+
+    int attempts = 5;
+    while (attempts > 0) {
+      try {
+        // Enable replication on table1
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+        // Replicate table1 to cluster1 in the table with id of '4'
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
+        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
+        break;
+      } catch (Exception e) {
+        attempts--;
+        if (attempts <= 0) {
+          throw e;
+        }
+        UtilWaitThread.sleep(500);
+      }
+    }
 
     // Write some data to table1
-    BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
-    for (int rows = 0; rows < 5000; rows++) {
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
       Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 100; cols++) {
+      for (int cols = 0; cols < 50; cols++) {
         String value = Integer.toString(cols);
         m.put(value, "", value);
       }
@@ -468,110 +812,289 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     bw.close();
 
-    log.info("Wrote all data to master cluster");
+    // Make sure the replication table exists at this point
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    attempts = 5;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(200);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table was never created", exists);
 
-    Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable);
+    // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
+    for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
+      UtilWaitThread.sleep(1000);
+    }
 
-    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    Assert.assertTrue("Combiner was never set on replication table",
+        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+
+    // Trigger the minor compaction, waiting for it to finish.
+    // This should write the entry to metadata that the file has data
+    conn.tableOperations().flush(table1, null, null, true);
+
+    // Make sure that we have one status element, should be a new file
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Entry<Key,Value> entry = null;
+    Status expectedStatus = StatusUtil.openWithUnknownLength();
+    attempts = 5;
+    // This record will move from new to new with infinite length because of the minc (flush)
+    while (null == entry && attempts > 0) {
+      try {
+        entry = Iterables.getOnlyElement(s);
+        Status actual = Status.parseFrom(entry.getValue().get());
+        if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
+          entry = null;
+          // the master process didn't yet fire and write the new mutation, wait for it to do
+          // so and try to read it again
+          Thread.sleep(1000);
+        }
+      } catch (NoSuchElementException e) {
+        entry = null;
+        Thread.sleep(500);
+      } catch (IllegalArgumentException e) {
+        // saw this contain 2 elements once
+        s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        for (Entry<Key,Value> content : s) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        throw e;
+      } finally {
+        attempts--;
+      }
     }
 
-    cluster.exec(TabletServer.class);
+    Assert.assertNotNull("Could not find expected entry in replication table", entry);
+    Status actual = Status.parseFrom(entry.getValue().get());
+    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual),
+        !actual.getClosed() && actual.getInfiniteEnd());
+
+    // Try a couple of times to watch for the work record to be created
+    boolean notFound = true;
+    for (int i = 0; i < 10 && notFound; i++) {
+      s = ReplicationTable.getScanner(conn);
+      WorkSection.limit(s);
+      int elementsFound = Iterables.size(s);
+      if (0 < elementsFound) {
+        Assert.assertEquals(1, elementsFound);
+        notFound = false;
+      }
+      Thread.sleep(500);
+    }
 
-    for (@SuppressWarnings("unused")
-    Entry<Key,Value> kv : connMaster.createScanner(masterTable, Authorizations.EMPTY)) {}
+    // If we didn't find the work record, print the contents of the table
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+    }
 
-    for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
-      log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+    // Write some more data so that we over-run the single WAL
+    bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 3000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
     }
 
-    connMaster.replicationOperations().drain(masterTable, files);
+    bw.close();
 
-    Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
-    Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
-    while (masterIter.hasNext() && peerIter.hasNext()) {
-      Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next();
-      Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
-          masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
-      Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+    log.info("Issued compaction for table");
+    conn.tableOperations().compact(table1, null, null, true, true);
+    log.info("Compaction completed");
+
+    // Master is creating entries in the replication table from the metadata table every second.
+    // Compaction should trigger the record to be written to metadata. Wait a bit to ensure
+    // that the master has time to work.
+    Thread.sleep(5000);
+
+    s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    int numRecords = 0;
+    for (Entry<Key,Value> e : s) {
+      numRecords++;
+      log.info("Found status record {}\t{}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
     }
 
-    Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
-    Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+    Assert.assertEquals(2, numRecords);
+
+    // We should eventually get 2 work records recorded, need to account for a potential delay though
+    // might see: status1 -> work1 -> status2 -> (our scans) -> work2
+    notFound = true;
+    for (int i = 0; i < 10 && notFound; i++) {
+      s = ReplicationTable.getScanner(conn);
+      WorkSection.limit(s);
+      int elementsFound = Iterables.size(s);
+      if (2 == elementsFound) {
+        notFound = false;
+      }
+      Thread.sleep(500);
+    }
 
-    peerCluster.stop();
+    // If we didn't find both work records, print the contents of the table
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entries for the status entries", notFound);
+    }
   }
 
-  @Test(timeout = 60 * 5000)
-  public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
+  @Test(timeout = 1000 * 60 * 5)
+  public void correctClusterNameInWorkEntry() throws Exception {
+    Connector conn = getConnector();
+    String table1 = "table1";
 
-    peer1Cluster.start();
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-    try {
-      Connector connMaster = getConnector();
-      Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
+    // Create the table
+    conn.tableOperations().create(table1);
+
+    int attempts = 5;
+    while (attempts > 0) {
+      try {
+        // Enable replication on table1
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+        // Replicate table1 to cluster1 in the table with id of '4'
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
+        attempts = 0;
+      } catch (Exception e) {
+        attempts--;
+        if (attempts <= 0) {
+          throw e;
+        }
+        UtilWaitThread.sleep(500);
+      }
+    }
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
 
-      String peerClusterName = "peer";
+    bw.close();
+
+    String tableId = conn.tableOperations().tableIdMap().get(table1);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    // Make sure the replication table exists at this point
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    attempts = 5;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(500);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", exists);
 
-      String peerUserName = "repl";
-      String peerPassword = "passwd";
+    for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
+      Thread.sleep(1000);
+    }
+
+    Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
 
-      // Create a user on the peer for replication to use
-      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+    boolean notFound = true;
+    Scanner s;
+    for (int i = 0; i < 10 && notFound; i++) {
+      s = ReplicationTable.getScanner(conn);
+      WorkSection.limit(s);
+      try {
+        Entry<Key,Value> e = Iterables.getOnlyElement(s);
+        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
+        notFound = false;
+      } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
+        s = ReplicationTable.getScanner(conn);
+        for (Entry<Key,Value> content : s) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        Assert.fail("Found more than one work section entry");
+      }
 
-      // Configure the credentials we should use to authenticate ourselves to the peer for replication
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+      Thread.sleep(500);
+    }
 
-      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-      connMaster.instanceOperations().setProperty(
-          Property.REPLICATION_PEERS.getKey() + peerClusterName,
-          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+    }
+  }
 
-      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+  @Test(timeout = 6 * 60 * 1000)
+  public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
+    Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
+    for (ProcessReference ref : gcProcs) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
+    }
 
-      connMaster.tableOperations().create(masterTable1);
-      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
-      Assert.assertNotNull(masterTableId1);
+    final Connector conn = getConnector();
 
-      connMaster.tableOperations().create(masterTable2);
-      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
-      Assert.assertNotNull(masterTableId2);
+    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
+      conn.tableOperations().delete(ReplicationTable.NAME);
+    }
 
-      connPeer.tableOperations().create(peerTable1);
-      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
-      Assert.assertNotNull(peerTableId1);
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    final Set<String> metadataWals = new HashSet<>();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            metadataWals.addAll(getLogs(conn).keySet());
+          } catch (Exception e) {
+            log.error("Metadata table doesn't exist");
+          }
+        }
+      }
 
-      connPeer.tableOperations().create(peerTable2);
-      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
-      Assert.assertNotNull(peerTableId2);
+    });
 
-      // Give our replication user the ability to write to the tables
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+    t.start();
 
-      // Replicate this table to the peerClusterName in a table with the peerTableId table id
-      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
+    String table1 = "table1", table2 = "table2", table3 = "table3";
 
-      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
+    BatchWriter bw;
+    try {
+      conn.tableOperations().create(table1);
+      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+      conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+          ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
 
       // Write some data to table1
-      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
-      for (int rows = 0; rows < 2500; rows++) {
-        Mutation m = new Mutation(masterTable1 + rows);
-        for (int cols = 0; cols < 100; cols++) {
+      bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
           String value = Integer.toString(cols);
           m.put(value, "", value);
         }
@@ -580,11 +1103,15 @@ public class ReplicationIT extends ConfigurableMacIT {
 
       bw.close();
 
+      conn.tableOperations().create(table2);
+      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+
       // Write some data to table2
-      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
-      for (int rows = 0; rows < 2500; rows++) {
-        Mutation m = new Mutation(masterTable2 + rows);
-        for (int cols = 0; cols < 100; cols++) {
+      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
           String value = Integer.toString(cols);
           m.put(value, "", value);
         }
@@ -593,69 +1120,343 @@ public class ReplicationIT extends ConfigurableMacIT {
 
       bw.close();
 
-      log.info("Wrote all data to master cluster");
+      conn.tableOperations().create(table3);
+      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
-        Thread.sleep(500);
+      // Write some data to table3
+      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
       }
 
+      bw.close();
 
-      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      // Flush everything to try to make the replication records
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().compact(table, null, null, true, true);
       }
+    } finally {
+      keepRunning.set(false);
+      t.join(5000);
+      Assert.assertFalse(t.isAlive());
+    }
 
-      cluster.exec(TabletServer.class);
-      // connMaster.tableOperations().compact(masterTable1, null, null, true, false);
-      // connMaster.tableOperations().compact(masterTable2, null, null, true, false);
+    // Kill the tserver(s) and restart them
+    // to ensure that the WALs we previously observed all move to closed.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
 
-      // Wait until we fully replicated something
-      boolean fullyReplicated = false;
-      for (int i = 0; i < 10 && !fullyReplicated; i++) {
-        UtilWaitThread.sleep(2000);
+    cluster.exec(TabletServer.class);
 
-        Scanner s = ReplicationTable.getScanner(connMaster);
-        WorkSection.limit(s);
-        for (Entry<Key,Value> entry : s) {
-          Status status = Status.parseFrom(entry.getValue().get());
-          if (StatusUtil.isFullyReplicated(status)) {
-            fullyReplicated |= true;
+    // Make sure we can read all the tables (recovery complete)
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      Scanner s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
+    }
+
+    // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
+    // in the metadata table, and then in the replication table
+    Process gc = cluster.exec(SimpleGarbageCollector.class);
+
+    try {
+      boolean allClosed = true;
+
+      // We should either find all closed records or no records
+      // After they're closed, they are candidates for deletion
+      for (int i = 0; i < 10; i++) {
+        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
+        Iterator<Entry<Key,Value>> iter = s.iterator();
+
+        long recordsFound = 0l;
+        while (allClosed && iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          String wal = entry.getKey().getRow().toString();
+          if (metadataWals.contains(wal)) {
+            Status status = Status.parseFrom(entry.getValue().get());
+            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
+            allClosed &= status.getClosed();
+            recordsFound++;
           }
         }
+
+        log.info("Found {} records from the metadata table", recordsFound);
+        if (allClosed) {
+          break;
+        }
       }
 
-      Assert.assertNotEquals(0, fullyReplicated);
+      if (!allClosed) {
+        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
+        for (Entry<Key,Value> entry : s) {
+          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+        }
+        Assert.fail("Expected all replication records in the metadata table to be closed");
+      }
 
-      long countTable = 0l;
       for (int i = 0; i < 10; i++) {
-        for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
-          countTable++;
-          Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
-              .startsWith(masterTable1));
+        allClosed = true;
+
+        Scanner s = ReplicationTable.getScanner(conn);
+        Iterator<Entry<Key,Value>> iter = s.iterator();
+
+        long recordsFound = 0l;
+        while (allClosed && iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          String wal = entry.getKey().getRow().toString();
+          if (metadataWals.contains(wal)) {
+            Status status = Status.parseFrom(entry.getValue().get());
+            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
+            allClosed &= status.getClosed();
+            recordsFound++;
+          }
         }
-  
-        log.info("Found {} records in {}", countTable, peerTable1);
 
-        if (0l == countTable) {
-          Thread.sleep(5000);
-        } else {
+        log.info("Found {} records from the replication table", recordsFound);
+        if (allClosed) {
           break;
         }
+
+        UtilWaitThread.sleep(1000);
+      }
+
+      if (!allClosed) {
+        Scanner s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        for (Entry<Key,Value> entry : s) {
+          log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
+        }
+        Assert.fail("Expected all replication records in the replication table to be closed");
       }
 
-      Assert.assertTrue("Found no records in " + peerTable1 + " in the peer cluster", countTable > 0);
+    } finally {
+      gc.destroy();
+      gc.waitFor();
+    }
+
+  }
+
+  @Test(timeout = 5 * 60 * 1000)
+  public void replicatedStatusEntriesAreDeleted() throws Exception {
+    // Stop the garbage collector now; we'll restart it after we restart the tserver
+    for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+
+    final Connector conn = getConnector();
+    log.info("Got connector to MAC");
+    String table1 = "table1";
+
+    // The replication table shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-      countTable = 0l;
-      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
-        countTable++;
-        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
-            .startsWith(masterTable2));
+    // Create the table that will be replicated
+    conn.tableOperations().create(table1);
+
+    int attempts = 5;
+    while (attempts > 0) {
+      try {
+        // Enable replication on table1
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+        // Replicate table1 to cluster1 in the table with id of '4'
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
+        // Use the MockReplicaSystem impl, configured with "1000" (presumably a sleep in milliseconds, i.e. 1 second)
+        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000"));
+        attempts = 0;
+      } catch (Exception e) {
+        attempts--;
+        if (attempts <= 0) {
+          throw e;
+        }
+        UtilWaitThread.sleep(500);
       }
+    }
 
-      log.info("Found {} records in {}", countTable, peerTable2);
-      Assert.assertTrue(countTable > 0);
+    String tableId = conn.tableOperations().tableIdMap().get(table1);
+    Assert.assertNotNull("Could not determine table id for " + table1, tableId);
 
-    } finally {
-      peer1Cluster.stop();
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Make sure the replication table exists at this point
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    attempts = 10;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(1000);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", exists);
+
+    // Grant ourselves the write permission for later
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Find the WorkSection record that will be created for that data we ingested
+    boolean notFound = true;
+    Scanner s;
+    for (int i = 0; i < 10 && notFound; i++) {
+      try {
+        s = ReplicationTable.getScanner(conn);
+        WorkSection.limit(s);
+        Entry<Key,Value> e = Iterables.getOnlyElement(s);
+        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
+        notFound = false;
+      } catch (NoSuchElementException e) {
+
+      } catch (IllegalArgumentException e) {
+        // Somehow we got more than one element. Log what they were
+        s = ReplicationTable.getScanner(conn);
+        for (Entry<Key,Value> content : s) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        Assert.fail("Found more than one work section entry");
+      } catch (RuntimeException e) {
+        // Catch a propagation issue, fail if it's not what we expect
+        Throwable cause = e.getCause();
+        if (cause instanceof AccumuloSecurityException) {
+          AccumuloSecurityException sec = (AccumuloSecurityException) cause;
+          switch (sec.getSecurityErrorCode()) {
+            case PERMISSION_DENIED:
+              // retry -- the grant didn't happen yet
+              log.warn("Sleeping because permission was denied");
+              break;
+            default:
+              throw e;
+          }
+        } else {
+          throw e;
+        }
+      }
+
+      Thread.sleep(2000);
+    }
+
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+    }
+
+    /**
+     * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Restarting the tablet server(s) should
+     * close all open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the
+     * metadata table anymore.
+     */
+
+    log.info("Killing tserver");
+    // Kill the tserver(s) and restart them
+    // to ensure that the WALs we previously observed all move to closed.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+
+    log.info("Starting tserver");
+    cluster.exec(TabletServer.class);
+
+    log.info("Waiting to read tables");
+
+    // Make sure we can read all the tables (recovery complete)
+    for (String table : new String[] {MetadataTable.NAME, table1}) {
+      s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
+    }
+
+    log.info("Checking for replication entries in replication");
+    // Then we need to get those records over to the replication table
+    boolean foundResults = false;
+    for (int i = 0; i < 5; i++) {
+      s = ReplicationTable.getScanner(conn);
+      int count = 0;
+      for (Entry<Key,Value> entry : s) {
+        count++;
+        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      }
+      if (count > 0) {
+        foundResults = true;
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
+
+    getCluster().exec(SimpleGarbageCollector.class);
+
+    // Wait for a bit since the GC has to run (should be running after a one second delay)
+    Thread.sleep(5000);
+
+    // We expect no records in the metadata table once the WALs are closed. We have to poll
+    // because we have to wait for the StatusMaker's next iteration which will clean
+    // up the dangling *closed* records after we create the record in the replication table.
+    // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
+    log.info("Checking metadata table for replication entries");
+    foundResults = true;
+    for (int i = 0; i < 5; i++) {
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      s.setRange(ReplicationSection.getRange());
+      long size = 0;
+      for (Entry<Key,Value> e : s) {
+        size++;
+        log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+      }
+      if (size == 0) {
+        foundResults = false;
+        break;
+      }
+      Thread.sleep(1000);
+      log.info("");
+    }
+
+    Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+
+    /**
+     * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
+     * deleted after replication occurs
+     */
+
+    int recordsFound = 0;
+    for (int i = 0; i < 10; i++) {
+      s = ReplicationTable.getScanner(conn);
+      recordsFound = 0;
+      for (Entry<Key,Value> entry : s) {
+        recordsFound++;
+        log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+      }
+
+      if (0 == recordsFound) {
+        break;
+      } else {
+        Thread.sleep(1000);
+        log.info("");
+      }
     }
+
+    Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
   }
 }


Mime
View raw message