accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/4] accumulo git commit: ACCUMULO-3650 Push protobuf-java dependency into server-base.
Date Wed, 11 Mar 2015 21:36:48 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 01558a4f1 -> ded955e91


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 3e966c4..1b37968 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -34,12 +34,12 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.WorkAssigner;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 51c8c1f..b20fdcc 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -39,8 +39,8 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index 0f7306c..9966feb 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -41,8 +41,8 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index c7f47e4..1ebe65f 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -37,10 +37,10 @@ import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index b8e0b40..8532e1b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -35,12 +35,12 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
new file mode 100644
index 0000000..a127dcd
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationOperationsImplTest {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplTest.class);
+
+  private MockInstance inst;
+
+  @Rule
+  public TestName test = new TestName();
+
+  /**
+   * Creates a new {@link MockInstance}, named after the currently running test method, before each test.
+   */
+  @Before
+  public void setup() {
+    final String instanceName = test.getMethodName();
+    inst = new MockInstance(instanceName);
+  }
+
+  /**
+   * Spoof out the Master so we can call the implementation without starting a full instance.
+   */
+  private ReplicationOperationsImpl getReplicationOperations(ClientContext context) throws Exception {
+    Master master = EasyMock.createMock(Master.class);
+    EasyMock.expect(master.getConnector()).andReturn(inst.getConnector("root", new PasswordToken(""))).anyTimes();
+    EasyMock.expect(master.getInstance()).andReturn(inst).anyTimes();
+    EasyMock.replay(master);
+
+    final MasterClientServiceHandler mcsh = new MasterClientServiceHandler(master) {
+      @Override
+      protected String getTableId(Instance inst, String tableName) throws ThriftTableOperationException {
+        try {
+          return inst.getConnector("root", new PasswordToken("")).tableOperations().tableIdMap().get(tableName);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    return new ReplicationOperationsImpl(context) {
+      @Override
+      protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set<String> wals)
+          throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+        try {
+          return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals);
+        } catch (TException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Checks that draining table "foo" is only reported complete after the records for both write-ahead logs have been deleted from the metadata table and
+   * from the replication table.
+   */
+  @Test
+  public void waitsUntilEntriesAreReplicated() throws Exception {
+    Connector conn = inst.getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create("foo");
+    Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    // Write a status record for each file to the replication table
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    bw.close();
+
+    // Write the matching replication-section records to the metadata table
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+
+    bw.addMutation(m);
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+    m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+
+    // The file2 record must actually be written; it was previously built and then dropped
+    bw.addMutation(m);
+
+    bw.close();
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+    final ReplicationOperationsImpl roi = getReplicationOperations(context);
+    // Run drain on its own thread so this thread can mutate the tables while it waits
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true);
+      }
+    });
+
+    t.start();
+
+    // With the records, we shouldn't be drained
+    Assert.assertFalse(done.get());
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.putDelete(ReplicationSection.COLF, tableId);
+    bw.addMutation(m);
+    bw.flush();
+
+    Assert.assertFalse(done.get());
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+    m.putDelete(ReplicationSection.COLF, tableId);
+    bw.addMutation(m);
+    bw.flush();
+    bw.close();
+
+    // Removing metadata entries doesn't change anything
+    Assert.assertFalse(done.get());
+
+    // Remove the replication entries too
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.putDelete(StatusSection.NAME, tableId);
+    bw.addMutation(m);
+    bw.flush();
+
+    Assert.assertFalse(done.get());
+
+    m = new Mutation(file2);
+    m.putDelete(StatusSection.NAME, tableId);
+    bw.addMutation(m);
+    bw.flush();
+    // Release the replication-table writer instead of leaking it
+    bw.close();
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // After both metadata and replication
+    Assert.assertTrue("Drain never finished", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+  /**
+   * Checks that draining table "foo" completes once the records for foo's write-ahead log are deleted, even though records for a second log belonging to
+   * table "bar" are left in place.
+   */
+  @Test
+  public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
+    Connector conn = inst.getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create("foo");
+    conn.tableOperations().create("bar");
+
+    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+    Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    // file1 belongs to "foo", file2 to "bar"
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    bw.close();
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+
+    bw.addMutation(m);
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+    m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat));
+
+    // The unrelated record must actually be written; it was previously built and then dropped
+    bw.addMutation(m);
+
+    bw.close();
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+
+    final ReplicationOperationsImpl roi = getReplicationOperations(context);
+
+    // Run drain on its own thread so this thread can mutate the tables while it waits
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true);
+      }
+    });
+
+    t.start();
+
+    // With the records, we shouldn't be drained
+    Assert.assertFalse(done.get());
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.putDelete(ReplicationSection.COLF, tableId1);
+    bw.addMutation(m);
+    bw.flush();
+    // Release the metadata writer before the variable is reused below
+    bw.close();
+
+    // Removing metadata entries doesn't change anything
+    Assert.assertFalse(done.get());
+
+    // Remove the replication entries too
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.putDelete(StatusSection.NAME, tableId1);
+    bw.addMutation(m);
+    bw.flush();
+    // Release the replication-table writer instead of leaking it
+    bw.close();
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // After both metadata and replication
+    Assert.assertTrue("Drain never completed", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+  /**
+   * Checks that draining table "foo" does not complete when the existing records for its write-ahead log are overwritten with a new {@link Status} rather
+   * than deleted.
+   */
+  @Test
+  public void inprogressReplicationRecordsBlockExecution() throws Exception {
+    Connector conn = inst.getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create("foo");
+
+    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    bw.close();
+
+    // A log entry for the table's tablet that references file1
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
+    logEntry.server = "tserver";
+    logEntry.filename = file1;
+    logEntry.tabletId = 1;
+    logEntry.logSet = Arrays.asList(file1);
+    logEntry.timestamp = System.currentTimeMillis();
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    m = new Mutation(logEntry.getRow());
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+    final ReplicationOperationsImpl roi = getReplicationOperations(context);
+    // Run drain on its own thread so this thread can mutate the tables while it waits
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true);
+      }
+    });
+
+    t.start();
+
+    // With the records, we shouldn't be drained
+    Assert.assertFalse(done.get());
+
+    Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build();
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus));
+    bw.addMutation(m);
+    bw.flush();
+    // Release the metadata writer before the variable is reused below
+    bw.close();
+
+    // Overwriting the metadata entry with the new status doesn't change anything
+    Assert.assertFalse(done.get());
+
+    // Overwrite the replication table entry with the new status too
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
+    bw.addMutation(m);
+    bw.flush();
+    // Release the replication-table writer instead of leaking it
+    bw.close();
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // New records, but not fully replicated ones don't cause it to complete
+    Assert.assertFalse("Drain somehow finished", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+  /**
+   * Checks that draining table "foo" completes once the records for the original write-ahead log are deleted, even though a replication-section record for a
+   * different log is written to the metadata table after the drain thread was started.
+   */
+  @Test
+  public void laterCreatedLogsDontBlockExecution() throws Exception {
+    Connector conn = inst.getConnector("root", new PasswordToken(""));
+    conn.tableOperations().create("foo");
+
+    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    bw.close();
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    bw.close();
+
+    // Diagnostic dump of the metadata table; goes through the test logger rather than stdout
+    log.debug("Reading metadata first time");
+    for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      log.debug("{}", e.getKey());
+    }
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
+    final ReplicationOperationsImpl roi = getReplicationOperations(context);
+    // Run drain on its own thread so this thread can mutate the tables while it waits
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true);
+      }
+    });
+
+    t.start();
+
+    // We need to wait long enough for the table to read once
+    Thread.sleep(2000);
+
+    // Write another file, but also delete the old files
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID());
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.putDelete(ReplicationSection.COLF, tableId1);
+    bw.addMutation(m);
+    bw.close();
+
+    log.debug("Reading metadata second time");
+    for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      log.debug("{}", e.getKey());
+    }
+
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.putDelete(StatusSection.NAME, tableId1);
+    bw.addMutation(m);
+    bw.close();
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // We should pass immediately because we aren't waiting on both files to be deleted (just the one that we did)
+    Assert.assertTrue("Drain didn't finish", done.get());
+    // A drain that ended by throwing also sets done, so check the failure flag as the sibling tests do
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
index ae84472..4f1e159 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
@@ -33,7 +33,7 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -42,9 +42,6 @@ import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
 
