accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [6/6] git commit: ACCUMULO-378 Test class consolidation
Date Tue, 27 May 2014 04:04:22 GMT
ACCUMULO-378 Test class consolidation


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d9b5ed2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d9b5ed2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d9b5ed2

Branch: refs/heads/ACCUMULO-378
Commit: 9d9b5ed24f3e425459108a993ab2cea121d1b612
Parents: 3b727cf
Author: Josh Elser <elserj@apache.org>
Authored: Mon May 26 13:48:55 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Mon May 26 13:48:55 2014 -0400

----------------------------------------------------------------------
 .../replication/SequentialWorkAssigner.java     |    2 +-
 .../replication/ReplicationProcessor.java       |    4 +
 .../replication/MultiTserverReplicationIT.java  |  113 ++
 .../replication/ReplicationDeadlockTest.java    |  170 ---
 .../ReplicationFilesClosedAfterUnusedTest.java  |  172 ---
 .../test/replication/ReplicationIT.java         |  338 +++++-
 .../ReplicationPortAdvertisementIT.java         |  113 --
 .../replication/ReplicationSequentialIT.java    |  402 -------
 .../replication/ReplicationSourceOnlyIT.java    |  208 ----
 .../replication/ReplicationTablesMacTest.java   |   90 --
 .../test/replication/ReplicationTest.java       | 1135 +++++++++++++++++-
 .../test/replication/ReplicationWithGCIT.java   |  554 ---------
 .../replication/ReplicationWithMakerTest.java   |  337 ------
 13 files changed, 1588 insertions(+), 2050 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index f2d110a..af43d7d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -297,7 +297,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
             log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
           }
         } else {
-          log.debug("Not queueing work for {} because {} doesn't need replication", file, ProtobufUtil.toString(status));
+          log.debug("Not queueing work for {} to {} because {} doesn't need replication", file, target, ProtobufUtil.toString(status));
           if (key.equals(keyBeingReplicated)) {
             log.debug("Removing {} from replication state to {} because replication is complete", keyBeingReplicated, target.getPeerName());
             queuedWorkForPeer.remove(sourceTableId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index f6fe91f..50c79d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.tserver.replication;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -96,6 +97,9 @@ public class ReplicationProcessor implements Processor {
     } catch (InvalidProtocolBufferException e) {
       log.error("Could not deserialize Status from Work section for {} and ", file, target);
       throw new RuntimeException("Could not parse Status for work record", e);
+    } catch (NoSuchElementException e) {
+      log.error("Assigned work for {} to {} but could not find work record", file, target);
+      return;
     }
 
     log.debug("Current status for {} replicating to {}: {}", file, target, ProtobufUtil.toString(status));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
new file mode 100644
index 0000000..96e8b52
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
+
+/**
+ * 
+ */
+public class MultiTserverReplicationIT extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(MultiTserverReplicationIT.class);
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(2);
+  }
+
+  @Test
+  public void tserverReplicationServicePortsAreAdvertised() throws Exception {
+    // Wait for the cluster to be up
+    Connector conn = getConnector();
+    Instance inst = conn.getInstance();
+
+    // Wait for a tserver to come up to fulfill this request
+    conn.tableOperations().create("foo");
+    Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+
+    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+    Set<String> tserverHost = new HashSet<>();
+    tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
+
+    Set<HostAndPort> replicationServices = new HashSet<>();
+
+    for (String tserver : tserverHost) {
+      try {
+        byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
+        HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
+        replicationServices.add(replAddress);
+      } catch (Exception e) {
+        log.error("Could not find port for {}", tserver, e);
+        Assert.fail("Did not find replication port advertisement for " + tserver);
+      }
+    }
+
+    // Each tserver should also have equial replicaiton services running internally
+    Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
+  }
+
+  @Test
+  public void masterReplicationServicePortsAreAdvertised() throws Exception {
+    // Wait for the cluster to be up
+    Connector conn = getConnector();
+    Instance inst = conn.getInstance();
+
+    // Wait for a tserver to come up to fulfill this request
+    conn.tableOperations().create("foo");
+    Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+    Assert.assertEquals(0, Iterables.size(s));
+
+    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+    // Should have one master instance
+    Assert.assertEquals(1, inst.getMasterLocations().size());
+
+    // Get the master thrift service addr
+    String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
+
+    // Get the master replication coordinator addr
+    String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), StandardCharsets.UTF_8);
+
+    // They shouldn't be the same
+    Assert.assertNotEquals(masterAddr, replCoordAddr);
+
+    // Neither should be zero as the port
+    Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
+    Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
deleted file mode 100644
index 418d717..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Test;
-
-/**
- * 
- */
-public class ReplicationDeadlockTest extends ConfigurableMacIT {
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  private Set<String> metadataWals(Connector conn) throws Exception {
-    Scanner s = conn.createScanner(MetadataTable.NAME, new Authorizations());
-    s.fetchColumnFamily(LogColumnFamily.NAME);
-    Set<String> metadataWals = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-      for (String log : logEntry.logSet) {
-        metadataWals.add(new Path(log).toString());
-      }
-    }
-    return metadataWals;
-  }
-
-  @Test(timeout = 60 * 1000)
-  public void noDeadlock() throws Exception {
-    final Connector conn = getConnector();
-
-    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
-      conn.tableOperations().delete(ReplicationTable.NAME);
-    }
-
-    ReplicationTable.create(conn);
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-    final Set<String> metadataWals = new HashSet<>();
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataWals.addAll(metadataWals(conn));
-          } catch (Exception e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
-    String table1 = "table1", table2 = "table2", table3 = "table3";
-
-    conn.tableOperations().create(table1);
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    conn.tableOperations().create(table2);
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    conn.tableOperations().create(table3);
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Write some data to table3
-    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Flush everything to try to make the replication records
-    for (String table : Arrays.asList(table1, table2, table3)) {
-      conn.tableOperations().flush(table, null, null, true);
-    }
-
-    keepRunning.set(false);
-    t.join(5000);
-
-    for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
-      Scanner s = conn.createScanner(table, new Authorizations());
-      for (@SuppressWarnings("unused")
-      Entry<Key,Value> entry : s) {}
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
deleted file mode 100644
index eb89317..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 
- */
-public class ReplicationFilesClosedAfterUnusedTest extends ConfigurableMacIT {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationFilesClosedAfterUnusedTest.class);
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
-    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "0s");
-    cfg.setProperty(Property.REPLICATION_NAME, "master");
-    cfg.setNumTservers(1);
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  @Test(timeout = 60000)
-  public void test() throws Exception {
-    Connector conn = getConnector();
-
-    String table = "table";
-    conn.tableOperations().create(table);
-    String tableId = conn.tableOperations().tableIdMap().get(table);
-
-    Assert.assertNotNull(tableId);
-
-    log.info("Writing to {}", tableId);
-
-    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    // just sleep
-    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
-
-    // Write a mutation to make a log file
-    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
-    Mutation m = new Mutation("one");
-    m.put("", "", "");
-    bw.addMutation(m);
-    bw.close();
-
-    // Write another to make sure the logger rolls itself?
-    bw = conn.createBatchWriter(table, new BatchWriterConfig());
-    m = new Mutation("three");
-    m.put("", "", "");
-    bw.addMutation(m);
-    bw.close();
-
-    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
-    s.setRange(TabletsSection.getRange(tableId));
-    Set<String> wals = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-      for (String file : logEntry.logSet) {
-        Path p = new Path(file);
-        wals.add(p.toString());
-      }
-    }
-
-    log.warn("Found wals {}", wals);
-
-    // for (int j = 0; j < 5; j++) {
-    bw = conn.createBatchWriter(table, new BatchWriterConfig());
-    m = new Mutation("three");
-    byte[] bytes = new byte[1024 * 1024];
-    m.put("1".getBytes(), new byte[0], bytes);
-    m.put("2".getBytes(), new byte[0], bytes);
-    m.put("3".getBytes(), new byte[0], bytes);
-    m.put("4".getBytes(), new byte[0], bytes);
-    m.put("5".getBytes(), new byte[0], bytes);
-    bw.addMutation(m);
-    bw.close();
-
-    conn.tableOperations().flush(table, null, null, true);
-
-    while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
-      UtilWaitThread.sleep(500);
-    }
-
-    for (int i = 0; i < 5; i++) {
-      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.fetchColumnFamily(LogColumnFamily.NAME);
-      s.setRange(TabletsSection.getRange(tableId));
-      for (Entry<Key,Value> entry : s) {
-        log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
-      }
-
-      s = ReplicationTable.getScanner(conn);
-      StatusSection.limit(s);
-      Text buff = new Text();
-      boolean allReferencedLogsClosed = true;
-      int recordsFound = 0;
-      for (Entry<Key,Value> e : s) {
-        recordsFound++;
-        allReferencedLogsClosed = true;
-        StatusSection.getFile(e.getKey(), buff);
-        String file = buff.toString();
-        if (wals.contains(file)) {
-          Status stat = Status.parseFrom(e.getValue().get());
-          if (!stat.getClosed()) {
-            log.info("{} wasn't closed", file);
-            allReferencedLogsClosed = false;
-          }
-        }
-      }
-
-      if (recordsFound > 0 && allReferencedLogsClosed) {
-        return;
-      }
-
-      Thread.sleep(1000);
-    }
-
-    Assert.fail("We had a file that was referenced but didn't get closed");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index f34b626..db21586 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -19,6 +19,12 @@ package org.apache.accumulo.test.replication;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -31,13 +37,16 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
@@ -48,7 +57,9 @@ import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +67,20 @@ import org.slf4j.LoggerFactory;
 public class ReplicationIT extends ConfigurableMacIT {
   private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
 
+  private ExecutorService executor;
+
+  @Before
+  public void createExecutor() {
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @After
+  public void stopExecutor() {
+    if (null != executor) {
+      executor.shutdownNow();
+    }
+  }
+
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setNumTservers(1);
@@ -66,6 +91,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
     cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
     cfg.setProperty(Property.REPLICATION_NAME, "master");
+    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
@@ -79,6 +105,314 @@ public class ReplicationIT extends ConfigurableMacIT {
     peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
     peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
     peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+
+    peerCluster.start();
+
+    try {
+      final Connector connMaster = getConnector();
+      final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+  
+      ReplicationTable.create(connMaster);
+
+      String peerUserName = "peer", peerPassword = "foo";
+  
+      String peerClusterName = "peer";
+
+      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+      
+      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+  
+      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+      connMaster.instanceOperations().setProperty(
+          Property.REPLICATION_PEERS.getKey() + peerClusterName,
+          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+              AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+  
+      final String masterTable = "master", peerTable = "peer";
+  
+      connMaster.tableOperations().create(masterTable);
+      String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+      Assert.assertNotNull(masterTableId);
+  
+      connPeer.tableOperations().create(peerTable);
+      String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+      Assert.assertNotNull(peerTableId);
+
+      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+  
+      // Replicate this table to the peerClusterName in a table with the peerTableId table id
+      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
+  
+      // Write some data to table1
+      BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+      for (int rows = 0; rows < 5000; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 100; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+  
+      bw.close();
+  
+      log.info("Wrote all data to master cluster");
+  
+//      log.debug("");
+//      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+//        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+//          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+//        } else {
+//          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+//        }
+//      }
+  
+      final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
+  
+      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+      cluster.exec(TabletServer.class);
+  
+      log.info("TabletServer restarted");
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
+      log.info("TabletServer is online");
+  
+      log.info("");
+      log.info("Fetching metadata records:");
+      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+        } else {
+          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+        }
+      }
+  
+      log.info("");
+      log.info("Fetching replication records:");
+      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+      }
+  
+      Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+  
+        @Override
+        public Boolean call() throws Exception {
+          connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
+          log.info("Drain completed");
+          return true;
+        }
+  
+      });
+  
+      try {
+        future.get(30, TimeUnit.SECONDS);
+      } catch (TimeoutException e) {
+        future.cancel(true);
+        Assert.fail("Drain did not finish within 30 seconds");
+      }
+  
+      log.info("drain completed");
+  
+      log.info("");
+      log.info("Fetching metadata records:");
+      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+        } else {
+          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+        }
+      }
+  
+      log.info("");
+      log.info("Fetching replication records:");
+      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+      }
+  
+      Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+      Entry<Key,Value> masterEntry = null, peerEntry = null;
+      while (masterIter.hasNext() && peerIter.hasNext()) {
+        masterEntry = masterIter.next();
+        peerEntry = peerIter.next();
+        Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+      }
+  
+      log.info("Last master entry: " + masterEntry);
+      log.info("Last peer entry: " + peerEntry);
+  
+      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+    } finally {
+      peerCluster.stop();
+    }
+  }
+
+  @Test(timeout = 60 * 5000)
+  public void dataReplicatedToCorrectTable() throws Exception {
+    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+        ROOT_PASSWORD);
+    peerCfg.setNumTservers(1);
+    peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+    MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
+
+    peer1Cluster.start();
+
+    try {
+      Connector connMaster = getConnector();
+      Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
+
+      String peerClusterName = "peer";
+      String peerUserName = "peer", peerPassword = "foo";
+
+      // Create local user
+      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+      connMaster.instanceOperations().setProperty(
+          Property.REPLICATION_PEERS.getKey() + peerClusterName,
+          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+
+      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+
+      // Create tables
+      connMaster.tableOperations().create(masterTable1);
+      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
+      Assert.assertNotNull(masterTableId1);
+
+      connMaster.tableOperations().create(masterTable2);
+      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
+      Assert.assertNotNull(masterTableId2);
+
+      connPeer.tableOperations().create(peerTable1);
+      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
+      Assert.assertNotNull(peerTableId1);
+
+      connPeer.tableOperations().create(peerTable2);
+      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
+      Assert.assertNotNull(peerTableId2);
+
+      // Grant write permission
+      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
+      // Replicate this table to the peerClusterName in a table with the peerTableId table id
+      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
+
+      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
+
+      // Write some data to table1
+      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+      long masterTable1Records = 0l;
+      for (int rows = 0; rows < 2500; rows++) {
+        Mutation m = new Mutation(masterTable1 + rows);
+        for (int cols = 0; cols < 100; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+          masterTable1Records++;
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
+
+      // Write some data to table2
+      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+      long masterTable2Records = 0l;
+      for (int rows = 0; rows < 2500; rows++) {
+        Mutation m = new Mutation(masterTable2 + rows);
+        for (int cols = 0; cols < 100; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+          masterTable2Records++;
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
+
+      log.info("Wrote all data to master cluster");
+
+      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
+          masterTable2);
+
+      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
+        Thread.sleep(500);
+      }
+
+      // Restart the tserver to force a close on the WAL
+      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+      cluster.exec(TabletServer.class);
+
+      log.info("Restarted the tserver");
+
+      // Read the data -- the tserver is back up and running
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+
+      // Wait for both tables to be replicated
+      log.info("Waiting for {} for {}", filesFor1, masterTable1);
+      connMaster.replicationOperations().drain(masterTable1, filesFor1);
+
+      log.info("Waiting for {} for {}", filesFor2, masterTable2);
+      connMaster.replicationOperations().drain(masterTable2, filesFor2);
+
+      long countTable = 0l;
+      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
+        countTable++;
+        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+            .startsWith(masterTable1));
+      }
+
+      log.info("Found {} records in {}", countTable, peerTable1);
+      Assert.assertEquals(masterTable1Records, countTable);
+
+      countTable = 0l;
+      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
+        countTable++;
+        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+            .startsWith(masterTable2));
+      }
+
+      log.info("Found {} records in {}", countTable, peerTable2);
+      Assert.assertEquals(masterTable2Records, countTable);
+
+    } finally {
+      peer1Cluster.stop();
+    }
+  }
+
+  @Test(timeout = 60 * 5000)
+  public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
+    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+        ROOT_PASSWORD);
+    peerCfg.setNumTservers(1);
+    peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
     MiniAccumuloClusterImpl peerCluster = peerCfg.build();
 
     peerCluster.start();
