accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [04/10] accumulo git commit: ACCUMULO-3147 Create replication table at initialize, upgrade
Date Fri, 07 Nov 2014 22:31:19 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 4d99495..86ac87e 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.io.Text;
 import org.easymock.IAnswer;
 import org.junit.Assert;
@@ -98,7 +97,7 @@ public class CloseWriteAheadLogReferencesTest {
     data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4)).andReturn(bs);
+    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
     bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
     bs.fetchColumnFamily(LogColumnFamily.NAME);
@@ -141,7 +140,7 @@ public class CloseWriteAheadLogReferencesTest {
     data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4)).andReturn(bs);
+    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
     bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
     bs.fetchColumnFamily(LogColumnFamily.NAME);
@@ -197,7 +196,7 @@ public class CloseWriteAheadLogReferencesTest {
     data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4)).andReturn(bs);
+    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
     bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
     bs.fetchColumnFamily(LogColumnFamily.NAME);
@@ -277,7 +276,7 @@ public class CloseWriteAheadLogReferencesTest {
     data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4)).andReturn(bs);
+    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
     bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
     bs.fetchColumnFamily(LogColumnFamily.NAME);
@@ -309,7 +308,6 @@ public class CloseWriteAheadLogReferencesTest {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
 
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
     Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wal/tserver+port/12345");
     m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
@@ -331,7 +329,6 @@ public class CloseWriteAheadLogReferencesTest {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
 
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
     Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
     m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
@@ -353,8 +350,7 @@ public class CloseWriteAheadLogReferencesTest {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
 
-    ReplicationUtil.createReplicationTable(conn);
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     Mutation m = new Mutation(file);
     StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.ingestedUntil(1000)));
     bw.addMutation(m);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index 41ca911..891f267 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master;
 
+import static org.apache.accumulo.master.util.TableValidators.NOT_METADATA;
 import static org.apache.accumulo.master.util.TableValidators.NOT_ROOT_ID;
 import static org.apache.accumulo.master.util.TableValidators.NOT_SYSTEM;
 import static org.apache.accumulo.master.util.TableValidators.VALID_ID;
@@ -318,7 +319,7 @@ class FateServiceHandler implements FateService.Iface {
       }
       case TABLE_DELETE_RANGE: {
         TableOperation tableOp = TableOperation.DELETE_RANGE;
-        String tableName = validateTableNameArgument(arguments.get(0), tableOp, NOT_SYSTEM);
+        String tableName = validateTableNameArgument(arguments.get(0), tableOp, NOT_METADATA);
         Text startRow = ByteBufferUtil.toText(arguments.get(1));
         Text endRow = ByteBufferUtil.toText(arguments.get(2));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index e3fc69d..b6b96a0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -62,6 +62,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
@@ -365,11 +366,16 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
             TableManager.prepareNewNamespaceState(instance.getInstanceID(), id, ns, NodeExistsPolicy.SKIP);
         }
 
+        // create replication table in zk
+        log.debug("Upgrade creating table " + ReplicationTable.NAME + " (ID: " + ReplicationTable.ID + ")");
+        TableManager.prepareNewTableState(instance.getInstanceID(), ReplicationTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, ReplicationTable.NAME,
+            TableState.OFFLINE, NodeExistsPolicy.SKIP);
+
         // create root table
         log.debug("Upgrade creating table " + RootTable.NAME + " (ID: " + RootTable.ID + ")");
         TableManager.prepareNewTableState(instance.getInstanceID(), RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE,
             NodeExistsPolicy.SKIP);
-        Initialize.initMetadataConfig();
+        Initialize.initSystemTablesConfig();
         // ensure root user can flush root table
         security.grantTablePermission(SystemCredentials.get().toThrift(instance), security.getRootUsername(), RootTable.ID, TablePermission.ALTER_TABLE,
             Namespaces.ACCUMULO_NAMESPACE_ID);
@@ -379,15 +385,15 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
         for (String tableId : zoo.getChildren(tables)) {
           String targetNamespace = (MetadataTable.ID.equals(tableId) || RootTable.ID.equals(tableId)) ? Namespaces.ACCUMULO_NAMESPACE_ID
               : Namespaces.DEFAULT_NAMESPACE_ID;
-          log.debug("Upgrade moving table " + new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), UTF_8) + " (ID: "
-              + tableId + ") into namespace with ID " + targetNamespace);
+          log.debug("Upgrade moving table " + new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), UTF_8) + " (ID: " + tableId
+              + ") into namespace with ID " + targetNamespace);
           zoo.putPersistentData(tables + "/" + tableId + Constants.ZTABLE_NAMESPACE, targetNamespace.getBytes(UTF_8), NodeExistsPolicy.SKIP);
         }
 
         // rename metadata table
         log.debug("Upgrade renaming table " + MetadataTable.OLD_NAME + " (ID: " + MetadataTable.ID + ") to " + MetadataTable.NAME);
-        zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME,
-            Tables.qualify(MetadataTable.NAME).getSecond().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+        zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME, Tables.qualify(MetadataTable.NAME).getSecond().getBytes(UTF_8),
+            NodeExistsPolicy.OVERWRITE);
 
         moveRootTabletToRootTable(zoo);
 
@@ -445,6 +451,11 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
                 MetadataTableUtil.moveMetaDeleteMarkers(instance, SystemCredentials.get());
                 version++;
               }
