accumulo-commits mailing list archives

From els...@apache.org
Subject [01/51] [abbrv] ACCUMULO-378 Test class consolidation
Date Sat, 14 Jun 2014 04:55:01 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master a32b92628 -> a2aabbcbb


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index 7146019..f42c5ad 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -19,41 +19,72 @@ package org.apache.accumulo.test.replication;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.protobuf.TextFormat;
 
 /**
  * Tests for replication that should be run at every build -- basic functionality
  */
 public class ReplicationTest extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationTest.class);
 
   @Override
   public int defaultTimeoutSeconds() {
@@ -62,10 +93,37 @@ public class ReplicationTest extends ConfigurableMacIT {
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
     // Run the master replication loop frequently
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
+    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "0s");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
+    cfg.setProperty(Property.REPLICATION_NAME, "master");
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
     cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
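+  /** Maps each WAL file referenced in the metadata table to the ids of the tables that reference it. */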
+  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
+    Multimap<String,String> logs = HashMultimap.create();
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(LogColumnFamily.NAME);
+    scanner.setRange(new Range());
+    for (Entry<Key,Value> entry : scanner) {
+      if (Thread.interrupted()) {
+        return logs;
+      }
+
+      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+
+      for (String log : logEntry.logSet) {
+        // Need to normalize the log file from LogEntry
+        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
+      }
+    }
+    return logs;
   }
 
   @Test
@@ -300,4 +358,1079 @@ public class ReplicationTest extends ConfigurableMacIT {
     Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
   }
 
+  @Test
+  public void replicationEntriesPrecludeWalDeletion() throws Exception {
+    final Connector conn = getConnector();
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+    final Multimap<String,String> logs = HashMultimap.create();
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+
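+    // Background thread that continually records which WALs are referenced in the metadata table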
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            logs.putAll(getLogs(conn));
+          } catch (TableNotFoundException e) {
+            log.error("Metadata table doesn't exist");
+          }
+        }
+      }
+
+    });
+
+    t.start();
+
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    Thread.sleep(1000);
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    Thread.sleep(1000);
+
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    Thread.sleep(1000);
+
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Force a write to metadata for the data written
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
+    }
+
+    keepRunning.set(false);
+    t.join(5000);
+
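+    // Gather the files that have a status record in the replication table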
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Set<String> replFiles = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      replFiles.add(entry.getKey().getRow().toString());
+    }
+
+    // We might have a WAL that was used solely for the replication table
+    // We want to remove that from our list as it should not appear in the replication table
+    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
+    Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
+    while (observedLogs.hasNext()) {
+      Entry<String,String> observedLog = observedLogs.next();
+      if (replicationTableId.equals(observedLog.getValue())) {
+        observedLogs.remove();
+      }
+    }
+
+    // We should have *some* reference to each log that was seen in the metadata table
+    // They might not yet all be closed though (might be newfile)
+    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
+
+    for (String replFile : replFiles) {
+      Path p = new Path(replFile);
+      FileSystem fs = p.getFileSystem(new Configuration());
+      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
+    }
+  }
+
+  @Test
+  public void combinerWorksOnMetadata() throws Exception {
+    Connector conn = getConnector();
+
+    conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+
+    ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME);
+
+    Status stat1 = StatusUtil.fileCreated(100);
+    Status stat2 = StatusUtil.fileClosed();
+
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
+    m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+    bw.close();
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+
+    Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
+    Assert.assertEquals(stat1, actual);
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
+    m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+    bw.close();
+
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+
+    actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
+    Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void noDeadlock() throws Exception {
+    final Connector conn = getConnector();
+
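+    // Drop and recreate the replication table so the test starts from a known state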
+    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
+      conn.tableOperations().delete(ReplicationTable.NAME);
+    }
+
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    final Set<String> metadataWals = new HashSet<>();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            metadataWals.addAll(getLogs(conn).keySet());
+          } catch (Exception e) {
+            log.error("Metadata table doesn't exist");
+          }
+        }
+      }
+
+    });
+
+    t.start();
+
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Flush everything to try to make the replication records
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
+    }
+
+    keepRunning.set(false);
+    t.join(5000);
+
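+    // If every table can be scanned to completion, no deadlock occurred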
+    for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
+      Scanner s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void filesClosedAfterUnused() throws Exception {
+    Connector conn = getConnector();
+
+    String table = "table";
+    conn.tableOperations().create(table);
+    String tableId = conn.tableOperations().tableIdMap().get(table);
+
+    Assert.assertNotNull(tableId);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    // Configure the peer with a MockReplicaSystem that just sleeps for 50 seconds
+    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
+
+    // Write a mutation to make a log file
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    Mutation m = new Mutation("one");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+
+    // Write another to make sure the logger rolls itself?
+    bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    m = new Mutation("three");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    s.setRange(TabletsSection.getRange(tableId));
+    Set<String> wals = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+      for (String file : logEntry.logSet) {
+        Path p = new Path(file);
+        wals.add(p.toString());
+      }
+    }
+
+    log.warn("Found wals {}", wals);
+
+    // Write several large mutations to encourage the WAL to roll over
+    bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    m = new Mutation("three");
+    byte[] bytes = new byte[1024 * 1024];
+    m.put("1".getBytes(), new byte[0], bytes);
+    m.put("2".getBytes(), new byte[0], bytes);
+    m.put("3".getBytes(), new byte[0], bytes);
+    m.put("4".getBytes(), new byte[0], bytes);
+    m.put("5".getBytes(), new byte[0], bytes);
+    bw.addMutation(m);
+    bw.close();
+
+    conn.tableOperations().flush(table, null, null, true);
+
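+    // Wait for the replication table to be created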
+    while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
+      UtilWaitThread.sleep(500);
+    }
+
+    for (int i = 0; i < 5; i++) {
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      s.fetchColumnFamily(LogColumnFamily.NAME);
+      s.setRange(TabletsSection.getRange(tableId));
+      for (Entry<Key,Value> entry : s) {
+        log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
+      }
+
+      s = ReplicationTable.getScanner(conn);
+      StatusSection.limit(s);
+      Text buff = new Text();
+      boolean allReferencedLogsClosed = true;
+      int recordsFound = 0;
+      for (Entry<Key,Value> e : s) {
+        recordsFound++;
+        StatusSection.getFile(e.getKey(), buff);
+        String file = buff.toString();
+        if (wals.contains(file)) {
+          Status stat = Status.parseFrom(e.getValue().get());
+          if (!stat.getClosed()) {
+            log.info("{} wasn't closed", file);
+            allReferencedLogsClosed = false;
+          }
+        }
+      }
+
+      if (recordsFound > 0 && allReferencedLogsClosed) {
+        return;
+      }
+
+      Thread.sleep(1000);
+    }
+
+    Assert.fail("We had a file that was referenced but didn't get closed");
+  }
+
+  @Test
+  public void singleTableWithSingleTarget() throws Exception {
+    // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
+    // against expected Status messages.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+
+    Connector conn = getConnector();
+    String table1 = "table1";
+
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    // Create a table
+    conn.tableOperations().create(table1);
+
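+    // Property changes can fail transiently right after table creation, so retry a few times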
+    int attempts = 5;
+    while (attempts > 0) {
+      try {
+        // Enable replication on table1
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+        // Replicate table1 to cluster1 in the table with id of '4'
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
+        break;
+      } catch (Exception e) {
+        attempts--;
+        if (attempts <= 0) {
+          throw e;
+        }
+        UtilWaitThread.sleep(500);
+      }
+    }
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    // Make sure the replication table exists at this point
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    attempts = 5;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(200);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table was never created", exists);
+
+    // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
+    for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
+      UtilWaitThread.sleep(1000);
+    }
+
+    Assert.assertTrue("Combiner was never set on replication table",
+        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+
+    // Trigger the minor compaction, waiting for it to finish.
+    // This should write the entry to metadata that the file has data
+    conn.tableOperations().flush(table1, null, null, true);
+
+    // Make sure that we have one status element, should be a new file
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Entry<Key,Value> entry = null;
+    Status expectedStatus = StatusUtil.openWithUnknownLength();
+    attempts = 5;
+    // This record will move from "new" to "new with infinite length" because of the minc (flush)
+    while (null == entry && attempts > 0) {
+      try {
+        entry = Iterables.getOnlyElement(s);
+        Status actual = Status.parseFrom(entry.getValue().get());
+        if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
+          entry = null;
+          // the master process didn't yet fire and write the new mutation, wait for it to do
+          // so and try to read it again
+          Thread.sleep(1000);
+        }
+      } catch (NoSuchElementException e) {
+        entry = null;
+        Thread.sleep(500);
+      } catch (IllegalArgumentException e) {
+        // saw this contain 2 elements once
+        s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        for (Entry<Key,Value> content : s) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        throw e;
+      } finally {
+        attempts--;
+      }
+    }
+
+    Assert.assertNotNull("Could not find expected entry in replication table", entry);
+    Status actual = Status.parseFrom(entry.getValue().get());
+    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), !actual.getClosed() && actual.getInfiniteEnd());
+
+    // Try a couple of times to watch for the work record to be created
+    boolean notFound = true;
+    for (int i = 0; i < 10 && notFound; i++) {
+      s = ReplicationTable.getScanner(conn);
+      WorkSection.limit(s);
+      int elementsFound = Iterables.size(s);
+      if (0 < elementsFound) {
+        Assert.assertEquals(1, elementsFound);
+        notFound = false;
+      }
+      Thread.sleep(500);
+    }
+
+    // If we didn't find the work record, print the contents of the table
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+    }
+
+    // Write some more data so that we over-run the single WAL
+    bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
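+    // Compact the replication table so the combiner merges duplicate status updates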
+    conn.tableOperations().compact(ReplicationTable.NAME, null, null, true, true);
+
+    s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Assert.assertEquals(2, Iterables.size(s));
+
+    // We should eventually get 2 work records recorded, need to account for a potential delay though
+    // might see: status1 -> work1 -> status2 -> (our scans) -> work2
+    notFound = true;
+    for (int i = 0; i < 10 && notFound; i++) {
+      s = ReplicationTable.getScanner(conn);
+      WorkSection.limit(s);
+      int elementsFound = Iterables.size(s);
+      if (2 == elementsFound) {
+        notFound = false;
+      }
+      Thread.sleep(500);
+    }
+
+    // If we didn't find the work record, print the contents of the table
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entries for the status entries", notFound);
+    }
+  }
+
+  @Test
+  public void correctClusterNameInWorkEntry() throws Exception {
+    Connector conn = getConnector();
+    String table1 = "table1";
+
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    // Create a table
+    conn.tableOperations().create(table1);
+
+    int attempts = 5;
+    while (attempts > 0) {
+      try {
+        // Enable replication on table1
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+        // Replicate table1 to cluster1 in the table with id of '4'
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+        attempts = 0;
+      } catch (Exception e) {
+        attempts--;
+        if (attempts <= 0) {
+          throw e;
+        }
+        UtilWaitThread.sleep(500);
+      }
+    }
+
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    String tableId = conn.tableOperations().tableIdMap().get(table1);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    // Make sure the replication table exists at this point
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    attempts = 5;
+    do {
+      if (!exists) {
+        UtilWaitThread.sleep(500);
+        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        attempts--;
+      }
+    } while (!exists && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", exists);
+
+    for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
+      Thread.sleep(1000);
+    }
+
+    Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
+
+    boolean notFound = true;
+    Scanner s;
+    for (int i = 0; i < 10 && notFound; i++) {
+      s = ReplicationTable.getScanner(conn);
+      WorkSection.limit(s);
+      try {
+        Entry<Key,Value> e = Iterables.getOnlyElement(s);
+        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
+        notFound = false;
+      } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
+        s = ReplicationTable.getScanner(conn);
+        for (Entry<Key,Value> content : s) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        Assert.fail("Found more than one work section entry");
+      }
+
+      Thread.sleep(500);
+    }
+
+    if (notFound) {
+      s = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : s) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+    }
+  }
+
+  @Test(timeout = 6 * 60 * 1000)
+  public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
+    Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
+    for (ProcessReference ref : gcProcs) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
+    }
+
+    final Connector conn = getConnector();
+
+    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
+      conn.tableOperations().delete(ReplicationTable.NAME);
+    }
+
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    final Set<String> metadataWals = new HashSet<>();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            metadataWals.addAll(getLogs(conn).keySet());
+          } catch (Exception e) {
+            log.error("Metadata table doesn't exist");
+          }
+        }
+      }
+
+    });
+
+    t.start();
+    
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+
+    BatchWriter bw;
+    try {
+      conn.tableOperations().create(table1);
+      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+      conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+          ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
+  
+      // Write some data to table1
+      bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+  
+      bw.close();
+  
+      conn.tableOperations().create(table2);
+      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+  
+      // Write some data to table2
+      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+  
+      bw.close();
+  
+      conn.tableOperations().create(table3);
+      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+  
+      // Write some data to table3
+      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+  
+      bw.close();
+  
+      // Compact everything to try to make the replication records
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().compact(table, null, null, true, true);
+      }
+    } finally {
+      keepRunning.set(false);
+      t.join(5000);
+      Assert.assertFalse(t.isAlive());
+    }
+
+    // write a Long.MAX_VALUE into each repl entry
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    Status finishedReplStatus = StatusUtil.replicated(Long.MAX_VALUE);
+    Set<String> filesToWatch = new HashSet<>();
+    Text buff = new Text();
+    for (Entry<Key,Value> entry : s) {
+      StatusSection.getFile(entry.getKey(), buff);
+      filesToWatch.add(buff.toString());
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertFalse(status.getClosed());
+
+      // Fake that each one is fully replicated
+      Mutation m = new Mutation(entry.getKey().getRow());
+      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), new Value(finishedReplStatus.toByteArray()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    for (Entry<Key,Value> entry : s) {
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertFalse(status.getClosed());
+
+      // Record a created time for each status; the combiner will merge it into the existing record
+      Mutation m = new Mutation(entry.getKey().getRow());
+      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
+          StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    // Kill the tserver(s) and restart them
+    // to ensure that the WALs we previously observed all move to closed.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+
+    cluster.exec(TabletServer.class);
+
+    // Make sure we can read all the tables (recovery complete)
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
+    }
+
+    // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
+    // in the metadata table, and then in the replication table
+    Process gc = cluster.exec(SimpleGarbageCollector.class);
+
+    try {
+      boolean allClosed = true;
+
+      // We should either find all closed records or no records
+      // After they're closed, they are candidates for deletion
+      for (int i = 0; i < 10; i++) {
+        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
+        Iterator<Entry<Key,Value>> iter = s.iterator();
+
+        long recordsFound = 0L;
+        while (allClosed && iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          String wal = entry.getKey().getRow().toString();
+          if (metadataWals.contains(wal)) {
+            Status status = Status.parseFrom(entry.getValue().get());
+            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
+            allClosed &= status.getClosed();
+            recordsFound++;
+          }
+        }
+
+        log.info("Found {} records from the metadata table", recordsFound);
+        if (allClosed) {
+          break;
+        }
+      }
+
+      if (!allClosed) {
+        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
+        for (Entry<Key,Value> entry : s) {
+          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+        }
+        Assert.fail("Expected all replication records in the metadata table to be closed");
+      }
+
+      for (int i = 0; i < 10; i++) {
+        allClosed = true;
+
+        s = ReplicationTable.getScanner(conn);
+        Iterator<Entry<Key,Value>> iter = s.iterator();
+
+        long recordsFound = 0L;
+        while (allClosed && iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          String wal = entry.getKey().getRow().toString();
+          if (metadataWals.contains(wal)) {
+            Status status = Status.parseFrom(entry.getValue().get());
+            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
+            allClosed &= status.getClosed();
+            recordsFound++;
+          }
+        }
+
+        log.info("Found {} records from the replication table", recordsFound);
+        if (allClosed) {
+          break;
+        }
+
+        UtilWaitThread.sleep(1000);
+      }
+
+      if (!allClosed) {
+        s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        for (Entry<Key,Value> entry : s) {
+          log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
+        }
+        Assert.fail("Expected all replication records in the replication table to be closed");
+      }
+
+    } finally {
+      gc.destroy();
+      gc.waitFor();
+    }
+
+  }
+
+  @Test(timeout = 5 * 60 * 1000)
+  public void replicatedStatusEntriesAreDeleted() throws Exception {
+    // Just stop it now, we'll restart it after we restart the tserver
+    for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+
+    final Connector conn = getConnector();
+    log.info("Got connector to MAC");
+    String table1 = "table1";
+
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
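+    // Periodically print the replication tables to stdout for debugging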
+    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
+    thread.start();
+
+    try {
+      // Create a table
+      conn.tableOperations().create(table1);
+
+      int attempts = 5;
+      while (attempts > 0) {
+        try {
+          // Enable replication on table1
+          conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+          // Replicate table1 to cluster1 in the table with id of '4'
+          conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+          // Use the MockReplicaSystem impl and sleep for 5 seconds
+          conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
+          attempts = 0;
+        } catch (Exception e) {
+          attempts--;
+          if (attempts <= 0) {
+            throw e;
+          }
+          UtilWaitThread.sleep(500);
+        }
+      }
+
+      String tableId = conn.tableOperations().tableIdMap().get(table1);
+      Assert.assertNotNull("Could not determine table id for " + table1, tableId);
+
+      // Write some data to table1
+      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+      for (int rows = 0; rows < 2000; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 50; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
+
+      // Make sure the replication table exists at this point
+      boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+      attempts = 10;
+      do {
+        if (!exists) {
+          UtilWaitThread.sleep(1000);
+          exists = conn.tableOperations().exists(ReplicationTable.NAME);
+          attempts--;
+        }
+      } while (!exists && attempts > 0);
+      Assert.assertTrue("Replication table did not exist", exists);
+
+      // Grant ourselves the write permission for later
+      conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+      // Find the WorkSection record that will be created for that data we ingested
+      boolean notFound = true;
+      Scanner s;
+      for (int i = 0; i < 10 && notFound; i++) {
+        try {
+          s = ReplicationTable.getScanner(conn);
+          WorkSection.limit(s);
+          Entry<Key,Value> e = Iterables.getOnlyElement(s);
+          Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+          Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
+          notFound = false;
+        } catch (NoSuchElementException e) {
+
+        } catch (IllegalArgumentException e) {
+          // Somehow we got more than one element. Log what they were
+          s = ReplicationTable.getScanner(conn);
+          for (Entry<Key,Value> content : s) {
+            log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+          }
+          Assert.fail("Found more than one work section entry");
+        } catch (RuntimeException e) {
+          // Catch a propagation issue, fail if it's not what we expect
+          Throwable cause = e.getCause();
+          if (cause instanceof AccumuloSecurityException) {
+            AccumuloSecurityException sec = (AccumuloSecurityException) cause;
+            switch (sec.getSecurityErrorCode()) {
+              case PERMISSION_DENIED:
+                // retry -- the grant didn't happen yet
+                log.warn("Sleeping because permission was denied");
+                break;
+              default:
+                throw e;
+            }
+          } else {
+            throw e;
+          }
+        }
+
+        Thread.sleep(1000);
+      }
+
+      if (notFound) {
+        s = ReplicationTable.getScanner(conn);
+        for (Entry<Key,Value> content : s) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+      }
+
+      /**
+       * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Restarting the tablet server should
+       * close all open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the
+       * metadata table anymore
+       */
+
+      log.info("Killing tserver");
+      // Kill the tserver(s) and restart them
+      // to ensure that the WALs we previously observed all move to closed.
+      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+
+      log.info("Starting tserver");
+      cluster.exec(TabletServer.class);
+
+      log.info("Waiting to read tables");
+
+      // Make sure we can read all the tables (recovery complete)
+      for (String table : new String[] {MetadataTable.NAME, table1}) {
+        s = conn.createScanner(table, new Authorizations());
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : s) {}
+      }
+
+      log.info("Checking for replication entries in replication");
+      // Then we need to get those records over to the replication table
+      boolean foundResults = false;
+      for (int i = 0; i < 5; i++) {
+        s = ReplicationTable.getScanner(conn);
+        int count = 0;
+        for (Entry<Key,Value> entry : s) {
+          count++;
+          log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+        }
+        if (count > 0) {
+          foundResults = true;
+          break;
+        }
+        Thread.sleep(1000);
+      }
+
+      Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
+
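+      // Restart the garbage collector that was killed at the start of the test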
+      getCluster().exec(SimpleGarbageCollector.class);
+
+      // Wait for a bit since the GC has to run (should be running after a one second delay)
+      Thread.sleep(5000);
+
+      // We expect no records in the metadata table after compaction. We have to poll
+      // because we have to wait for the StatusMaker's next iteration which will clean
+      // up the dangling *closed* records after we create the record in the replication table.
+      // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
+      log.info("Checking metadata table for replication entries");
+      foundResults = true;
+      for (int i = 0; i < 5; i++) {
+        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(ReplicationSection.getRange());
+        long size = 0;
+        for (Entry<Key,Value> e : s) {
+          size++;
+          log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+        }
+        if (size == 0) {
+          foundResults = false;
+          break;
+        }
+        Thread.sleep(1000);
+        log.info("");
+      }
+
+      Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+
+      /**
+       * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
+       * deleted after replication occurs
+       */
+
+      int recordsFound = 0;
+      for (int i = 0; i < 10; i++) {
+        s = ReplicationTable.getScanner(conn);
+        recordsFound = 0;
+        for (Entry<Key,Value> entry : s) {
+          recordsFound++;
+          log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
+        }
+
+        if (0 == recordsFound) {
+          break;
+        } else {
+          Thread.sleep(1000);
+          log.info("");
+        }
+      }
+
+      Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+    } finally {
+      thread.interrupt();
+      thread.join(5000);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
deleted file mode 100644
index 7d9c537..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.gc.SimpleGarbageCollector;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-import com.google.protobuf.TextFormat;
-
-/**
- * 
- */
-public class ReplicationWithGCIT extends ConfigurableMacIT {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationWithGCIT.class);
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    cfg.setProperty(Property.REPLICATION_NAME, "master");
-    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
-    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
-    cfg.setNumTservers(1);
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  private Set<String> metadataWals(Connector conn) throws Exception {
-    Scanner s = conn.createScanner(MetadataTable.NAME, new Authorizations());
-    s.fetchColumnFamily(LogColumnFamily.NAME);
-    Set<String> metadataWals = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-      for (String log : logEntry.logSet) {
-        metadataWals.add(new Path(log).toString());
-      }
-    }
-    return metadataWals;
-  }
-
-  @Test(timeout = 6 * 60 * 1000)
-  public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
-    Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
-    for (ProcessReference ref : gcProcs) {
-      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
-    }
-
-    final Connector conn = getConnector();
-
-    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
-      conn.tableOperations().delete(ReplicationTable.NAME);
-    }
-
-    ReplicationTable.create(conn);
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-    final Set<String> metadataWals = new HashSet<>();
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataWals.addAll(metadataWals(conn));
-          } catch (Exception e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
-    String table1 = "table1", table2 = "table2", table3 = "table3";
-
-    conn.tableOperations().create(table1);
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table2);
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
-    // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table3);
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
-    // Write some data to table3
-    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Flush everything to try to make the replication records
-    for (String table : Arrays.asList(table1, table2, table3)) {
-      conn.tableOperations().flush(table, null, null, true);
-    }
-
-    keepRunning.set(false);
-    t.join(5000);
-
-    // write a Long.MAX_VALUE into each repl entry
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    Status finishedReplStatus = StatusUtil.replicated(Long.MAX_VALUE);
-    for (Entry<Key,Value> entry : s) {
-      Status status = Status.parseFrom(entry.getValue().get());
-      Assert.assertFalse(status.getClosed());
-
-      // Fake that each one is fully replicated
-      Mutation m = new Mutation(entry.getKey().getRow());
-      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), new Value(finishedReplStatus.toByteArray()));
-      bw.addMutation(m);
-    }
-    bw.close();
-
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    for (Entry<Key,Value> entry : s) {
-      Status status = Status.parseFrom(entry.getValue().get());
-      Assert.assertFalse(status.getClosed());
-
-      // Fake that each one is fully replicated
-      Mutation m = new Mutation(entry.getKey().getRow());
-      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
-          StatusUtil.fileCreatedValue(System.currentTimeMillis()));
-      bw.addMutation(m);
-    }
-    bw.close();
-
-    // Kill the tserver(s) and restart them
-    // to ensure that the WALs we previously observed all move to closed.
-    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-      cluster.killProcess(ServerType.TABLET_SERVER, proc);
-    }
-
-    cluster.exec(TabletServer.class);
-
-    // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
-    // in the metadata table, and then in the replication table
-    Process gc = cluster.exec(SimpleGarbageCollector.class);
-
-    // Make sure we can read all the tables (recovery complete)
-    for (String table : Arrays.asList(table1, table2, table3)) {
-      s = conn.createScanner(table, new Authorizations());
-      for (@SuppressWarnings("unused")
-      Entry<Key,Value> entry : s) {}
-    }
-
-    try {
-      boolean allClosed = true;
-
-      // We should either find all closed records or no records
-      // After they're closed, they are candidates for deletion
-      for (int i = 0; i < 10; i++) {
-        allClosed = true;
-
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
-        Iterator<Entry<Key,Value>> iter = s.iterator();
-
-        long recordsFound = 0L;
-        while (allClosed && iter.hasNext()) {
-          Entry<Key,Value> entry = iter.next();
-          String wal = entry.getKey().getRow().toString();
-          if (metadataWals.contains(wal)) {
-            Status status = Status.parseFrom(entry.getValue().get());
-            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
-            allClosed &= status.getClosed();
-            recordsFound++;
-          }
-        }
-
-        log.info("Found {} records from the metadata table", recordsFound);
-        if (allClosed) {
-          break;
-        }
-
-        UtilWaitThread.sleep(1000);
-      }
-
-      if (!allClosed) {
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
-        for (Entry<Key,Value> entry : s) {
-          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
-        }
-        Assert.fail("Expected all replication records in the metadata table to be closed");
-      }
-
-      for (int i = 0; i < 10; i++) {
-        allClosed = true;
-
-        s = ReplicationTable.getScanner(conn);
-        Iterator<Entry<Key,Value>> iter = s.iterator();
-
-        long recordsFound = 0L;
-        while (allClosed && iter.hasNext()) {
-          Entry<Key,Value> entry = iter.next();
-          String wal = entry.getKey().getRow().toString();
-          if (metadataWals.contains(wal)) {
-            Status status = Status.parseFrom(entry.getValue().get());
-            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
-            allClosed &= status.getClosed();
-            recordsFound++;
-          }
-        }
-
-        log.info("Found {} records from the replication table", recordsFound);
-        if (allClosed) {
-          break;
-        }
-
-        UtilWaitThread.sleep(1000);
-      }
-
-      if (!allClosed) {
-        s = ReplicationTable.getScanner(conn);
-        StatusSection.limit(s);
-        for (Entry<Key,Value> entry : s) {
-          log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
-        }
-        Assert.fail("Expected all replication records in the replication table to be closed");
-      }
-
-    } finally {
-      gc.destroy();
-      gc.waitFor();
-    }
-
-  }
-
-  @Test(timeout = 5 * 60 * 1000)
-  public void replicatedStatusEntriesAreDeleted() throws Exception {
-    // Stop the GC now; we'll restart it after we restart the tserver
-    for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
-      getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
-    }
-
-    final Connector conn = getConnector();
-    log.info("Got connector to MAC");
-    String table1 = "table1";
-
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
-
-    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
-    thread.start();
-
-    try {
-      // Create a table
-      conn.tableOperations().create(table1);
-
-      int attempts = 5;
-      while (attempts > 0) {
-        try {
-          // Enable replication on table1
-          conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-          // Replicate table1 to cluster1 in the table with id of '4'
-          conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
-          // Use the MockReplicaSystem impl and sleep for 5 seconds
-          conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
-          attempts = 0;
-        } catch (Exception e) {
-          attempts--;
-          if (attempts <= 0) {
-            throw e;
-          }
-          UtilWaitThread.sleep(500);
-        }
-      }
-
-      String tableId = conn.tableOperations().tableIdMap().get(table1);
-      Assert.assertNotNull("Could not determine table id for " + table1, tableId);
-
-      // Write some data to table1
-      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-      for (int rows = 0; rows < 2000; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 50; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
-
-      // Make sure the replication table exists at this point
-      boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
-      attempts = 10;
-      do {
-        if (!exists) {
-          UtilWaitThread.sleep(1000);
-          exists = conn.tableOperations().exists(ReplicationTable.NAME);
-          attempts--;
-        }
-      } while (!exists && attempts > 0);
-      Assert.assertTrue("Replication table did not exist", exists);
-
-      // Grant ourselves the write permission for later
-      conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-      // Find the WorkSection record that will be created for the data we ingested
-      boolean notFound = true;
-      Scanner s;
-      for (int i = 0; i < 10 && notFound; i++) {
-        try {
-          s = ReplicationTable.getScanner(conn);
-          WorkSection.limit(s);
-          Entry<Key,Value> e = Iterables.getOnlyElement(s);
-          Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
-          Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
-          notFound = false;
-        } catch (NoSuchElementException e) {
-          // No work entry yet; fall through to the sleep and retry
-        } catch (IllegalArgumentException e) {
-          // Somehow we got more than one element. Log what they were
-          s = ReplicationTable.getScanner(conn);
-          for (Entry<Key,Value> content : s) {
-            log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-          }
-          Assert.fail("Found more than one work section entry");
-        } catch (RuntimeException e) {
-          // Catch a propagation issue, fail if it's not what we expect
-          Throwable cause = e.getCause();
-          if (cause instanceof AccumuloSecurityException) {
-            AccumuloSecurityException sec = (AccumuloSecurityException) cause;
-            switch (sec.getSecurityErrorCode()) {
-              case PERMISSION_DENIED:
-                // Retry -- the grant may not have propagated yet
-                log.warn("Sleeping because permission was denied");
-                break;
-              default:
-                throw e;
-            }
-          } else {
-            throw e;
-          }
-        }
-
-        Thread.sleep(1000);
-      }
-
-      if (notFound) {
-        s = ReplicationTable.getScanner(conn);
-        for (Entry<Key,Value> content : s) {
-          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-        }
-        Assert.assertFalse("Did not find the work entry for the status entry", notFound);
-      }
-
-      /**
-       * By this point we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
-       * open WALs, ensuring that every record we're going to replicate has an entry in the replication table and that nothing remains in the metadata table.
-       */
-
-      log.info("Killing tserver");
-      // Kill the tserver(s) and restart them
-      // to ensure that the WALs we previously observed all move to closed.
-      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-        cluster.killProcess(ServerType.TABLET_SERVER, proc);
-      }
-
-      log.info("Starting tserver");
-      cluster.exec(TabletServer.class);
-
-      log.info("Waiting to read tables");
-
-      // Make sure we can read all the tables (recovery complete)
-      for (String table : new String[] {MetadataTable.NAME, table1}) {
-        s = conn.createScanner(table, new Authorizations());
-        for (@SuppressWarnings("unused")
-        Entry<Key,Value> entry : s) {}
-      }
-
-      log.info("Checking for replication entries in replication");
-      // Then we need to get those records over to the replication table
-      boolean foundResults = false;
-      for (int i = 0; i < 5; i++) {
-        s = ReplicationTable.getScanner(conn);
-        int count = 0;
-        for (Entry<Key,Value> entry : s) {
-          count++;
-          log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
-        }
-        if (count > 0) {
-          foundResults = true;
-          break;
-        }
-        Thread.sleep(1000);
-      }
-
-      Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
-
-      getCluster().exec(SimpleGarbageCollector.class);
-
-      // Wait for a bit since the GC has to run (should be running after a one second delay)
-      Thread.sleep(5000);
-
-      // We expect no records in the metadata table after compaction. We have to poll
-      // because we have to wait for the StatusMaker's next iteration which will clean
-      // up the dangling *closed* records after we create the record in the replication table.
-      // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
-      log.info("Checking metadata table for replication entries");
-      foundResults = true;
-      for (int i = 0; i < 5; i++) {
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-        s.setRange(ReplicationSection.getRange());
-        long size = 0;
-        for (Entry<Key,Value> e : s) {
-          size++;
-          log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
-        }
-        if (size == 0) {
-          foundResults = false;
-          break;
-        }
-        Thread.sleep(1000);
-        log.info("");
-      }
-
-      Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
-
-      /**
-       * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
-       * deleted after replication occurs.
-       */
-
-      int recordsFound = 0;
-      for (int i = 0; i < 10; i++) {
-        s = ReplicationTable.getScanner(conn);
-        recordsFound = 0;
-        for (Entry<Key,Value> entry : s) {
-          recordsFound++;
-          log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
-        }
-
-        if (0 == recordsFound) {
-          break;
-        } else {
-          Thread.sleep(1000);
-          log.info("");
-        }
-      }
-
-      Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
-    } finally {
-      thread.join(200);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
deleted file mode 100644
index 70d6ca1..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- * Tests for replication that should be run at every build -- basic functionality
- */
-public class ReplicationWithMakerTest extends ConfigurableMacIT {
-
-  @Override
-  public int defaultTimeoutSeconds() {
-    return 30;
-  }
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    // Run the master process that copies replication records from the metadata table to the
-    // replication table repeatedly, without pause
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
-    cfg.setNumTservers(1);
-  }
-
-  @Test
-  public void singleTableSingleTarget() throws Exception {
-    // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
-    // against expected Status messages.
-    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
-      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
-    }
-
-    Connector conn = getConnector();
-    String table1 = "table1";
-
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
-
-    // Create a table
-    conn.tableOperations().create(table1);
-
-    int attempts = 5;
-    while (attempts > 0) {
-      try {
-        // Enable replication on table1
-        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-        // Replicate table1 to cluster1 in the table with id of '4'
-        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
-        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
-        break;
-      } catch (Exception e) {
-        attempts--;
-        if (attempts <= 0) {
-          throw e;
-        }
-        UtilWaitThread.sleep(500);
-      }
-    }
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 2000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Make sure the replication table exists at this point
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
-    attempts = 5;
-    do {
-      if (!exists) {
-        UtilWaitThread.sleep(200);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
-        attempts--;
-      }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table was never created", exists);
-
-    // ACCUMULO-2743 The configuration observer in the tserver has to notice the change before the combiner (set by the master) becomes visible
-    for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
-      UtilWaitThread.sleep(1000);
-    }
-
-    Assert.assertTrue("Combiner was never set on replication table",
-        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
-
-    // Trigger the minor compaction, waiting for it to finish.
-    // This should write the entry to metadata that the file has data
-    conn.tableOperations().flush(table1, null, null, true);
-
-    // Make sure that we have one status element, should be a new file
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Entry<Key,Value> entry = null;
-    Status expectedStatus = StatusUtil.openWithUnknownLength();
-    attempts = 5;
-    // This record will move from new to new with infinite length because of the minc (flush)
-    while (null == entry && attempts > 0) {
-      try {
-        entry = Iterables.getOnlyElement(s);
-        Status actual = Status.parseFrom(entry.getValue().get());
-        if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
-          entry = null;
-          // the master process didn't yet fire and write the new mutation, wait for it to do
-          // so and try to read it again
-          Thread.sleep(1000);
-        }
-      } catch (NoSuchElementException e) {
-        entry = null;
-        Thread.sleep(500);
-      } catch (IllegalArgumentException e) {
-        // This scan has been observed to contain 2 elements; log the contents before rethrowing
-        s = ReplicationTable.getScanner(conn);
-        StatusSection.limit(s);
-        for (Entry<Key,Value> content : s) {
-          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-        }
-        throw e;
-      } finally {
-        attempts--;
-      }
-    }
-
-    Assert.assertNotNull("Could not find expected entry in replication table", entry);
-    Status actual = Status.parseFrom(entry.getValue().get());
-    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), !actual.getClosed() && actual.getInfiniteEnd());
-
-    // Try a couple of times to watch for the work record to be created
-    boolean notFound = true;
-    for (int i = 0; i < 10 && notFound; i++) {
-      s = ReplicationTable.getScanner(conn);
-      WorkSection.limit(s);
-      int elementsFound = Iterables.size(s);
-      if (0 < elementsFound) {
-        Assert.assertEquals(1, elementsFound);
-        notFound = false;
-      }
-      Thread.sleep(500);
-    }
-
-    // If we didn't find the work record, print the contents of the table
-    if (notFound) {
-      s = ReplicationTable.getScanner(conn);
-      for (Entry<Key,Value> content : s) {
-        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-      }
-      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
-    }
-
-    // Write some more data so that we over-run the single WAL
-    bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 2000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().compact(ReplicationTable.NAME, null, null, true, true);
-
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Assert.assertEquals(2, Iterables.size(s));
-
-    // We should eventually get 2 work records, but we need to account for a potential delay;
-    // we might see: status1 -> work1 -> status2 -> (our scans) -> work2
-    notFound = true;
-    for (int i = 0; i < 10 && notFound; i++) {
-      s = ReplicationTable.getScanner(conn);
-      WorkSection.limit(s);
-      int elementsFound = Iterables.size(s);
-      if (2 == elementsFound) {
-        notFound = false;
-      }
-      Thread.sleep(500);
-    }
-
-    // If we didn't find the work record, print the contents of the table
-    if (notFound) {
-      s = ReplicationTable.getScanner(conn);
-      for (Entry<Key,Value> content : s) {
-        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-      }
-      Assert.assertFalse("Did not find the work entries for the status entries", notFound);
-    }
-  }
-
-  @Test
-  public void correctClusterNameInWorkEntry() throws Exception {
-    Connector conn = getConnector();
-    String table1 = "table1";
-
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
-
-    // Create a table
-    conn.tableOperations().create(table1);
-
-    int attempts = 5;
-    while (attempts > 0) {
-      try {
-        // Enable replication on table1
-        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-        // Replicate table1 to cluster1 in the table with id of '4'
-        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
-        attempts = 0;
-      } catch (Exception e) {
-        attempts--;
-        if (attempts <= 0) {
-          throw e;
-        }
-        UtilWaitThread.sleep(500);
-      }
-    }
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 2000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    String tableId = conn.tableOperations().tableIdMap().get(table1);
-    Assert.assertNotNull("Table ID was null", tableId);
-
-    // Make sure the replication table exists at this point
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
-    attempts = 5;
-    do {
-      if (!exists) {
-        UtilWaitThread.sleep(500);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
-        attempts--;
-      }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
-
-    for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
-      Thread.sleep(1000);
-    }
-
-    Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
-
-    boolean notFound = true;
-    Scanner s;
-    for (int i = 0; i < 10 && notFound; i++) {
-      s = ReplicationTable.getScanner(conn);
-      WorkSection.limit(s);
-      try {
-        Entry<Key,Value> e = Iterables.getOnlyElement(s);
-        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
-        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
-        notFound = false;
-      } catch (NoSuchElementException e) {
-        // No work entry yet; fall through to the sleep and retry
-      } catch (IllegalArgumentException e) {
-        // More than one work entry; log what we found before failing
-        s = ReplicationTable.getScanner(conn);
-        for (Entry<Key,Value> content : s) {
-          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-        }
-        Assert.fail("Found more than one work section entry");
-      }
-
-      Thread.sleep(500);
-    }
-
-    if (notFound) {
-      s = ReplicationTable.getScanner(conn);
-      for (Entry<Key,Value> content : s) {
-        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
-      }
-      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
-    }
-  }
-}

