accumulo-commits mailing list archives

From: els...@apache.org
Subject: [4/4] accumulo git commit: ACCUMULO-3650 Push protobuf-java dependency into server-base.
Date: Wed, 11 Mar 2015 21:36:51 GMT
ACCUMULO-3650 Push protobuf-java dependency into server-base.

This helps prevent protobuf from polluting the client's classpath.
Because protobuf-generated code must be compiled against the same
protobuf version that is present on the classpath, this keeps Accumulo
from becoming a pain point for clients (we add nothing beyond what
Hadoop already bundles).

Needed to add a new RPC call to the master to support the implementation
of ReplicationOperations.drain(). Also removed a bunch of empty javadoc
class header blocks.
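
As a sketch of the client-facing behavior (connection details and names
below are illustrative, not part of this commit): drain() now blocks by
polling the new master RPC rather than scanning the metadata and
replication tables itself.

    // Minimal usage sketch, assuming the 1.7-era client API; instance,
    // user, and table names are placeholders.
    import java.util.Set;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.admin.ReplicationOperations;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class DrainExample {
      public static void main(String[] args) throws Exception {
        Connector conn = new ZooKeeperInstance("myInstance", "zk1:2181")
            .getConnector("root", new PasswordToken("secret"));
        ReplicationOperations repl = conn.replicationOperations();

        // Capture the WALs currently referenced by the table, then block
        // until each has been fully replicated; under the hood this now
        // polls the master's drainReplicationTable RPC once per second.
        Set<String> wals = repl.referencedFiles("myTable");
        repl.drain("myTable", wals);
      }
    }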


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ded955e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ded955e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ded955e9

Branch: refs/heads/master
Commit: ded955e9186c588b1ab6d53105433c5d58aef46f
Parents: 01558a4
Author: Josh Elser <elserj@apache.org>
Authored: Wed Mar 11 15:55:20 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Mar 11 15:55:20 2015 -0400

----------------------------------------------------------------------
 core/pom.xml                                    |   27 -
 .../client/admin/ReplicationOperations.java     |   13 +-
 .../client/impl/ReplicationOperationsImpl.java  |  125 +-
 .../core/client/replication/ReplicaSystem.java  |   50 -
 .../replication/ReplicaSystemFactory.java       |   82 --
 .../core/master/thrift/MasterClientService.java | 1213 ++++++++++++++++++
 .../replication/PrintReplicationRecords.java    |   97 --
 .../core/replication/ReplicaSystemHelper.java   |   72 --
 .../core/replication/ReplicationTable.java      |    1 -
 .../core/replication/StatusFormatter.java       |  187 ---
 .../accumulo/core/replication/StatusUtil.java   |  216 ----
 .../core/replication/proto/Replication.java     |  949 --------------
 core/src/main/protobuf/replication.proto        |   26 -
 core/src/main/scripts/generate-protobuf.sh      |   98 --
 core/src/main/thrift/master.thrift              |    3 +
 .../ReplicationOperationsImplTest.java          |  416 ------
 .../core/replication/StatusUtilTest.java        |   57 -
 .../core/replication/proto/StatusTest.java      |   36 -
 server/base/pom.xml                             |   29 +
 .../apache/accumulo/server/fs/VolumeUtil.java   |    4 +-
 .../apache/accumulo/server/init/Initialize.java |    3 +-
 .../replication/PrintReplicationRecords.java    |   98 ++
 .../server/replication/ReplicaSystem.java       |   49 +
 .../replication/ReplicaSystemFactory.java       |   82 ++
 .../server/replication/ReplicaSystemHelper.java |   74 ++
 .../server/replication/ReplicationUtil.java     |    6 +-
 .../server/replication/StatusCombiner.java      |    6 +-
 .../server/replication/StatusFormatter.java     |  187 +++
 .../accumulo/server/replication/StatusUtil.java |  216 ++++
 .../server/replication/proto/Replication.java   |  949 ++++++++++++++
 .../server/util/ReplicationTableUtil.java       |    4 +-
 server/base/src/main/protobuf/replication.proto |   26 +
 .../base/src/main/scripts/generate-protobuf.sh  |   98 ++
 .../server/replication/StatusCombinerTest.java  |    6 +-
 .../server/replication/StatusUtilTest.java      |   54 +
 .../server/replication/proto/StatusTest.java    |   36 +
 .../server/util/ReplicationTableUtilTest.java   |    7 +-
 .../gc/GarbageCollectWriteAheadLogs.java        |    4 +-
 .../accumulo/gc/GarbageCollectionAlgorithm.java |    4 +-
 .../gc/GarbageCollectionEnvironment.java        |    2 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |    2 +-
 .../CloseWriteAheadLogReferences.java           |    4 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |    4 +-
 .../accumulo/gc/GarbageCollectionTest.java      |    4 +-
 .../CloseWriteAheadLogReferencesTest.java       |    4 +-
 .../accumulo/master/FateServiceHandler.java     |    2 +-
 .../java/org/apache/accumulo/master/Master.java |   13 +-
 .../master/MasterClientServiceHandler.java      |  112 +-
 .../DistributedWorkQueueWorkAssigner.java       |    4 +-
 .../master/replication/FinishedWorkUpdater.java |    4 +-
 .../RemoveCompleteReplicationRecords.java       |    4 +-
 .../master/replication/StatusMaker.java         |    2 +-
 .../accumulo/master/replication/WorkMaker.java  |    4 +-
 .../master/ReplicationOperationsImplTest.java   |  459 +++++++
 .../replication/FinishedWorkUpdaterTest.java    |    5 +-
 .../RemoveCompleteReplicationRecordsTest.java   |    7 +-
 .../replication/SequentialWorkAssignerTest.java |    5 +-
 .../master/replication/StatusMakerTest.java     |    7 +-
 .../replication/UnorderedWorkAssignerTest.java  |    4 +-
 .../master/replication/WorkMakerTest.java       |    7 +-
 .../tserver/log/TabletServerLogger.java         |    4 +-
 .../replication/AccumuloReplicaSystem.java      |   11 +-
 .../replication/ReplicationProcessor.java       |   10 +-
 .../tserver/tablet/DatafileManager.java         |    2 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |    4 +-
 .../replication/AccumuloReplicaSystemTest.java  |    5 +-
 .../replication/ReplicationProcessorTest.java   |    9 +-
 .../test/replication/MockReplicaSystem.java     |    6 +-
 .../ReplicationTablesPrinterThread.java         |    2 +-
 .../test/replication/CyclicReplicationIT.java   |    2 +-
 ...bageCollectorCommunicatesWithTServersIT.java |    2 +-
 .../replication/MultiInstanceReplicationIT.java |    6 +-
 .../test/replication/ReplicationIT.java         |    8 +-
 .../test/replication/StatusCombinerMacIT.java   |    4 +-
 .../UnorderedWorkAssignerReplicationIT.java     |    6 +-
 ...UnusedWalDoesntCloseReplicationStatusIT.java |    9 +-
 76 files changed, 3804 insertions(+), 2555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index bd41cca..4a3500c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -40,10 +40,6 @@
       <artifactId>guava</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
@@ -213,29 +209,6 @@
   </build>
   <profiles>
     <profile>
-      <id>protobuf</id>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>exec-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>generate-protobuf</id>
-                <goals>
-                  <goal>exec</goal>
-                </goals>
-                <phase>generate-sources</phase>
-                <configuration>
-                  <executable>${basedir}/src/main/scripts/generate-protobuf.sh</executable>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>thrift</id>
       <build>
         <plugins>
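
The removed dependency and build profile move to server/base/pom.xml
(+29 lines in the diffstat above). That hunk is not included in this
message part, but reconstructed from the blocks deleted here it
presumably reads:

    <!-- Presumed counterpart in server/base/pom.xml, reconstructed from
         the blocks removed from core/pom.xml above; not copied from the
         actual hunk. -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
    </dependency>
    <!-- ...and, under <profiles>: -->
    <profile>
      <id>protobuf</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <executions>
              <execution>
                <id>generate-protobuf</id>
                <goals>
                  <goal>exec</goal>
                </goals>
                <phase>generate-sources</phase>
                <configuration>
                  <executable>${basedir}/src/main/scripts/generate-protobuf.sh</executable>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>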

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 699d9d5..dfb043c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -23,7 +23,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.replication.PeerExistsException;
 import org.apache.accumulo.core.client.replication.PeerNotFoundException;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
 
 /**
  * Supports replication configuration
@@ -33,22 +32,12 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 public interface ReplicationOperations {
 
   /**
-   * Defines a cluster with the given name using the given {@link ReplicaSystem} implementation.
-   *
-   * @param name
-   *          Name of the cluster, used for configuring replication on tables
-   * @param system
-   *          Type of system to be replicated to
-   */
-  public void addPeer(String name, Class<? extends ReplicaSystem> system) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
-
-  /**
    * Defines a cluster with the given name and the given name system.
    *
    * @param name
    *          Unique name for the cluster
    * @param replicaType
-   *          {@link ReplicaSystem} class name to use to replicate the data
+   *          Class name to use to replicate the data
    */
   public void addPeer(String name, String replicaType) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 6fdf4db..6a5c74a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -32,33 +32,25 @@ import org.apache.accumulo.core.client.admin.ReplicationOperations;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.PeerExistsException;
 import org.apache.accumulo.core.client.replication.PeerNotFoundException;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- *
- */
 public class ReplicationOperationsImpl implements ReplicationOperations {
   private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImpl.class);
 
@@ -70,14 +62,6 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
   }
 
   @Override
-  public void addPeer(String name, Class<? extends ReplicaSystem> system) throws AccumuloException, AccumuloSecurityException, PeerExistsException {
-    checkNotNull(name);
-    checkNotNull(system);
-
-    addPeer(name, system.getName());
-  }
-
-  @Override
   public void addPeer(final String name, final String replicaType) throws AccumuloException, AccumuloSecurityException, PeerExistsException {
     checkNotNull(name);
     checkNotNull(replicaType);
@@ -100,95 +84,36 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
   }
 
   @Override
-  public void drain(String tableName, Set<String> wals) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+  public void drain(final String tableName, final Set<String> wals) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     checkNotNull(tableName);
 
-    Connector conn = context.getConnector();
-    Text tableId = getTableId(conn, tableName);
+    final TInfo tinfo = Tracer.traceInfo();
+    final TCredentials rpcCreds = context.rpcCreds();
 
-    log.info("Waiting for {} to be replicated for {}", wals, tableId);
-
-    log.info("Reading from metadata table");
-    boolean allMetadataRefsReplicated = false;
-    final Set<Range> range = Collections.singleton(new Range(ReplicationSection.getRange()));
-    while (!allMetadataRefsReplicated) {
-      BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
-      bs.setRanges(range);
-      bs.fetchColumnFamily(ReplicationSection.COLF);
-      try {
-        allMetadataRefsReplicated = allReferencesReplicated(bs, tableId, wals);
-      } finally {
-        bs.close();
-      }
-
-      if (!allMetadataRefsReplicated) {
-        UtilWaitThread.sleep(1000);
-      }
-    }
-
-    log.info("reading from replication table");
-    boolean allReplicationRefsReplicated = false;
-    while (!allReplicationRefsReplicated) {
-      BatchScanner bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
-      bs.setRanges(Collections.singleton(new Range()));
-      try {
-        allReplicationRefsReplicated = allReferencesReplicated(bs, tableId, wals);
-      } finally {
-        bs.close();
-      }
-
-      if (!allReplicationRefsReplicated) {
-        UtilWaitThread.sleep(1000);
-      }
-    }
-  }
-
-  /**
-   * @return return true records are only in place which are fully replicated
-   */
-  protected boolean allReferencesReplicated(BatchScanner bs, Text tableId, Set<String> relevantLogs) {
-    Text rowHolder = new Text(), colfHolder = new Text();
-    for (Entry<Key,Value> entry : bs) {
-      log.info("Got key {}", entry.getKey().toStringNoTruncate());
-
-      entry.getKey().getColumnQualifier(rowHolder);
-      if (tableId.equals(rowHolder)) {
-        entry.getKey().getRow(rowHolder);
-        entry.getKey().getColumnFamily(colfHolder);
-
-        String file;
-        if (colfHolder.equals(ReplicationSection.COLF)) {
-          file = rowHolder.toString();
-          file = file.substring(ReplicationSection.getRowPrefix().length());
-        } else if (colfHolder.equals(OrderSection.NAME)) {
-          file = OrderSection.getFile(entry.getKey(), rowHolder);
-          long timeClosed = OrderSection.getTimeClosed(entry.getKey(), rowHolder);
-          log.debug("Order section: {} and {}", timeClosed, file);
-        } else {
-          file = rowHolder.toString();
-        }
-
-        // Skip files that we didn't observe when we started (new files/data)
-        if (!relevantLogs.contains(file)) {
-          log.debug("Found file that we didn't care about {}", file);
-          continue;
-        } else {
-          log.debug("Found file that we *do* care about {}", file);
-        }
+    // Ask the master if the table is fully replicated given these WALs, but don't poll inside the master
+    boolean drained = false;
+    while (!drained) {
+      drained = getMasterDrain(tinfo, rpcCreds, tableName, wals);
 
+      if (!drained) {
         try {
-          Status stat = Status.parseFrom(entry.getValue().get());
-          if (!StatusUtil.isFullyReplicated(stat)) {
-            log.trace("{} and {} is not fully replicated", entry.getKey().getRow(), ProtobufUtil.toString(stat));
-            return false;
-          }
-        } catch (InvalidProtocolBufferException e) {
-          log.warn("Could not parse protobuf for {}", entry.getKey(), e);
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Thread interrupted", e);
         }
       }
     }
+  }
 
-    return true;
+  protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set<String> wals) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    return MasterClient.execute(context, new ClientExecReturn<Boolean,Client>() {
+      @Override
+      public Boolean execute(Client client) throws Exception {
+        return client.drainReplicationTable(tinfo, rpcCreds, tableName, wals);
+      }
+    });
   }
 
   protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
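
The generated Java in MasterClientService.java below corresponds to a
three-line addition to core/src/main/thrift/master.thrift (see the
diffstat). Reconstructed from the generated field ids, names, and types,
not copied from the actual hunk, the declaration is presumably:

    // Presumed addition to the MasterClientService in master.thrift:
    bool drainReplicationTable(1:trace.TInfo tfino, 2:security.TCredentials credentials,
        3:string tableName, 4:set<string> logsToWatch)

On the server side, MasterClientServiceHandler (+112 lines in the
diffstat, not shown in this part) presumably performs the equivalent
replication check in a single pass and returns a boolean; per the
comment in the client loop above, the once-per-second polling stays in
the client rather than inside the master.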

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
deleted file mode 100644
index bdcc652..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.replication;
-
-import org.apache.accumulo.core.replication.ReplicaSystemHelper;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Encapsulation of a remote system which Accumulo can replicate data to
- */
-public interface ReplicaSystem {
-
-  /**
-   * Replicate the given status to the target peer
-   *
-   * @param p
-   *          Path to the resource we're reading from
-   * @param status
-   *          Information to replicate
-   * @param target
-   *          The peer
-   * @param helper
-   *          Instance of ReplicaSystemHelper
-   * @return A new Status for the progress that was made
-   */
-  public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper);
-
-  /**
-   * Configure the implementation with necessary information from the system configuration
-   * <p>
-   * For example, we only need one implementation for Accumulo, but, for each peer, we have a ZK quorum and instance name
-   */
-  public void configure(String configuration);
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
deleted file mode 100644
index d76b3d8..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.replication;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- *
- */
-public class ReplicaSystemFactory {
-  private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class);
-
-  private ReplicaSystemFactory() {}
-
-  /**
-   * @param value
-   *          {@link ReplicaSystem} implementation class name
-   * @return A {@link ReplicaSystem} object from the given name
-   */
-  public static ReplicaSystem get(String value) {
-    Preconditions.checkNotNull(value);
-
-    int index = value.indexOf(',');
-    if (-1 == index) {
-      throw new IllegalArgumentException("Expected comma separator between replication system name and configuration");
-    }
-
-    String name = value.substring(0, index);
-    String configuration = value.substring(index + 1);
-
-    try {
-      Class<?> clz = Class.forName(name);
-
-      if (ReplicaSystem.class.isAssignableFrom(clz)) {
-        Object o = clz.newInstance();
-        ReplicaSystem rs = (ReplicaSystem) o;
-        rs.configure(configuration);
-        return rs;
-      }
-
-      throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + name);
-    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-      log.error("Error creating ReplicaSystem object", e);
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Generate the configuration value for a {@link ReplicaSystem} in the instance properties
-   *
-   * @param system
-   *          The desired ReplicaSystem to use
-   * @param configuration
-   *          Configuration string for the desired ReplicaSystem
-   * @return Value to set for peer configuration in the instance
-   */
-  public static String getPeerConfigurationValue(Class<? extends ReplicaSystem> system, String configuration) {
-    String systemName = system.getName() + ",";
-    if (null == configuration) {
-      return systemName;
-    }
-
-    return systemName + configuration;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java b/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
index 9cd1084..63a2131 100644
--- a/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
@@ -86,6 +86,8 @@ import org.slf4j.LoggerFactory;
 
     public org.apache.accumulo.core.security.thrift.TDelegationToken getDelegationToken(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.security.thrift.TDelegationTokenConfig cfg) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException;
 
+    public boolean drainReplicationTable(org.apache.accumulo.core.trace.thrift.TInfo tfino, org.apache.accumulo.core.security.thrift.TCredentials credentials, String tableName, Set<String> logsToWatch) throws org.apache.thrift.TException;
+
   }
 
   public interface AsyncIface extends FateService .AsyncIface {
@@ -124,6 +126,8 @@ import org.slf4j.LoggerFactory;
 
     public void getDelegationToken(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.security.thrift.TDelegationTokenConfig cfg, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
+    public void drainReplicationTable(org.apache.accumulo.core.trace.thrift.TInfo tfino, org.apache.accumulo.core.security.thrift.TCredentials credentials, String tableName, Set<String> logsToWatch, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
   }
 
   public static class Client extends FateService.Client implements Iface {
@@ -587,6 +591,32 @@ import org.slf4j.LoggerFactory;
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getDelegationToken failed: unknown result");
     }
 
+    public boolean drainReplicationTable(org.apache.accumulo.core.trace.thrift.TInfo tfino, org.apache.accumulo.core.security.thrift.TCredentials credentials, String tableName, Set<String> logsToWatch) throws org.apache.thrift.TException
+    {
+      send_drainReplicationTable(tfino, credentials, tableName, logsToWatch);
+      return recv_drainReplicationTable();
+    }
+
+    public void send_drainReplicationTable(org.apache.accumulo.core.trace.thrift.TInfo tfino, org.apache.accumulo.core.security.thrift.TCredentials credentials, String tableName, Set<String> logsToWatch) throws org.apache.thrift.TException
+    {
+      drainReplicationTable_args args = new drainReplicationTable_args();
+      args.setTfino(tfino);
+      args.setCredentials(credentials);
+      args.setTableName(tableName);
+      args.setLogsToWatch(logsToWatch);
+      sendBase("drainReplicationTable", args);
+    }
+
+    public boolean recv_drainReplicationTable() throws org.apache.thrift.TException
+    {
+      drainReplicationTable_result result = new drainReplicationTable_result();
+      receiveBase(result, "drainReplicationTable");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "drainReplicationTable failed: unknown result");
+    }
+
   }
   public static class AsyncClient extends FateService.AsyncClient implements AsyncIface {
     public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -1282,6 +1312,47 @@ import org.slf4j.LoggerFactory;
       }
     }
 
+    public void drainReplicationTable(org.apache.accumulo.core.trace.thrift.TInfo tfino, org.apache.accumulo.core.security.thrift.TCredentials credentials, String tableName, Set<String> logsToWatch, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      drainReplicationTable_call method_call = new drainReplicationTable_call(tfino, credentials, tableName, logsToWatch, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class drainReplicationTable_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private org.apache.accumulo.core.trace.thrift.TInfo tfino;
+      private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+      private String tableName;
+      private Set<String> logsToWatch;
+      public drainReplicationTable_call(org.apache.accumulo.core.trace.thrift.TInfo tfino, org.apache.accumulo.core.security.thrift.TCredentials credentials, String tableName, Set<String> logsToWatch, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.tfino = tfino;
+        this.credentials = credentials;
+        this.tableName = tableName;
+        this.logsToWatch = logsToWatch;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("drainReplicationTable", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        drainReplicationTable_args args = new drainReplicationTable_args();
+        args.setTfino(tfino);
+        args.setCredentials(credentials);
+        args.setTableName(tableName);
+        args.setLogsToWatch(logsToWatch);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public boolean getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_drainReplicationTable();
+      }
+    }
+
   }
 
   public static class Processor<I extends Iface> extends FateService.Processor<I> implements org.apache.thrift.TProcessor {
@@ -1312,6 +1383,7 @@ import org.slf4j.LoggerFactory;
       processMap.put("reportTabletStatus", new reportTabletStatus());
       processMap.put("getActiveTservers", new getActiveTservers());
       processMap.put("getDelegationToken", new getDelegationToken());
+      processMap.put("drainReplicationTable", new drainReplicationTable());
       return processMap;
     }
 
@@ -1722,6 +1794,27 @@ import org.slf4j.LoggerFactory;
       }
     }
 
+    public static class drainReplicationTable<I extends Iface> extends org.apache.thrift.ProcessFunction<I, drainReplicationTable_args> {
+      public drainReplicationTable() {
+        super("drainReplicationTable");
+      }
+
+      public drainReplicationTable_args getEmptyArgsInstance() {
+        return new drainReplicationTable_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public drainReplicationTable_result getResult(I iface, drainReplicationTable_args args) throws org.apache.thrift.TException {
+        drainReplicationTable_result result = new drainReplicationTable_result();
+        result.success = iface.drainReplicationTable(args.tfino, args.credentials, args.tableName, args.logsToWatch);
+        result.setSuccessIsSet(true);
+        return result;
+      }
+    }
+
   }
 
   public static class AsyncProcessor<I extends AsyncIface> extends FateService.AsyncProcessor<I> {
@@ -1752,6 +1845,7 @@ import org.slf4j.LoggerFactory;
       processMap.put("reportTabletStatus", new reportTabletStatus());
       processMap.put("getActiveTservers", new getActiveTservers());
       processMap.put("getDelegationToken", new getDelegationToken());
+      processMap.put("drainReplicationTable", new drainReplicationTable());
       return processMap;
     }
 
@@ -2680,6 +2774,58 @@ import org.slf4j.LoggerFactory;
       }
     }
 
+    public static class drainReplicationTable<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, drainReplicationTable_args, Boolean> {
+      public drainReplicationTable() {
+        super("drainReplicationTable");
+      }
+
+      public drainReplicationTable_args getEmptyArgsInstance() {
+        return new drainReplicationTable_args();
+      }
+
+      public AsyncMethodCallback<Boolean> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Boolean>() { 
+          public void onComplete(Boolean o) {
+            drainReplicationTable_result result = new drainReplicationTable_result();
+            result.success = o;
+            result.setSuccessIsSet(true);
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            drainReplicationTable_result result = new drainReplicationTable_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, drainReplicationTable_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException {
+        iface.drainReplicationTable(args.tfino, args.credentials, args.tableName, args.logsToWatch,resultHandler);
+      }
+    }
+
   }
 
   public static class initiateFlush_args implements org.apache.thrift.TBase<initiateFlush_args, initiateFlush_args._Fields>, java.io.Serializable, Cloneable, Comparable<initiateFlush_args>   {
@@ -19723,4 +19869,1071 @@ import org.slf4j.LoggerFactory;
 
   }
 
+  public static class drainReplicationTable_args implements org.apache.thrift.TBase<drainReplicationTable_args, drainReplicationTable_args._Fields>, java.io.Serializable, Cloneable, Comparable<drainReplicationTable_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("drainReplicationTable_args");
+
+    private static final org.apache.thrift.protocol.TField TFINO_FIELD_DESC = new org.apache.thrift.protocol.TField("tfino", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+    private static final org.apache.thrift.protocol.TField TABLE_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("tableName", org.apache.thrift.protocol.TType.STRING, (short)3);
+    private static final org.apache.thrift.protocol.TField LOGS_TO_WATCH_FIELD_DESC = new org.apache.thrift.protocol.TField("logsToWatch", org.apache.thrift.protocol.TType.SET, (short)4);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new drainReplicationTable_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new drainReplicationTable_argsTupleSchemeFactory());
+    }
+
+    public org.apache.accumulo.core.trace.thrift.TInfo tfino; // required
+    public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
+    public String tableName; // required
+    public Set<String> logsToWatch; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      TFINO((short)1, "tfino"),
+      CREDENTIALS((short)2, "credentials"),
+      TABLE_NAME((short)3, "tableName"),
+      LOGS_TO_WATCH((short)4, "logsToWatch");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // TFINO
+            return TFINO;
+          case 2: // CREDENTIALS
+            return CREDENTIALS;
+          case 3: // TABLE_NAME
+            return TABLE_NAME;
+          case 4: // LOGS_TO_WATCH
+            return LOGS_TO_WATCH;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.TFINO, new org.apache.thrift.meta_data.FieldMetaData("tfino", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.trace.thrift.TInfo.class)));
+      tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
+      tmpMap.put(_Fields.TABLE_NAME, new org.apache.thrift.meta_data.FieldMetaData("tableName", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      tmpMap.put(_Fields.LOGS_TO_WATCH, new org.apache.thrift.meta_data.FieldMetaData("logsToWatch", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET, 
+              new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(drainReplicationTable_args.class, metaDataMap);
+    }
+
+    public drainReplicationTable_args() {
+    }
+
+    public drainReplicationTable_args(
+      org.apache.accumulo.core.trace.thrift.TInfo tfino,
+      org.apache.accumulo.core.security.thrift.TCredentials credentials,
+      String tableName,
+      Set<String> logsToWatch)
+    {
+      this();
+      this.tfino = tfino;
+      this.credentials = credentials;
+      this.tableName = tableName;
+      this.logsToWatch = logsToWatch;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public drainReplicationTable_args(drainReplicationTable_args other) {
+      if (other.isSetTfino()) {
+        this.tfino = new org.apache.accumulo.core.trace.thrift.TInfo(other.tfino);
+      }
+      if (other.isSetCredentials()) {
+        this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+      }
+      if (other.isSetTableName()) {
+        this.tableName = other.tableName;
+      }
+      if (other.isSetLogsToWatch()) {
+        Set<String> __this__logsToWatch = new HashSet<String>(other.logsToWatch);
+        this.logsToWatch = __this__logsToWatch;
+      }
+    }
+
+    public drainReplicationTable_args deepCopy() {
+      return new drainReplicationTable_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.tfino = null;
+      this.credentials = null;
+      this.tableName = null;
+      this.logsToWatch = null;
+    }
+
+    public org.apache.accumulo.core.trace.thrift.TInfo getTfino() {
+      return this.tfino;
+    }
+
+    public drainReplicationTable_args setTfino(org.apache.accumulo.core.trace.thrift.TInfo tfino) {
+      this.tfino = tfino;
+      return this;
+    }
+
+    public void unsetTfino() {
+      this.tfino = null;
+    }
+
+    /** Returns true if field tfino is set (has been assigned a value) and false otherwise */
+    public boolean isSetTfino() {
+      return this.tfino != null;
+    }
+
+    public void setTfinoIsSet(boolean value) {
+      if (!value) {
+        this.tfino = null;
+      }
+    }
+
+    public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+      return this.credentials;
+    }
+
+    public drainReplicationTable_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+      this.credentials = credentials;
+      return this;
+    }
+
+    public void unsetCredentials() {
+      this.credentials = null;
+    }
+
+    /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+    public boolean isSetCredentials() {
+      return this.credentials != null;
+    }
+
+    public void setCredentialsIsSet(boolean value) {
+      if (!value) {
+        this.credentials = null;
+      }
+    }
+
+    public String getTableName() {
+      return this.tableName;
+    }
+
+    public drainReplicationTable_args setTableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public void unsetTableName() {
+      this.tableName = null;
+    }
+
+    /** Returns true if field tableName is set (has been assigned a value) and false otherwise */
+    public boolean isSetTableName() {
+      return this.tableName != null;
+    }
+
+    public void setTableNameIsSet(boolean value) {
+      if (!value) {
+        this.tableName = null;
+      }
+    }
+
+    public int getLogsToWatchSize() {
+      return (this.logsToWatch == null) ? 0 : this.logsToWatch.size();
+    }
+
+    public java.util.Iterator<String> getLogsToWatchIterator() {
+      return (this.logsToWatch == null) ? null : this.logsToWatch.iterator();
+    }
+
+    public void addToLogsToWatch(String elem) {
+      if (this.logsToWatch == null) {
+        this.logsToWatch = new HashSet<String>();
+      }
+      this.logsToWatch.add(elem);
+    }
+
+    public Set<String> getLogsToWatch() {
+      return this.logsToWatch;
+    }
+
+    public drainReplicationTable_args setLogsToWatch(Set<String> logsToWatch) {
+      this.logsToWatch = logsToWatch;
+      return this;
+    }
+
+    public void unsetLogsToWatch() {
+      this.logsToWatch = null;
+    }
+
+    /** Returns true if field logsToWatch is set (has been assigned a value) and false otherwise */
+    public boolean isSetLogsToWatch() {
+      return this.logsToWatch != null;
+    }
+
+    public void setLogsToWatchIsSet(boolean value) {
+      if (!value) {
+        this.logsToWatch = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case TFINO:
+        if (value == null) {
+          unsetTfino();
+        } else {
+          setTfino((org.apache.accumulo.core.trace.thrift.TInfo)value);
+        }
+        break;
+
+      case CREDENTIALS:
+        if (value == null) {
+          unsetCredentials();
+        } else {
+          setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+        }
+        break;
+
+      case TABLE_NAME:
+        if (value == null) {
+          unsetTableName();
+        } else {
+          setTableName((String)value);
+        }
+        break;
+
+      case LOGS_TO_WATCH:
+        if (value == null) {
+          unsetLogsToWatch();
+        } else {
+          setLogsToWatch((Set<String>)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case TFINO:
+        return getTfino();
+
+      case CREDENTIALS:
+        return getCredentials();
+
+      case TABLE_NAME:
+        return getTableName();
+
+      case LOGS_TO_WATCH:
+        return getLogsToWatch();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case TFINO:
+        return isSetTfino();
+      case CREDENTIALS:
+        return isSetCredentials();
+      case TABLE_NAME:
+        return isSetTableName();
+      case LOGS_TO_WATCH:
+        return isSetLogsToWatch();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof drainReplicationTable_args)
+        return this.equals((drainReplicationTable_args)that);
+      return false;
+    }
+
+    public boolean equals(drainReplicationTable_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_tfino = true && this.isSetTfino();
+      boolean that_present_tfino = true && that.isSetTfino();
+      if (this_present_tfino || that_present_tfino) {
+        if (!(this_present_tfino && that_present_tfino))
+          return false;
+        if (!this.tfino.equals(that.tfino))
+          return false;
+      }
+
+      boolean this_present_credentials = true && this.isSetCredentials();
+      boolean that_present_credentials = true && that.isSetCredentials();
+      if (this_present_credentials || that_present_credentials) {
+        if (!(this_present_credentials && that_present_credentials))
+          return false;
+        if (!this.credentials.equals(that.credentials))
+          return false;
+      }
+
+      boolean this_present_tableName = true && this.isSetTableName();
+      boolean that_present_tableName = true && that.isSetTableName();
+      if (this_present_tableName || that_present_tableName) {
+        if (!(this_present_tableName && that_present_tableName))
+          return false;
+        if (!this.tableName.equals(that.tableName))
+          return false;
+      }
+
+      boolean this_present_logsToWatch = true && this.isSetLogsToWatch();
+      boolean that_present_logsToWatch = true && that.isSetLogsToWatch();
+      if (this_present_logsToWatch || that_present_logsToWatch) {
+        if (!(this_present_logsToWatch && that_present_logsToWatch))
+          return false;
+        if (!this.logsToWatch.equals(that.logsToWatch))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    @Override
+    public int compareTo(drainReplicationTable_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(isSetTfino()).compareTo(other.isSetTfino());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetTfino()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tfino, other.tfino);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(other.isSetCredentials());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetCredentials()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, other.credentials);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = Boolean.valueOf(isSetTableName()).compareTo(other.isSetTableName());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetTableName()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tableName, other.tableName);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = Boolean.valueOf(isSetLogsToWatch()).compareTo(other.isSetLogsToWatch());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetLogsToWatch()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.logsToWatch, other.logsToWatch);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("drainReplicationTable_args(");
+      boolean first = true;
+
+      sb.append("tfino:");
+      if (this.tfino == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.tfino);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("credentials:");
+      if (this.credentials == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.credentials);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("tableName:");
+      if (this.tableName == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.tableName);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("logsToWatch:");
+      if (this.logsToWatch == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.logsToWatch);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+      if (tfino != null) {
+        tfino.validate();
+      }
+      if (credentials != null) {
+        credentials.validate();
+      }
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class drainReplicationTable_argsStandardSchemeFactory implements SchemeFactory {
+      public drainReplicationTable_argsStandardScheme getScheme() {
+        return new drainReplicationTable_argsStandardScheme();
+      }
+    }
+
+    private static class drainReplicationTable_argsStandardScheme extends StandardScheme<drainReplicationTable_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, drainReplicationTable_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // TFINO
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.tfino = new org.apache.accumulo.core.trace.thrift.TInfo();
+                struct.tfino.read(iprot);
+                struct.setTfinoIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // CREDENTIALS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+                struct.credentials.read(iprot);
+                struct.setCredentialsIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 3: // TABLE_NAME
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.tableName = iprot.readString();
+                struct.setTableNameIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 4: // LOGS_TO_WATCH
+              if (schemeField.type == org.apache.thrift.protocol.TType.SET) {
+                {
+                  org.apache.thrift.protocol.TSet _set96 = iprot.readSetBegin();
+                  struct.logsToWatch = new HashSet<String>(2*_set96.size);
+                  for (int _i97 = 0; _i97 < _set96.size; ++_i97)
+                  {
+                    String _elem98;
+                    _elem98 = iprot.readString();
+                    struct.logsToWatch.add(_elem98);
+                  }
+                  iprot.readSetEnd();
+                }
+                struct.setLogsToWatchIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, drainReplicationTable_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.tfino != null) {
+          oprot.writeFieldBegin(TFINO_FIELD_DESC);
+          struct.tfino.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        if (struct.credentials != null) {
+          oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+          struct.credentials.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        if (struct.tableName != null) {
+          oprot.writeFieldBegin(TABLE_NAME_FIELD_DESC);
+          oprot.writeString(struct.tableName);
+          oprot.writeFieldEnd();
+        }
+        if (struct.logsToWatch != null) {
+          oprot.writeFieldBegin(LOGS_TO_WATCH_FIELD_DESC);
+          {
+            oprot.writeSetBegin(new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.STRING, struct.logsToWatch.size()));
+            for (String _iter99 : struct.logsToWatch)
+            {
+              oprot.writeString(_iter99);
+            }
+            oprot.writeSetEnd();
+          }
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class drainReplicationTable_argsTupleSchemeFactory implements SchemeFactory {
+      public drainReplicationTable_argsTupleScheme getScheme() {
+        return new drainReplicationTable_argsTupleScheme();
+      }
+    }
+
+    private static class drainReplicationTable_argsTupleScheme extends TupleScheme<drainReplicationTable_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, drainReplicationTable_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetTinfo()) {
+          optionals.set(0);
+        }
+        if (struct.isSetCredentials()) {
+          optionals.set(1);
+        }
+        if (struct.isSetTableName()) {
+          optionals.set(2);
+        }
+        if (struct.isSetLogsToWatch()) {
+          optionals.set(3);
+        }
+        oprot.writeBitSet(optionals, 4);
+        if (struct.isSetTinfo()) {
+          struct.tinfo.write(oprot);
+        }
+        if (struct.isSetCredentials()) {
+          struct.credentials.write(oprot);
+        }
+        if (struct.isSetTableName()) {
+          oprot.writeString(struct.tableName);
+        }
+        if (struct.isSetLogsToWatch()) {
+          {
+            oprot.writeI32(struct.logsToWatch.size());
+            for (String _iter100 : struct.logsToWatch)
+            {
+              oprot.writeString(_iter100);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, drainReplicationTable_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(4);
+        if (incoming.get(0)) {
+          struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
+          struct.tinfo.read(iprot);
+          struct.setTinfoIsSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+          struct.credentials.read(iprot);
+          struct.setCredentialsIsSet(true);
+        }
+        if (incoming.get(2)) {
+          struct.tableName = iprot.readString();
+          struct.setTableNameIsSet(true);
+        }
+        if (incoming.get(3)) {
+          {
+            org.apache.thrift.protocol.TSet _set101 = new org.apache.thrift.protocol.TSet(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+            struct.logsToWatch = new HashSet<String>(2*_set101.size);
+            for (int _i102 = 0; _i102 < _set101.size; ++_i102)
+            {
+              String _elem103;
+              _elem103 = iprot.readString();
+              struct.logsToWatch.add(_elem103);
+            }
+          }
+          struct.setLogsToWatchIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class drainReplicationTable_result implements org.apache.thrift.TBase<drainReplicationTable_result, drainReplicationTable_result._Fields>, java.io.Serializable, Cloneable, Comparable<drainReplicationTable_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("drainReplicationTable_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)0);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new drainReplicationTable_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new drainReplicationTable_resultTupleSchemeFactory());
+    }
+
+    public boolean success; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    private static final int __SUCCESS_ISSET_ID = 0;
+    private byte __isset_bitfield = 0;
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(drainReplicationTable_result.class, metaDataMap);
+    }
+
+    public drainReplicationTable_result() {
+    }
+
+    public drainReplicationTable_result(
+      boolean success)
+    {
+      this();
+      this.success = success;
+      setSuccessIsSet(true);
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public drainReplicationTable_result(drainReplicationTable_result other) {
+      __isset_bitfield = other.__isset_bitfield;
+      this.success = other.success;
+    }
+
+    public drainReplicationTable_result deepCopy() {
+      return new drainReplicationTable_result(this);
+    }
+
+    @Override
+    public void clear() {
+      setSuccessIsSet(false);
+      this.success = false;
+    }
+
+    public boolean isSuccess() {
+      return this.success;
+    }
+
+    public drainReplicationTable_result setSuccess(boolean success) {
+      this.success = success;
+      setSuccessIsSet(true);
+      return this;
+    }
+
+    public void unsetSuccess() {
+      __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID);
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and false otherwise */
+    public boolean isSetSuccess() {
+      return EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID);
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value);
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unsetSuccess();
+        } else {
+          setSuccess((Boolean)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return Boolean.valueOf(isSuccess());
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case SUCCESS:
+        return isSetSuccess();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof drainReplicationTable_result)
+        return this.equals((drainReplicationTable_result)that);
+      return false;
+    }
+
+    public boolean equals(drainReplicationTable_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_success = true;
+      boolean that_present_success = true;
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (this.success != that.success)
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    @Override
+    public int compareTo(drainReplicationTable_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetSuccess()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("drainReplicationTable_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      sb.append(this.success);
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bitfield = 0;
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class drainReplicationTable_resultStandardSchemeFactory implements SchemeFactory {
+      public drainReplicationTable_resultStandardScheme getScheme() {
+        return new drainReplicationTable_resultStandardScheme();
+      }
+    }
+
+    private static class drainReplicationTable_resultStandardScheme extends StandardScheme<drainReplicationTable_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, drainReplicationTable_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 0: // SUCCESS
+              if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+                struct.success = iprot.readBool();
+                struct.setSuccessIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, drainReplicationTable_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.isSetSuccess()) {
+          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+          oprot.writeBool(struct.success);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class drainReplicationTable_resultTupleSchemeFactory implements SchemeFactory {
+      public drainReplicationTable_resultTupleScheme getScheme() {
+        return new drainReplicationTable_resultTupleScheme();
+      }
+    }
+
+    private static class drainReplicationTable_resultTupleScheme extends TupleScheme<drainReplicationTable_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, drainReplicationTable_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetSuccess()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetSuccess()) {
+          oprot.writeBool(struct.success);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, drainReplicationTable_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.success = iprot.readBool();
+          struct.setSuccessIsSet(true);
+        }
+      }
+    }
+
+  }
+
 }
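
A minimal sketch of driving the new drainReplicationTable call through the generated stub. Only the method name, the argument order (field ids 1-4 of the args struct above), and the boolean result come from the generated code; the transport, protocol, host/port, and the tinfo/credentials values are illustrative assumptions, not part of this commit.

    import java.util.Set;

    import org.apache.accumulo.core.master.thrift.MasterClientService;
    import org.apache.accumulo.core.security.thrift.TCredentials;
    import org.apache.accumulo.core.trace.thrift.TInfo;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class DrainSketch {
      // Returns the 'success' boolean carried by drainReplicationTable_result.
      public static boolean drain(String masterHost, int masterPort, TInfo tinfo,
          TCredentials credentials, String tableName, Set<String> logsToWatch) throws Exception {
        TTransport transport = new TFramedTransport(new TSocket(masterHost, masterPort));
        transport.open();
        try {
          MasterClientService.Client client =
              new MasterClientService.Client(new TCompactProtocol(transport));
          return client.drainReplicationTable(tinfo, credentials, tableName, logsToWatch);
        } finally {
          transport.close();
        }
      }
    }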

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
deleted file mode 100644
index 35b6df6..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import java.io.PrintStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- *
- */
-public class PrintReplicationRecords implements Runnable {
-  private static final Logger log = LoggerFactory.getLogger(PrintReplicationRecords.class);
-
-  private Connector conn;
-  private PrintStream out;
-  private SimpleDateFormat sdf;
-
-  public PrintReplicationRecords(Connector conn, PrintStream out) {
-    this.conn = conn;
-    this.out = out;
-    this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-  }
-
-  @Override
-  public void run() {
-    Scanner s;
-
-    out.println(sdf.format(new Date()) + " Replication entries from metadata table");
-    out.println("------------------------------------------------------------------");
-    try {
-      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    } catch (TableNotFoundException e) {
-      log.error("Metadata table does not exist");
-      return;
-    }
-
-    s.setRange(ReplicationSection.getRange());
-    s.fetchColumnFamily(ReplicationSection.COLF);
-    for (Entry<Key,Value> entry : s) {
-      try {
-        out.println(entry.getKey().toStringNoTruncate() + "=" + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
-      } catch (InvalidProtocolBufferException e) {
-        out.println(entry.getKey().toStringNoTruncate() + "= Could not deserialize Status message");
-      }
-    }
-
-    out.println();
-    out.println(sdf.format(new Date()) + " Replication entries from replication table");
-    out.println("--------------------------------------------------------------------");
-
-    try {
-      s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
-    } catch (TableNotFoundException e) {
-      log.error("Replication table does not exist");
-      return;
-    }
-
-    for (Entry<Key,Value> entry : s) {
-      try {
-        out.println(entry.getKey().toStringNoTruncate() + "=" + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
-      } catch (InvalidProtocolBufferException e) {
-        out.println(entry.getKey().toStringNoTruncate() + "= Could not deserialize Status message");
-      }
-    }
-  }
-}
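
A minimal usage sketch of the API deleted above; the import uses the pre-move core package from this hunk, and how the Connector is obtained is left as an assumption.

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.replication.PrintReplicationRecords;

    public class DumpReplicationRecords {
      // Prints Status entries from both the metadata and replication tables.
      public static void dump(Connector conn) {
        new PrintReplicationRecords(conn, System.out).run();
      }
    }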

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
deleted file mode 100644
index b27aa43..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class ReplicaSystemHelper {
-  private static final Logger log = LoggerFactory.getLogger(ReplicaSystemHelper.class);
-
-  private Instance inst;
-  private Credentials creds;
-
-  public ReplicaSystemHelper(Instance inst, Credentials creds) {
-    this.inst = inst;
-    this.creds = creds;
-  }
-
-  /**
-   * Record the updated Status for this file and target
-   *
-   * @param filePath
-   *          Path to file being replicated
-   * @param status
-   *          Updated Status after replication
-   * @param target
-   *          Peer that was replicated to
-   */
-  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException,
-      TableNotFoundException {
-    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    try {
-      log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString(status));
-      Mutation m = new Mutation(filePath.toString());
-      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
-      bw.addMutation(m);
-    } finally {
-      bw.close();
-    }
-  }
-}
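
For reference, how a ReplicaSystem implementation might drive the helper deleted above; the Instance, Credentials, WAL path, Status, and target values are placeholders, and the pre-move core package is used for the imports.

    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.replication.ReplicaSystemHelper;
    import org.apache.accumulo.core.replication.ReplicationTarget;
    import org.apache.accumulo.core.replication.proto.Replication.Status;
    import org.apache.accumulo.core.security.Credentials;
    import org.apache.hadoop.fs.Path;

    public class RecordStatusSketch {
      // Writes the post-replication Status for one file/target pair into
      // the replication table's work section.
      public static void record(Instance inst, Credentials creds, Path wal,
          Status status, ReplicationTarget target) throws Exception {
        new ReplicaSystemHelper(inst, creds).recordNewStatus(wal, status, target);
      }
    }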

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
index c6f8ada..f011de0 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
@@ -54,7 +54,6 @@ public class ReplicationTable {
   public static final String WORK_LG_NAME = WorkSection.NAME.toString();
   public static final Set<Text> WORK_LG_COLFAMS = Collections.singleton(WorkSection.NAME);
   public static final Map<String,Set<Text>> LOCALITY_GROUPS = ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS);
-  public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName();
 
   public static Scanner getScanner(Connector conn) throws ReplicationTableOfflineException {
     try {
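
With STATUS_FORMATTER_CLASS_NAME removed from core, code that configures a formatter for the replication table has to name the class itself. A sketch, assuming Accumulo's standard per-table "table.formatter" property key; the formatter class name is a parameter because StatusFormatter's package changes in this commit.

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.replication.ReplicationTable;

    public class ConfigureFormatterSketch {
      // Points the replication table's formatter at the given class.
      public static void configure(Connector conn, String formatterClassName) throws Exception {
        conn.tableOperations().setProperty(ReplicationTable.NAME, "table.formatter",
            formatterClassName);
      }
    }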

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java
deleted file mode 100644
index bc04480..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import java.text.DateFormat;
-import java.text.FieldPosition;
-import java.text.ParsePosition;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.format.DefaultFormatter;
-import org.apache.accumulo.core.util.format.Formatter;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * Parse and print the serialized protocol buffers used to track replication data
- */
-public class StatusFormatter implements Formatter {
-  private static final Logger log = LoggerFactory.getLogger(StatusFormatter.class);
-
-  private static final Set<Text> REPLICATION_COLFAMS = Collections.unmodifiableSet(Sets.newHashSet(ReplicationSection.COLF, StatusSection.NAME,
-      WorkSection.NAME, OrderSection.NAME));
-
-  private Iterator<Entry<Key,Value>> iterator;
-  private boolean printTimestamps;
-
-  /* so a new date object doesn't get created for every record in the scan result */
-  private static ThreadLocal<Date> tmpDate = new ThreadLocal<Date>() {
-    @Override
-    protected Date initialValue() {
-      return new Date();
-    }
-  };
-
-  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
-    @Override
-    protected DateFormat initialValue() {
-      return new DefaultDateFormat();
-    }
-
-    class DefaultDateFormat extends DateFormat {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public StringBuffer format(Date date, StringBuffer toAppendTo, FieldPosition fieldPosition) {
-        toAppendTo.append(Long.toString(date.getTime()));
-        return toAppendTo;
-      }
-
-      @Override
-      public Date parse(String source, ParsePosition pos) {
-        return new Date(Long.parseLong(source));
-      }
-
-    }
-  };
-
-  @Override
-  public boolean hasNext() {
-    return iterator.hasNext();
-  }
-
-  @Override
-  public String next() {
-    Entry<Key,Value> entry = iterator.next();
-    DateFormat timestampFormat = printTimestamps ? formatter.get() : null;
-
-    // If we expected this to be a protobuf, try to parse it, adding a message when it fails to parse
-    if (REPLICATION_COLFAMS.contains(entry.getKey().getColumnFamily())) {
-      Status status;
-      try {
-        status = Status.parseFrom(entry.getValue().get());
-      } catch (InvalidProtocolBufferException e) {
-        log.trace("Could not deserialize protocol buffer for {}", entry.getKey(), e);
-        status = null;
-      }
-
-      return formatEntry(entry.getKey(), status, timestampFormat);
-    } else {
-      // Otherwise, we're set on a table that contains other data too (e.g. accumulo.metadata)
-      // Just do the normal thing
-      return DefaultFormatter.formatEntry(entry, timestampFormat);
-    }
-  }
-
-  public String formatEntry(Key key, Status status, DateFormat timestampFormat) {
-    StringBuilder sb = new StringBuilder();
-    Text buffer = new Text();
-
-    // append row
-    key.getRow(buffer);
-    appendText(sb, buffer).append(" ");
-
-    // append column family
-    key.getColumnFamily(buffer);
-    appendText(sb, buffer).append(":");
-
-    // append column qualifier
-    key.getColumnQualifier(buffer);
-    appendText(sb, buffer).append(" ");
-
-    // append visibility expression
-    key.getColumnVisibility(buffer);
-    sb.append(new ColumnVisibility(buffer));
-
-    // append timestamp
-    if (timestampFormat != null) {
-      tmpDate.get().setTime(key.getTimestamp());
-      sb.append(" ").append(timestampFormat.format(tmpDate.get()));
-    }
-
-    sb.append("\t");
-    // append value
-    if (status != null) {
-      sb.append(ProtobufUtil.toString(status));
-    } else {
-      sb.append("Could not deserialize Status protocol buffer");
-    }
-
-    return sb.toString();
-  }
-
-  protected StringBuilder appendText(StringBuilder sb, Text t) {
-    return appendBytes(sb, t.getBytes(), 0, t.getLength());
-  }
-
-  protected String getValue(Value v) {
-    StringBuilder sb = new StringBuilder();
-    return appendBytes(sb, v.get(), 0, v.get().length).toString();
-  }
-
-  protected StringBuilder appendBytes(StringBuilder sb, byte ba[], int offset, int len) {
-    for (int i = 0; i < len; i++) {
-      int c = 0xff & ba[offset + i];
-      if (c == '\\')
-        sb.append("\\\\");
-      else if (c >= 32 && c <= 126)
-        sb.append((char) c);
-      else
-        sb.append("\\x").append(String.format("%02X", c));
-    }
-    return sb;
-  }
-
-  @Override
-  public void remove() {
-    iterator.remove();
-  }
-
-  @Override
-  public void initialize(Iterable<Entry<Key,Value>> scanner, boolean printTimestamps) {
-    this.iterator = scanner.iterator();
-    this.printTimestamps = printTimestamps;
-  }
-
-}
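
The byte escaping in appendBytes above keeps printable ASCII, doubles backslashes, and hex-escapes everything else. A hand-rolled demo, subclassing only to reach the protected method (not part of the commit; pre-move package assumed):

    import org.apache.accumulo.core.replication.StatusFormatter;

    public class EscapeDemo extends StatusFormatter {
      public static void main(String[] args) {
        byte[] sample = {'a', '\\', 0x07};
        StringBuilder sb = new StringBuilder();
        new EscapeDemo().appendBytes(sb, sample, 0, sample.length);
        System.out.println(sb); // prints: a\\\x07
      }
    }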