+              if (version == ServerConstants.MOVE_TO_REPLICATION_TABLE - 1) {
+                log.info("Updating metadata table with entries for the replication table");
+                MetadataTableUtil.createReplicationTable(instance, SystemCredentials.get());
+                version++;
+              }
               log.info("Updating persistent data version.");
               Accumulo.updateAccumuloVersion(fs, accumuloPersistentVersion);
               log.info("Upgrade complete");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
index d4a9af2..1554f52 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
@@ -25,7 +25,6 @@ import javax.management.ObjectName;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
@@ -47,21 +46,19 @@ public class ReplicationMetrics extends AbstractMetricsImpl implements Replicati
   private static final String METRICS_PREFIX = "replication";
 
   private Connector conn;
-  private TableOperations tops;
   private ObjectName objectName = null;
   private ReplicationUtil replicationUtil;
 
   public ReplicationMetrics(Connector conn) throws MalformedObjectNameException {
     super();
     this.conn = conn;
-    this.tops = conn.tableOperations();
     objectName = new ObjectName("accumulo.server.metrics:service=Replication Metrics,name=ReplicationMBean,instance=" + Thread.currentThread().getName());
     replicationUtil = new ReplicationUtil();
   }
 
   @Override
   public int getNumFilesPendingReplication() {
-    if (!tops.exists(ReplicationTable.NAME)) {
+    if (!ReplicationTable.isOnline(conn)) {
       return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 8e1d036..3e966c4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -22,7 +22,6 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -33,6 +32,7 @@ import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -151,7 +151,7 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
     Scanner s;
     try {
       s = ReplicationTable.getScanner(conn);
-    } catch (TableNotFoundException e) {
+    } catch (ReplicationTableOfflineException e) {
       return;
     }
 
@@ -175,8 +175,8 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
       Scanner workScanner;
       try {
         workScanner = ReplicationTable.getScanner(conn);
-      } catch (TableNotFoundException e) {
-        log.warn("Replication table was deleted. Will retry...");
+      } catch (ReplicationTableOfflineException e) {
+        log.warn("Replication table is offline. Will retry...");
         UtilWaitThread.sleep(5000);
         return;
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 8bc18eb..51c8c1f 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -28,7 +28,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -38,6 +37,7 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -63,8 +63,8 @@ public class FinishedWorkUpdater implements Runnable {
   public void run() {
     log.debug("Looking for finished replication work");
 
-    if (!conn.tableOperations().exists(ReplicationTable.NAME)) {
-      log.debug("Replication table doesn't yet exist, will retry");
+    if (!ReplicationTable.isOnline(conn)) {
+      log.debug("Replication table is not yet online, will retry");
       return;
     }
 
@@ -73,8 +73,8 @@ public class FinishedWorkUpdater implements Runnable {
     try {
       bs = ReplicationTable.getBatchScanner(conn, 4);
       replBw = ReplicationTable.getBatchWriter(conn);
-    } catch (TableNotFoundException e) {
-      log.debug("Table did exist, but was deleted, will retry");
+    } catch (ReplicationTableOfflineException e) {
+      log.debug("Table is no longer online, will retry");
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index ef76916..7aaa465 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -40,6 +39,7 @@ import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -51,7 +51,7 @@ import com.google.common.base.Stopwatch;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Delete replication entries from {@link ReplicationTable#NAME} that are full replicated and closed
+ * Delete replication entries from the replication table that are fully replicated and closed
  */
 public class RemoveCompleteReplicationRecords implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(RemoveCompleteReplicationRecords.class);
@@ -69,8 +69,8 @@ public class RemoveCompleteReplicationRecords implements Runnable {
     try {
       bs = ReplicationTable.getBatchScanner(conn, 4);
       bw = ReplicationTable.getBatchWriter(conn);
-    } catch (TableNotFoundException e) {
-      log.debug("Not attempting to remove complete replication records as the table ({}) doesn't yet exist", ReplicationTable.NAME);
+    } catch (ReplicationTableOfflineException e) {
+      log.debug("Not attempting to remove complete replication records as the table ({}) isn't yet online", ReplicationTable.NAME);
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index c0f2a6c..3f831a8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -18,6 +18,8 @@ package org.apache.accumulo.master.replication;
 
 import java.util.Map.Entry;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -34,9 +36,9 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.io.Text;
@@ -79,7 +81,7 @@ public class StatusMaker {
       // Read from a source table (typically accumulo.metadata)
       final Scanner s;
       try {
-        s = conn.createScanner(sourceTableName, new Authorizations());
+        s = conn.createScanner(sourceTableName, Authorizations.EMPTY);
       } catch (TableNotFoundException e) {
         throw new RuntimeException(e);
       }
@@ -92,12 +94,12 @@ public class StatusMaker {
       for (Entry<Key,Value> entry : s) {
         // Get a writer to the replication table
         if (null == replicationWriter) {
-          // Ensures table exists and is properly configured
-          ReplicationUtil.createReplicationTable(conn);
+          // Ensures table is online
           try {
-            setBatchWriter(ReplicationTable.getBatchWriter(conn));
-          } catch (TableNotFoundException e) {
-            log.warn("Replication table did exist, but does not anymore");
+            ReplicationTable.setOnline(conn);
+            replicationWriter = ReplicationTable.getBatchWriter(conn);
+          } catch (ReplicationTableOfflineException | AccumuloSecurityException | AccumuloException e) {
+            log.warn("Replication table did not come online");
             replicationWriter = null;
             return;
           }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 95f607f..f4fd676 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -34,6 +33,7 @@ import org.apache.accumulo.core.replication.ReplicationSchema;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -64,8 +64,8 @@ public class WorkMaker {
 
   public void run() {
     ServerConfigurationFactory serverConf = new ServerConfigurationFactory(conn.getInstance());
-    if (!conn.tableOperations().exists(ReplicationTable.NAME)) {
-      log.info("Replication table does not yet exist");
+    if (!ReplicationTable.isOnline(conn)) {
+      log.info("Replication table is not yet online");
       return;
     }
 
@@ -77,8 +77,8 @@ public class WorkMaker {
         if (null == writer) {
           setBatchWriter(ReplicationTable.getBatchWriter(conn));
         }
-      } catch (TableNotFoundException e) {
-        log.warn("Replication table did exist, but does not anymore");
+      } catch (ReplicationTableOfflineException e) {
+        log.warn("Replication table was online, but not anymore");
         writer = null;
         return;
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/main/java/org/apache/accumulo/master/util/TableValidators.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/util/TableValidators.java b/server/master/src/main/java/org/apache/accumulo/master/util/TableValidators.java
index 162b813..a770b47 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/util/TableValidators.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/util/TableValidators.java
@@ -19,14 +19,20 @@ package org.apache.accumulo.master.util;
 import static org.apache.accumulo.core.client.impl.Tables.VALID_NAME_REGEX;
 import static org.apache.accumulo.core.client.impl.Tables.qualify;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.accumulo.core.client.impl.Namespaces;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.util.Validator;
 
+import com.google.common.base.Joiner;
+
 public class TableValidators {
   public static final String VALID_ID_REGEX = "^([a-z0-9]+)$"; // BigDecimal base36
-  
+
   public static final Validator<String> VALID_NAME = new Validator<String>() {
     @Override
     public boolean isValid(String tableName) {
@@ -44,7 +50,8 @@ public class TableValidators {
   public static final Validator<String> VALID_ID = new Validator<String>() {
     @Override
     public boolean isValid(String tableId) {
-      return tableId != null && (RootTable.ID.equals(tableId) || MetadataTable.ID.equals(tableId) || tableId.matches(VALID_ID_REGEX));
+      return tableId != null
+          && (RootTable.ID.equals(tableId) || MetadataTable.ID.equals(tableId) || ReplicationTable.ID.equals(tableId) || tableId.matches(VALID_ID_REGEX));
     }
 
     @Override
@@ -55,6 +62,21 @@ public class TableValidators {
     }
   };
 
+  public static final Validator<String> NOT_METADATA = new Validator<String>() {
+
+    private List<String> metadataTables = Arrays.asList(RootTable.NAME, MetadataTable.NAME);
+
+    @Override
+    public boolean isValid(String tableName) {
+      return !metadataTables.contains(tableName);
+    }
+
+    @Override
+    public String invalidMessage(String tableName) {
+      return "Table cannot be one of {" + Joiner.on(",").join(metadataTables) + "}";
+    }
+  };
+
   public static final Validator<String> NOT_SYSTEM = new Validator<String>() {
 
     @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
index f8cc775..7e8f762 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
@@ -34,7 +34,6 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -68,8 +67,6 @@ public class FinishedWorkUpdaterTest {
 
   @Test
   public void recordsWithProgressUpdateBothTables() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
-
     String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
     ReplicationTarget target = new ReplicationTarget("peer", "table1", "1");
@@ -98,8 +95,6 @@ public class FinishedWorkUpdaterTest {
 
   @Test
   public void chooseMinimumBeginOffset() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
-
     String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     // @formatter:off
     Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
@@ -136,8 +131,6 @@ public class FinishedWorkUpdaterTest {
 
   @Test
   public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
-
     String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     // @formatter:off
     Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
index 2255e21..4bd4b36 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
@@ -41,7 +41,6 @@ import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -53,7 +52,7 @@ import org.junit.rules.TestName;
 import com.google.common.collect.Iterables;
 
 /**
- * 
+ *
  */
 public class RemoveCompleteReplicationRecordsTest {
 
@@ -73,7 +72,6 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void notYetReplicationRecordsIgnored() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
     for (int i = 0; i < numRecords; i++) {
@@ -103,7 +101,6 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void partiallyReplicatedRecordsIgnored() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
     Status.Builder builder = Status.newBuilder();
@@ -138,7 +135,6 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
 
@@ -191,7 +187,6 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void replicatedClosedRowsAreRemoved() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
 
@@ -283,7 +278,6 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception {
-    ReplicationUtil.createReplicationTable(conn);
     BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index d5bcfbd..0f1a7a1 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -44,7 +44,6 @@ import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
@@ -55,7 +54,7 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 /**
- * 
+ *
  */
 public class SequentialWorkAssignerTest {
 
@@ -85,8 +84,7 @@ public class SequentialWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -158,8 +156,7 @@ public class SequentialWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -238,8 +235,7 @@ public class SequentialWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -359,8 +355,7 @@ public class SequentialWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
index 5d69216..4282e4e 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -48,7 +48,6 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
@@ -136,8 +135,7 @@ public class UnorderedWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5l);
@@ -199,8 +197,7 @@ public class UnorderedWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -271,8 +268,7 @@ public class UnorderedWorkAssignerTest {
     // Set the connector
     assigner.setConnector(conn);
 
-    // Create and grant ourselves write to the replication table
-    ReplicationUtil.createReplicationTable(conn);
+    // grant ourselves write to the replication table
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index 280ca82..0455c44 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.master.replication;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -37,7 +36,7 @@ import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.core.security.TablePermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -54,8 +53,6 @@ import com.google.common.collect.Iterables;
  */
 public class WorkMakerTest {
 
-  private final Map<String,TableConfiguration> tableConfs = new HashMap<>();
-
   private MockInstance instance;
   private Connector conn;
 
@@ -66,14 +63,8 @@ public class WorkMakerTest {
   public void createMockAccumulo() throws Exception {
     instance = new MockInstance();
     conn = instance.getConnector("root", new PasswordToken(""));
-
-    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
-      conn.tableOperations().delete(ReplicationTable.NAME);
-    }
-
-    conn.tableOperations().create(ReplicationTable.NAME);
-
-    tableConfs.clear();
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 4c21c39..7d3cb14 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -28,7 +28,6 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -76,10 +75,6 @@ public class ReplicationServlet extends BasicServlet {
     int totalWorkQueueSize = replicationUtil.getMaxReplicationThreads(systemProps, mmi);
 
     TableOperations tops = conn.tableOperations();
-    if (!tops.exists(ReplicationTable.NAME)) {
-      banner(sb, "", "Replication table does not yet exist");
-      return;
-    }
 
     Table replicationStats = new Table("replicationStats", "Replication Status");
     replicationStats.addSortableColumn("Table");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 1843acb..c23cd94 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -36,6 +35,7 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -101,7 +101,7 @@ public class ReplicationProcessor implements Processor {
     Status status;
     try {
       status = getStatus(file, target);
-    } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+    } catch (ReplicationTableOfflineException | AccumuloException | AccumuloSecurityException e) {
       log.error("Could not look for replication record", e);
       throw new IllegalStateException("Could not look for replication record", e);
     } catch (InvalidProtocolBufferException e) {
@@ -171,7 +171,7 @@ public class ReplicationProcessor implements Processor {
     return true;
   }
 
-  protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+  protected Status getStatus(String file, ReplicationTarget target) throws ReplicationTableOfflineException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
     Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
     s.setRange(Range.exact(file));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
index 7ebcdd7..8ceabcb 100644
--- a/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.test;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.HashMap;
-import java.util.Map.Entry;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.AccumuloServerException;
+import org.apache.accumulo.core.client.impl.Namespaces;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -185,7 +186,7 @@ public class LargeSplitRowIT extends ConfigurableMacIT {
 
     while (iterator.hasNext()) {
       String curr = iterator.next();
-      if (!curr.equals("accumulo.metadata") && !curr.equals("accumulo.root")) {
+      if (!curr.startsWith(Namespaces.ACCUMULO_NAMESPACE + ".")) {
         tableName = curr;
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
index c69de2d..da2d20f 100644
--- a/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.fate.util.UtilWaitThread;
@@ -50,7 +51,7 @@ import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 public class MasterRepairsDualAssignmentIT extends ConfigurableMacIT {
-  
+
   @Override
   public int defaultTimeoutSeconds() {
     return 5 * 60;
@@ -95,14 +96,18 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacIT {
     assertEquals(2, states.size());
     // Kill a tablet server... we don't care which one... wait for everything to be reassigned
     cluster.killProcess(ServerType.TABLET_SERVER, cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+    Set<TServerInstance> replStates = new HashSet<>();
     // Find out which tablet server remains
     while (true) {
       UtilWaitThread.sleep(1000);
       states.clear();
+      replStates.clear();
       boolean allAssigned = true;
       for (TabletLocationState tls : store) {
         if (tls != null && tls.current != null) {
           states.add(tls.current);
+        } else if (tls.extent.equals(new KeyExtent(new Text(ReplicationTable.ID), null, null))) {
+          replStates.add(tls.current);
         } else {
           allAssigned = false;
         }
@@ -111,6 +116,7 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacIT {
       if (states.size() != 2 && allAssigned == true)
         break;
     }
+    assertEquals(1, replStates.size());
     assertEquals(1, states.size());
     // pick an assigned tablet and assign it to the old tablet
     TabletLocationState moved = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MetaConstraintRetryIT.java b/test/src/test/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
index b3c3640..5dc75d0 100644
--- a/test/src/test/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
@@ -31,13 +31,13 @@ import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 public class MetaConstraintRetryIT extends SimpleMacIT {
-  
+
   @Override
   public int defaultTimeoutSeconds() {
     return 30;
   }
 
-  //a test for ACCUMULO-3096
+  // a test for ACCUMULO-3096
   @Test(expected = ConstraintViolationException.class)
   public void test() throws Exception {
 
@@ -47,13 +47,12 @@ public class MetaConstraintRetryIT extends SimpleMacIT {
     Writer w = new Writer(super.getConnector().getInstance(), credentials, MetadataTable.ID);
     KeyExtent extent = new KeyExtent(new Text("5"), null, null);
 
-
     Mutation m = new Mutation(extent.getMetadataEntry());
     // unknown columns should cause contraint violation
     m.put("badcolfam", "badcolqual", "3");
 
     try {
-      MetadataTableUtil.update(w, credentials, null, m);
+      MetadataTableUtil.update(w, null, m);
     } catch (RuntimeException e) {
       if (e.getCause().getClass().equals(ConstraintViolationException.class)) {
         throw (ConstraintViolationException) e.getCause();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/NamespacesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/NamespacesIT.java b/test/src/test/java/org/apache/accumulo/test/NamespacesIT.java
index 5e885b5..0527a9a 100644
--- a/test/src/test/java/org/apache/accumulo/test/NamespacesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/NamespacesIT.java
@@ -102,7 +102,7 @@ public class NamespacesIT extends SimpleMacIT {
     for (String t : c.tableOperations().list())
       if (!Tables.qualify(t).getFirst().equals(Namespaces.ACCUMULO_NAMESPACE))
         c.tableOperations().delete(t);
-    assertEquals(2, c.tableOperations().list().size());
+    assertEquals(3, c.tableOperations().list().size());
     for (String n : c.namespaceOperations().list())
       if (!n.equals(Namespaces.ACCUMULO_NAMESPACE) && !n.equals(Namespaces.DEFAULT_NAMESPACE))
         c.namespaceOperations().delete(n);
@@ -318,7 +318,7 @@ public class NamespacesIT extends SimpleMacIT {
     // verify entry is filtered out (also, verify conflict checking API)
     c.namespaceOperations().checkIteratorConflicts(namespace, setting, EnumSet.allOf(IteratorScope.class));
     c.namespaceOperations().attachIterator(namespace, setting);
-    UtilWaitThread.sleep(2*1000);
+    UtilWaitThread.sleep(2 * 1000);
     try {
       c.namespaceOperations().checkIteratorConflicts(namespace, setting, EnumSet.allOf(IteratorScope.class));
       fail();
@@ -334,7 +334,7 @@ public class NamespacesIT extends SimpleMacIT {
 
     // verify can see inserted entry again
     c.namespaceOperations().removeIterator(namespace, setting.getName(), EnumSet.allOf(IteratorScope.class));
-    UtilWaitThread.sleep(2*1000);
+    UtilWaitThread.sleep(2 * 1000);
     assertFalse(c.namespaceOperations().listIterators(namespace).containsKey(iterName));
     assertFalse(c.tableOperations().listIterators(t1).containsKey(iterName));
     s = c.createScanner(t1, Authorizations.EMPTY);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 58042b2..4114dc8 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.shell.Shell;
@@ -121,6 +120,7 @@ public class ShellServerIT extends SimpleMacIT {
 
   private static class NoOpErrorMessageCallback extends ErrorMessageCallback {
     private static final String empty = "";
+
     @Override
     public String getErrorMessage() {
       return empty;
@@ -239,8 +239,8 @@ public class ShellServerIT extends SimpleMacIT {
 
   @Before
   public void setupShell() throws Exception {
-    ts = new TestShell(ROOT_PASSWORD, getStaticCluster().getConfig().getInstanceName(), getStaticCluster().getConfig().getZooKeepers(),
-        getStaticCluster().getConfig().getClientConfFile().getAbsolutePath());
+    ts = new TestShell(ROOT_PASSWORD, getStaticCluster().getConfig().getInstanceName(), getStaticCluster().getConfig().getZooKeepers(), getStaticCluster()
+        .getConfig().getClientConfFile().getAbsolutePath());
   }
 
   @AfterClass
@@ -252,7 +252,7 @@ public class ShellServerIT extends SimpleMacIT {
   public void deleteTables() throws Exception {
     Connector c = getConnector();
     for (String table : c.tableOperations().list()) {
-      if (!table.equals(MetadataTable.NAME) && !table.equals(RootTable.NAME) && !table.equals("trace"))
+      if (!table.startsWith(Namespaces.ACCUMULO_NAMESPACE + ".") && !table.equals("trace"))
         try {
           c.tableOperations().delete(table);
         } catch (TableNotFoundException e) {
@@ -427,7 +427,7 @@ public class ShellServerIT extends SimpleMacIT {
     ts.exec("deleteuser -f xyzzy", true);
     ts.exec("users", true, "xyzzy", false);
   }
-  
+
   @Test(timeout = 60 * 1000)
   public void durability() throws Exception {
     final String table = name.getMethodName();
@@ -436,7 +436,7 @@ public class ShellServerIT extends SimpleMacIT {
     ts.exec("insert -d foo a cf cq2 2", false, "foo", true);
     ts.exec("scan -r a", true, "randomGunkaASDFWEAQRd", true);
     ts.exec("scan -r a", true, "foo", false);
-  }  
+  }
 
   @Test
   public void iter() throws Exception {
@@ -1105,9 +1105,8 @@ public class ShellServerIT extends SimpleMacIT {
     FileUtils.copyURLToFile(this.getClass().getResource("/FooConstraint.jar"), fooConstraintJar);
     fooConstraintJar.deleteOnExit();
 
-    ts.exec(
-        "config -s " + Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "cx1=" + fooFilterJar.toURI().toString() + "," + fooConstraintJar.toURI().toString(),
-        true);
+    ts.exec("config -s " + Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "cx1=" + fooFilterJar.toURI().toString() + ","
+        + fooConstraintJar.toURI().toString(), true);
 
     ts.exec("createtable " + table, true);
     ts.exec("config -t " + table + " -s " + Property.TABLE_CLASSPATH.getKey() + "=cx1", true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/WaitForBalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/WaitForBalanceIT.java b/test/src/test/java/org/apache/accumulo/test/WaitForBalanceIT.java
index a331faa..b558715 100644
--- a/test/src/test/java/org/apache/accumulo/test/WaitForBalanceIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/WaitForBalanceIT.java
@@ -38,12 +38,13 @@ import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 public class WaitForBalanceIT extends ConfigurableMacIT {
-  
+
   @Test(timeout = 30 * 1000)
   public void test() throws Exception {
     final Connector c = getConnector();
     // ensure the metadata table is online
-    for (@SuppressWarnings("unused") Entry<Key,Value> unused : c.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
+    for (@SuppressWarnings("unused")
+    Entry<Key,Value> unused : c.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
       ;
     c.instanceOperations().waitForBalance();
     assertTrue(isBalanced());
@@ -61,10 +62,10 @@ public class WaitForBalanceIT extends ConfigurableMacIT {
   }
 
   private boolean isBalanced() throws Exception {
-    final Map<String, Integer> counts = new HashMap<String, Integer>();
+    final Map<String,Integer> counts = new HashMap<String,Integer>();
     int offline = 0;
     final Connector c = getConnector();
-    for (String tableName : new String[]{MetadataTable.NAME, RootTable.NAME}) {
+    for (String tableName : new String[] {MetadataTable.NAME, RootTable.NAME}) {
       final Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
       s.setRange(MetadataSchema.TabletsSection.getRange());
       s.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
@@ -88,7 +89,8 @@ public class WaitForBalanceIT extends ConfigurableMacIT {
         }
       }
     }
-    if (offline > 0) {
+    // the replication table is expected to be offline for this test, so ignore it
+    if (offline > 1) {
       System.out.println("Offline tablets " + offline);
       return false;
     }
@@ -107,5 +109,5 @@ public class WaitForBalanceIT extends ConfigurableMacIT {
     }
     return true;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index 0000e7f..6ce5959 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -54,7 +54,6 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
@@ -164,8 +163,8 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
       final Connector connMaster = getConnector();
       final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
 
-      ReplicationUtil.createReplicationTable(connMaster);
-
+      ReplicationTable.setOnline(connMaster);
+      
       String peerUserName = "peer", peerPassword = "foo";
 
       String peerClusterName = "peer";
@@ -236,7 +235,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
 
       log.info("");
       log.info("Fetching replication records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+      for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
         log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
       }
 
@@ -274,7 +273,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
 
       log.info("");
       log.info("Fetching replication records:");
-      for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+      for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
         log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
       }
 
@@ -397,10 +396,6 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
       Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
           masterTable2);
 
-      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
-        Thread.sleep(500);
-      }
-
       // Restart the tserver to force a close on the WAL
       for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
         cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -523,7 +518,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
     for (@SuppressWarnings("unused")
     Entry<Key,Value> kv : connMaster.createScanner(masterTable, Authorizations.EMPTY)) {}
 
-    for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+    for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
       log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
     }
 
@@ -637,10 +632,6 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
 
       log.info("Wrote all data to master cluster");
 
-      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
-        Thread.sleep(500);
-      }
-
       for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
         cluster.killProcess(ServerType.TABLET_SERVER, proc);
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 69cd195..c514cf8 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -21,26 +21,34 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.conf.ColumnSet;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
@@ -51,6 +59,7 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusFormatter;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -61,7 +70,7 @@ import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
@@ -75,6 +84,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
@@ -130,6 +141,75 @@ public class ReplicationIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void replicationTableCreated() throws AccumuloException, AccumuloSecurityException {
+    Assert.assertTrue(getConnector().tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertEquals(ReplicationTable.ID, getConnector().tableOperations().tableIdMap().get(ReplicationTable.NAME));
+  }
+
+  @Test
+  public void verifyReplicationTableConfig() throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+    TableOperations tops = getConnector().tableOperations();
+    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(ReplicationTable.NAME);
+
+    // verify combiners are only iterators (no versioning)
+    Assert.assertEquals(1, iterators.size());
+
+    // look for combiner
+    Assert.assertTrue(iterators.containsKey(ReplicationTable.COMBINER_NAME));
+    Assert.assertTrue(iterators.get(ReplicationTable.COMBINER_NAME).containsAll(EnumSet.allOf(IteratorScope.class)));
+    for (IteratorScope scope : EnumSet.allOf(IteratorScope.class)) {
+      IteratorSetting is = tops.getIteratorSetting(ReplicationTable.NAME, ReplicationTable.COMBINER_NAME, scope);
+      Assert.assertEquals(30, is.getPriority());
+      Assert.assertEquals(StatusCombiner.class.getName(), is.getIteratorClass());
+      Assert.assertEquals(1, is.getOptions().size());
+      Assert.assertTrue(is.getOptions().containsKey("columns"));
+      String cols = is.getOptions().get("columns");
+      Column statusSectionCol = new Column(StatusSection.NAME);
+      Column workSectionCol = new Column(WorkSection.NAME);
+      Assert.assertEquals(
+          ColumnSet.encodeColumns(statusSectionCol.getColumnFamily(), statusSectionCol.getColumnQualifier()) + ","
+              + ColumnSet.encodeColumns(workSectionCol.getColumnFamily(), workSectionCol.getColumnQualifier()), cols);
+    }
+
+    boolean foundLocalityGroups = false;
+    boolean foundLocalityGroupDef1 = false;
+    boolean foundLocalityGroupDef2 = false;
+    boolean foundFormatter = false;
+    Joiner j = Joiner.on(",");
+    Function<Text,String> textToString = new Function<Text,String>() {
+      @Override
+      public String apply(Text text) {
+        return text.toString();
+      }
+    };
+    for (Entry<String,String> p : tops.getProperties(ReplicationTable.NAME)) {
+      String key = p.getKey();
+      String val = p.getValue();
+      // STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS
+      if (key.equals(Property.TABLE_FORMATTER_CLASS.getKey()) && val.equals(StatusFormatter.class.getName())) {
+        // look for formatter
+        foundFormatter = true;
+      } else if (key.equals(Property.TABLE_LOCALITY_GROUPS.getKey()) && val.equals(j.join(ReplicationTable.LOCALITY_GROUPS.keySet()))) {
+        // look for locality groups enabled
+        foundLocalityGroups = true;
+      } else if (key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())) {
+        // look for locality group column family definitions
+        if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.STATUS_LG_NAME)
+            && val.equals(j.join(Iterables.transform(ReplicationTable.STATUS_LG_COLFAMS, textToString)))) {
+          foundLocalityGroupDef1 = true;
+        } else if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.WORK_LG_NAME)
+            && val.equals(j.join(Iterables.transform(ReplicationTable.WORK_LG_COLFAMS, textToString)))) {
+          foundLocalityGroupDef2 = true;
+        }
+      }
+    }
+    Assert.assertTrue(foundLocalityGroups);
+    Assert.assertTrue(foundLocalityGroupDef1);
+    Assert.assertTrue(foundLocalityGroupDef2);
+    Assert.assertTrue(foundFormatter);
+  }
+
+  @Test
   public void correctRecordsCompleteFile() throws Exception {
     Connector conn = getConnector();
     String table = "table1";
@@ -146,17 +226,17 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     bw.close();
 
-    // After writing data, we'll get a replication table
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    // After writing data, we'll get a replication table online
+    boolean online = ReplicationTable.isOnline(conn);
     int attempts = 10;
     do {
-      if (!exists) {
+      if (!online) {
         UtilWaitThread.sleep(2000);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        online = ReplicationTable.isOnline(conn);
         attempts--;
       }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
+    } while (!online && attempts > 0);
+    Assert.assertTrue("Replication table was not online", online);
 
     for (int i = 0; i < 5; i++) {
       if (conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)) {
@@ -193,7 +273,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Scanner s;
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
-      s = conn.createScanner(MetadataTable.NAME, new Authorizations());
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
       for (Entry<Key,Value> entry : s) {
         LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
@@ -215,8 +295,8 @@ public class ReplicationIT extends ConfigurableMacIT {
     Connector conn = getConnector();
     List<String> tables = new ArrayList<>();
 
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    // replication shouldn't be online when we begin
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     for (int i = 0; i < 5; i++) {
       String name = "table" + i;
@@ -225,7 +305,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // nor after we create some tables (that aren't being replicated)
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     for (String table : tables) {
       BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
@@ -243,21 +323,21 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // After writing data, still no replication table
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     for (String table : tables) {
       conn.tableOperations().compact(table, null, null, true, true);
     }
 
     // After compacting data, still no replication table
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     for (String table : tables) {
       conn.tableOperations().delete(table);
     }
 
     // After deleting tables, still no replication table
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
   }
 
   @Test
@@ -266,7 +346,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     String table1 = "table1", table2 = "table2";
 
     // replication shouldn't exist when we begin
-    Assert.assertFalse("Replication table already existed at the beginning of the test", conn.tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertFalse("Replication table already online at the beginning of the test", ReplicationTable.isOnline(conn));
 
     // Create two tables
     conn.tableOperations().create(table1);
@@ -275,8 +355,8 @@ public class ReplicationIT extends ConfigurableMacIT {
     // Enable replication on table1
     conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 
-    // Despite having replication on, we shouldn't have any need to write a record to it (and create it)
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    // Despite having replication on, we shouldn't have any need to write a record to it (and bring it online)
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     // Write some data to table1
     BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
@@ -295,19 +375,19 @@ public class ReplicationIT extends ConfigurableMacIT {
     // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
     // Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
 
-    // After writing data, we'll get a replication table
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    // After writing data, we'll get a replication table online
+    boolean online = ReplicationTable.isOnline(conn);
     int attempts = 10;
     do {
-      if (!exists) {
+      if (!online) {
         UtilWaitThread.sleep(5000);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        online = ReplicationTable.isOnline(conn);
         attempts--;
       }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
+    } while (!online && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", online);
 
-    Assert.assertTrue(conn.tableOperations().exists(ReplicationTable.NAME));
+    Assert.assertTrue(ReplicationTable.isOnline(conn));
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
 
     // Verify that we found a single replication record that's for table1
@@ -353,8 +433,8 @@ public class ReplicationIT extends ConfigurableMacIT {
     // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
     // Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
 
-    // After writing data, we'll get a replication table
-    Assert.assertTrue(conn.tableOperations().exists(ReplicationTable.NAME));
+    // After writing data, we'll get a replication table online
+    Assert.assertTrue(ReplicationTable.isOnline(conn));
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
 
     Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2));
@@ -558,12 +638,9 @@ public class ReplicationIT extends ConfigurableMacIT {
   public void noDeadlock() throws Exception {
     final Connector conn = getConnector();
 
-    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
-      conn.tableOperations().delete(ReplicationTable.NAME);
-    }
-
-    ReplicationUtil.createReplicationTable(conn);
+    ReplicationTable.setOnline(conn);
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 
     final AtomicBoolean keepRunning = new AtomicBoolean(true);
     final Set<String> metadataWals = new HashSet<>();
@@ -648,7 +725,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
-      Scanner s = conn.createScanner(table, new Authorizations());
+      Scanner s = conn.createScanner(table, Authorizations.EMPTY);
       for (@SuppressWarnings("unused")
       Entry<Key,Value> entry : s) {}
     }
@@ -712,7 +789,7 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     conn.tableOperations().flush(table, null, null, true);
 
-    while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
+    while (!ReplicationTable.isOnline(conn)) {
       UtilWaitThread.sleep(2000);
     }
 
@@ -778,8 +855,8 @@ public class ReplicationIT extends ConfigurableMacIT {
     Connector conn = getConnector();
     String table1 = "table1";
 
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    // replication shouldn't be online when we begin
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     // Create a table
     conn.tableOperations().create(table1);
@@ -819,17 +896,17 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     bw.close();
 
-    // Make sure the replication table exists at this point
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    // Make sure the replication table is online at this point
+    boolean online = ReplicationTable.isOnline(conn);
     attempts = 10;
     do {
-      if (!exists) {
+      if (!online) {
         UtilWaitThread.sleep(2000);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        online = ReplicationTable.isOnline(conn);
         attempts--;
       }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table was never created", exists);
+    } while (!online && attempts > 0);
+    Assert.assertTrue("Replication table was never created", online);
 
     // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
     for (int i = 0; i < 10 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
@@ -963,8 +1040,8 @@ public class ReplicationIT extends ConfigurableMacIT {
     Connector conn = getConnector();
     String table1 = "table1";
 
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    // replication shouldn't be online when we begin
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     // Create two tables
     conn.tableOperations().create(table1);
@@ -1003,16 +1080,16 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertNotNull("Table ID was null", tableId);
 
     // Make sure the replication table exists at this point
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    boolean online = ReplicationTable.isOnline(conn);
     attempts = 5;
     do {
-      if (!exists) {
+      if (!online) {
         UtilWaitThread.sleep(500);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        online = ReplicationTable.isOnline(conn);
         attempts--;
       }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
+    } while (!online && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", online);
 
     for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
       Thread.sleep(1000);
@@ -1059,12 +1136,9 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     final Connector conn = getConnector();
 
-    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
-      conn.tableOperations().delete(ReplicationTable.NAME);
-    }
-
-    ReplicationUtil.createReplicationTable(conn);
+    ReplicationTable.setOnline(conn);
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 
     final AtomicBoolean keepRunning = new AtomicBoolean(true);
     final Set<String> metadataWals = new HashSet<>();
@@ -1164,7 +1238,7 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : Arrays.asList(table1, table2, table3)) {
-      Scanner s = conn.createScanner(table, new Authorizations());
+      Scanner s = conn.createScanner(table, Authorizations.EMPTY);
       for (@SuppressWarnings("unused")
       Entry<Key,Value> entry : s) {}
     }
@@ -1265,8 +1339,8 @@ public class ReplicationIT extends ConfigurableMacIT {
     log.info("Got connector to MAC");
     String table1 = "table1";
 
-    // replication shouldn't exist when we begin
-    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+    // replication shouldn't be online when we begin
+    Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     // Create two tables
     conn.tableOperations().create(table1);
@@ -1308,16 +1382,16 @@ public class ReplicationIT extends ConfigurableMacIT {
     bw.close();
 
     // Make sure the replication table exists at this point
-    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    boolean online = ReplicationTable.isOnline(conn);
     attempts = 10;
     do {
-      if (!exists) {
+      if (!online) {
         UtilWaitThread.sleep(1000);
-        exists = conn.tableOperations().exists(ReplicationTable.NAME);
+        online = ReplicationTable.isOnline(conn);
         attempts--;
       }
-    } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
+    } while (!online && attempts > 0);
+    Assert.assertTrue("Replication table did not exist", online);
 
     // Grant ourselves the write permission for later
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
@@ -1391,7 +1465,7 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : new String[] {MetadataTable.NAME, table1}) {
-      s = conn.createScanner(table, new Authorizations());
+      s = conn.createScanner(table, Authorizations.EMPTY);
       for (@SuppressWarnings("unused")
       Entry<Key,Value> entry : s) {}
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
index ff64d0b..98d7676 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacTest.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TableOperations;
@@ -38,8 +37,7 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.server.replication.ReplicationUtil;
+import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.SimpleMacIT;
 import org.apache.hadoop.io.Text;
@@ -81,13 +79,10 @@ public class StatusCombinerMacTest extends SimpleMacIT {
   @Test
   public void test() throws Exception {
     Connector conn = getConnector();
-    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
-      conn.tableOperations().delete(ReplicationTable.NAME);
-    }
-
-    ReplicationUtil.createReplicationTable(conn);
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    ReplicationTable.setOnline(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     long createTime = System.currentTimeMillis();
     try {
       Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3");
@@ -97,11 +92,11 @@ public class StatusCombinerMacTest extends SimpleMacIT {
       bw.close();
     }
 
-    Scanner s = conn.createScanner(ReplicationTable.NAME, new Authorizations());
+    Scanner s = ReplicationTable.getScanner(conn);
     Entry<Key,Value> entry = Iterables.getOnlyElement(s);
     Assert.assertEquals(StatusUtil.fileCreatedValue(createTime), entry.getValue());
 
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    bw = ReplicationTable.getBatchWriter(conn);
     try {
       Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3");
       StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.replicated(Long.MAX_VALUE)));
@@ -110,7 +105,7 @@ public class StatusCombinerMacTest extends SimpleMacIT {
       bw.close();
     }
 
-    s = conn.createScanner(ReplicationTable.NAME, new Authorizations());
+    s = ReplicationTable.getScanner(conn);
     entry = Iterables.getOnlyElement(s);
     Status stat = Status.parseFrom(entry.getValue().get());
     Assert.assertEquals(Long.MAX_VALUE, stat.getBegin());


Mime
View raw message