-/**
- *
- */
 public class FinishedWorkUpdaterTest {
 
   @Rule

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
index 4bd4b36..952fb2c 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
@@ -39,8 +39,8 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -51,9 +51,6 @@ import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
 
-/**
- *
- */
 public class RemoveCompleteReplicationRecordsTest {
 
   private RemoveCompleteReplicationRecords rcrr;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index 0f1a7a1..3f3250d 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -39,11 +39,11 @@ import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
@@ -53,9 +53,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-/**
- *
- */
 public class SequentialWorkAssignerTest {
 
   @Rule

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
index 7ee3a89..c691f92 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
@@ -39,10 +39,10 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -53,9 +53,6 @@ import org.junit.rules.TestName;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
-/**
- *
- */
 public class StatusMakerTest {
 
   @Rule

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
index 4282e4e..febd54c 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -43,11 +43,11 @@ import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index f53547a..6373def 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -33,11 +33,11 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -49,9 +49,6 @@ import org.junit.rules.TestName;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
-/**
- *
- */
 public class WorkMakerTest {
 
   private MockInstance instance;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 5c3fc2d..254e5d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -37,11 +37,11 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.tserver.Mutations;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 45e59cc..a03e05d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -36,17 +36,13 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ReplicationClient;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
@@ -61,6 +57,10 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.replication.ReplicaSystem;
+import org.apache.accumulo.server.replication.ReplicaSystemHelper;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
@@ -75,9 +75,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
-/**
- *
- */
 public class AccumuloReplicaSystem implements ReplicaSystem {
   private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class);
   private static final String RFILE_SUFFIX = "." + RFile.EXTENSION;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index c23cd94..8e39d1c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -26,23 +26,23 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.ReplicaSystem;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.server.replication.ReplicaSystemHelper;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 8ba8128..594d9c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.MapCounter;
@@ -45,6 +44,7 @@ import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.util.MasterMetadataUtil;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.util.ReplicationTableUtil;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 95fe24a..6152500 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -78,8 +78,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -103,6 +101,8 @@ import org.apache.accumulo.server.metrics.Metrics;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 import org.apache.accumulo.server.util.FileUtil;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 1a32ea4..103fd81 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -42,11 +42,11 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
@@ -58,9 +58,6 @@ import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
 
-/**
- *
- */
 public class AccumuloReplicaSystemTest {
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index 15306a8..838ba1a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -22,24 +22,21 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.ReplicaSystem;
+import org.apache.accumulo.server.replication.ReplicaSystemHelper;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
 
-/**
- *
- */
 public class ReplicationProcessorTest {
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
index 91e1a4b..d3986ed 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -19,11 +19,11 @@ package org.apache.accumulo.test.replication;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.ReplicaSystem;
+import org.apache.accumulo.server.replication.ReplicaSystemHelper;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/main/java/org/apache/accumulo/test/replication/ReplicationTablesPrinterThread.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationTablesPrinterThread.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationTablesPrinterThread.java
index 0dec94c..b222de6 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationTablesPrinterThread.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationTablesPrinterThread.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.test.replication;
 import java.io.PrintStream;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.replication.PrintReplicationRecords;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.server.replication.PrintReplicationRecords;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index f9cff98..3c7297d 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -31,7 +31,6 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.NewTableConfiguration;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -46,6 +45,7 @@ import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.minicluster.impl.ZooKeeperBindException;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 1ef47e5..5b89d9c 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -40,13 +40,13 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index 125286f..9dec16e 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -32,7 +32,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -44,8 +43,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -54,6 +51,9 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 1c359b1..54348db 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -43,7 +43,6 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -61,9 +60,6 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusFormatter;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -76,7 +72,11 @@ import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.server.replication.StatusCombiner;
+import org.apache.accumulo.server.replication.StatusFormatter;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
index 2b33b4d..91fa8cd 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
@@ -36,10 +36,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.harness.SharedMiniClusterIT;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
index 5cfc737..f486424 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
@@ -32,7 +32,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -44,8 +43,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -54,6 +51,9 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java b/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
index 6f6ab8a..7ae60bf 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -39,13 +38,14 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.logger.LogEvents;
@@ -61,9 +61,6 @@ import org.junit.Test;
 
 import com.google.common.collect.Iterables;
 
-/**
- *
- */
 public class UnusedWalDoesntCloseReplicationStatusIT extends ConfigurableMacIT {
 
   @Override


Mime
View raw message