@@ -169,7 +503,7 @@ public class ReplicationIT extends ConfigurableMacIT {
   }
 
   @Test(timeout = 60 * 5000)
-  public void dataReplicatedToCorrectTable() throws Exception {
+  public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
     MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
         ROOT_PASSWORD);
     peerCfg.setNumTservers(1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
deleted file mode 100644
index 0afbc05..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
-
-/**
- * 
- */
-public class ReplicationPortAdvertisementIT extends ConfigurableMacIT {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationPortAdvertisementIT.class);
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(2);
-  }
-
-  @Test
-  public void tserverReplicationServicePortsAreAdvertised() throws Exception {
-    // Wait for the cluster to be up
-    Connector conn = getConnector();
-    Instance inst = conn.getInstance();
-
-    // Wait for a tserver to come up to fulfill this request
-    conn.tableOperations().create("foo");
-    Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
-    Assert.assertEquals(0, Iterables.size(s));
-
-    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
-    Set<String> tserverHost = new HashSet<>();
-    tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
-
-    Set<HostAndPort> replicationServices = new HashSet<>();
-
-    for (String tserver : tserverHost) {
-      try {
-        byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
-        HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
-        replicationServices.add(replAddress);
-      } catch (Exception e) {
-        log.error("Could not find port for {}", tserver, e);
-        Assert.fail("Did not find replication port advertisement for " + tserver);
-      }
-    }
-
-    // Each tserver should also have equial replicaiton services running internally
-    Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
-  }
-
-  @Test
-  public void masterReplicationServicePortsAreAdvertised() throws Exception {
-    // Wait for the cluster to be up
-    Connector conn = getConnector();
-    Instance inst = conn.getInstance();
-
-    // Wait for a tserver to come up to fulfill this request
-    conn.tableOperations().create("foo");
-    Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
-    Assert.assertEquals(0, Iterables.size(s));
-
-    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
-
-    // Should have one master instance
-    Assert.assertEquals(1, inst.getMasterLocations().size());
-
-    // Get the master thrift service addr
-    String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
-
-    // Get the master replication coordinator addr
-    String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), StandardCharsets.UTF_8);
-
-    // They shouldn't be the same
-    Assert.assertNotEquals(masterAddr, replCoordAddr);
-
-    // Neither should be zero as the port
-    Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
-    Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
deleted file mode 100644
index c7c36e8..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.master.replication.SequentialWorkAssigner;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReplicationSequentialIT extends ConfigurableMacIT {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationSequentialIT.class);
-
-  private ExecutorService executor;
-
-  @Before
-  public void createExecutor() {
-    executor = Executors.newSingleThreadExecutor();
-  }
-
-  @After
-  public void stopExecutor() {
-    if (null != executor) {
-      executor.shutdownNow();
-    }
-  }
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
-    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
-    cfg.setProperty(Property.REPLICATION_NAME, "master");
-    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  @Test(timeout = 60 * 5000)
-  public void dataWasReplicatedToThePeer() throws Exception {
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
-    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
-
-    peerCluster.start();
-
-    try {
-      final Connector connMaster = getConnector();
-      final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-  
-      ReplicationTable.create(connMaster);
-
-      String peerUserName = "peer", peerPassword = "foo";
-  
-      String peerClusterName = "peer";
-
-      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-      
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-  
-      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-      connMaster.instanceOperations().setProperty(
-          Property.REPLICATION_PEERS.getKey() + peerClusterName,
-          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-              AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-  
-      final String masterTable = "master", peerTable = "peer";
-  
-      connMaster.tableOperations().create(masterTable);
-      String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
-      Assert.assertNotNull(masterTableId);
-  
-      connPeer.tableOperations().create(peerTable);
-      String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
-      Assert.assertNotNull(peerTableId);
-
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-  
-      // Replicate this table to the peerClusterName in a table with the peerTableId table id
-      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
-  
-      // Write some data to table1
-      BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
-      for (int rows = 0; rows < 5000; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 100; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-  
-      bw.close();
-  
-      log.info("Wrote all data to master cluster");
-  
-      log.debug("");
-      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-        } else {
-          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-        }
-      }
-  
-      final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-  
-      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-        cluster.killProcess(ServerType.TABLET_SERVER, proc);
-      }
-      cluster.exec(TabletServer.class);
-  
-      log.info("TabletServer restarted");
-      for (@SuppressWarnings("unused")
-      Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
-      log.info("TabletServer is online");
-  
-      log.info("");
-      log.info("Fetching metadata records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-        } else {
-          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-        }
-      }
-  
-      log.info("");
-      log.info("Fetching replication records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
-        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-      }
-  
-      Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-  
-        @Override
-        public Boolean call() throws Exception {
-          connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
-          log.info("Drain completed");
-          return true;
-        }
-  
-      });
-  
-      try {
-        future.get(30, TimeUnit.SECONDS);
-      } catch (TimeoutException e) {
-        future.cancel(true);
-        Assert.fail("Drain did not finish within 30 seconds");
-      }
-  
-      log.info("drain completed");
-  
-      log.info("");
-      log.info("Fetching metadata records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
-          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-        } else {
-          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
-        }
-      }
-  
-      log.info("");
-      log.info("Fetching replication records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
-        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
-      }
-  
-      Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
-      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
-      Entry<Key,Value> masterEntry = null, peerEntry = null;
-      while (masterIter.hasNext() && peerIter.hasNext()) {
-        masterEntry = masterIter.next();
-        peerEntry = peerIter.next();
-        Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
-            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
-        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
-      }
-  
-      log.info("Last master entry: " + masterEntry);
-      log.info("Last peer entry: " + peerEntry);
-  
-      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
-      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
-    } finally {
-      peerCluster.stop();
-    }
-  }
-
-  @Test(timeout = 60 * 5000)
-  public void dataReplicatedToCorrectTable() throws Exception {
-    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
-        ROOT_PASSWORD);
-    peerCfg.setNumTservers(1);
-    peerCfg.setInstanceName("peer");
-    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
-    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
-    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
-    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
-    MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
-
-    peer1Cluster.start();
-
-    try {
-      Connector connMaster = getConnector();
-      Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
-
-      String peerClusterName = "peer";
-      String peerUserName = "peer", peerPassword = "foo";
-
-      // Create local user
-      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
-      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
-      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
-      connMaster.instanceOperations().setProperty(
-          Property.REPLICATION_PEERS.getKey() + peerClusterName,
-          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
-              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
-
-      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
-
-      // Create tables
-      connMaster.tableOperations().create(masterTable1);
-      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
-      Assert.assertNotNull(masterTableId1);
-
-      connMaster.tableOperations().create(masterTable2);
-      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
-      Assert.assertNotNull(masterTableId2);
-
-      connPeer.tableOperations().create(peerTable1);
-      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
-      Assert.assertNotNull(peerTableId1);
-
-      connPeer.tableOperations().create(peerTable2);
-      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
-      Assert.assertNotNull(peerTableId2);
-
-      // Grant write permission
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
-      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
-
-      // Replicate this table to the peerClusterName in a table with the peerTableId table id
-      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
-
-      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
-      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
-
-      // Write some data to table1
-      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
-      long masterTable1Records = 0l;
-      for (int rows = 0; rows < 2500; rows++) {
-        Mutation m = new Mutation(masterTable1 + rows);
-        for (int cols = 0; cols < 100; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-          masterTable1Records++;
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
-
-      // Write some data to table2
-      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
-      long masterTable2Records = 0l;
-      for (int rows = 0; rows < 2500; rows++) {
-        Mutation m = new Mutation(masterTable2 + rows);
-        for (int cols = 0; cols < 100; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-          masterTable2Records++;
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
-
-      log.info("Wrote all data to master cluster");
-
-      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
-          masterTable2);
-
-      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
-        Thread.sleep(500);
-      }
-
-      // Restart the tserver to force a close on the WAL
-      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-        cluster.killProcess(ServerType.TABLET_SERVER, proc);
-      }
-      cluster.exec(TabletServer.class);
-
-      log.info("Restarted the tserver");
-
-      // Read the data -- the tserver is back up and running
-      for (@SuppressWarnings("unused")
-      Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
-
-      // Wait for both tables to be replicated
-      log.info("Waiting for {} for {}", filesFor1, masterTable1);
-      connMaster.replicationOperations().drain(masterTable1, filesFor1);
-
-      log.info("Waiting for {} for {}", filesFor2, masterTable2);
-      connMaster.replicationOperations().drain(masterTable2, filesFor2);
-
-      long countTable = 0l;
-      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
-        countTable++;
-        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
-            .startsWith(masterTable1));
-      }
-
-      log.info("Found {} records in {}", countTable, peerTable1);
-      Assert.assertEquals(masterTable1Records, countTable);
-
-      countTable = 0l;
-      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
-        countTable++;
-        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
-            .startsWith(masterTable2));
-      }
-
-      log.info("Found {} records in {}", countTable, peerTable2);
-      Assert.assertEquals(masterTable2Records, countTable);
-
-    } finally {
-      peer1Cluster.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
deleted file mode 100644
index 62c09f5..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Integration Tests that attempt to evaluate the accuracy of the internal bookkeeping performed on the accumulo "master" instance. Does not send data to any
- * remote instance, merely tracks what is stored locally.
- */
-public class ReplicationSourceOnlyIT extends ConfigurableMacIT {
-  @Override
-  public int defaultTimeoutSeconds() {
-    return 300;
-  }
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
-    Multimap<String,String> logs = HashMultimap.create();
-    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.fetchColumnFamily(LogColumnFamily.NAME);
-    scanner.setRange(new Range());
-    for (Entry<Key,Value> entry : scanner) {
-      if (Thread.interrupted()) {
-        return logs;
-      }
-
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
-      for (String log : logEntry.logSet) {
-        // Need to normalize the log file from LogEntry
-        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
-      }
-    }
-    return logs;
-  }
-
-  @Test
-  public void replicationEntriesPrecludeWalDeletion() throws Exception {
-    final Connector conn = getConnector();
-    String table1 = "table1", table2 = "table2", table3 = "table3";
-    final Multimap<String,String> logs = HashMultimap.create();
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            logs.putAll(getLogs(conn));
-          } catch (TableNotFoundException e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
-    conn.tableOperations().create(table1);
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table2);
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table3);
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table3
-    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Force a write to metadata for the data written
-    for (String table : Arrays.asList(table1, table2, table3)) {
-      conn.tableOperations().flush(table, null, null, true);
-    }
-
-    keepRunning.set(false);
-    t.join(5000);
-
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Set<String> replFiles = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      replFiles.add(entry.getKey().getRow().toString());
-    }
-
-    // We might have a WAL that was use solely for the replication table
-    // We want to remove that from our list as it should not appear in the replication table
-    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
-    Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
-    while (observedLogs.hasNext()) {
-      Entry<String,String> observedLog = observedLogs.next();
-      if (replicationTableId.equals(observedLog.getValue())) {
-        observedLogs.remove();
-      }
-    }
-
-    // We should have *some* reference to each log that was seen in the metadata table
-    // They might not yet all be closed though (might be newfile)
-    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
-
-    for (String replFile : replFiles) {
-      Path p = new Path(replFile);
-      FileSystem fs = p.getFileSystem(new Configuration());
-      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
deleted file mode 100644
index da874fa..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- * 
- */
-public class ReplicationTablesMacTest extends ConfigurableMacIT {
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-  }
-
-  @Test
-  public void combinerWorksOnMetadata() throws Exception {
-    Connector conn = getConnector();
-
-    conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
-
-    ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME);
-
-    Status stat1 = StatusUtil.fileCreated(100);
-    Status stat2 = StatusUtil.fileClosed();
-
-    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
-    m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-    bw.close();
-
-    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    System.out.println("Printing metadata table");
-
-    Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
-    Assert.assertEquals(stat1, actual);
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
-    m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-    bw.close();
-
-    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-
-    actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
-    Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();
-
-    Assert.assertEquals(expected, actual);
-  }
-
-}


Mime
View raw message