accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/4] accumulo git commit: ACCUMULO-3650 Push protobuf-java dependency into server-base.
Date Wed, 11 Mar 2015 21:36:49 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
index e4c90a4..0774040 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.apache.accumulo.core.client.lexicoder.AbstractEncoder;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -29,10 +28,11 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.TypedValueCombiner;
 import org.apache.accumulo.core.iterators.ValueFormatException;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.replication.proto.Replication.Status.Builder;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.proto.Replication.Status.Builder;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/StatusFormatter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusFormatter.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusFormatter.java
new file mode 100644
index 0000000..71c82b9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusFormatter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import java.text.DateFormat;
+import java.text.FieldPosition;
+import java.text.ParsePosition;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.accumulo.core.util.format.Formatter;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Parse and print the serialized protocol buffers used to track replication data
+ */
+public class StatusFormatter implements Formatter {
+  private static final Logger log = LoggerFactory.getLogger(StatusFormatter.class);
+
+  private static final Set<Text> REPLICATION_COLFAMS = Collections.unmodifiableSet(Sets.newHashSet(ReplicationSection.COLF, StatusSection.NAME,
+      WorkSection.NAME, OrderSection.NAME));
+
+  private Iterator<Entry<Key,Value>> iterator;
+  private boolean printTimestamps;
+
+  /* so a new date object doesn't get created for every record in the scan result */
+  private static ThreadLocal<Date> tmpDate = new ThreadLocal<Date>() {
+    @Override
+    protected Date initialValue() {
+      return new Date();
+    }
+  };
+
+  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      return new DefaultDateFormat();
+    }
+
+    class DefaultDateFormat extends DateFormat {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public StringBuffer format(Date date, StringBuffer toAppendTo, FieldPosition fieldPosition) {
+        toAppendTo.append(Long.toString(date.getTime()));
+        return toAppendTo;
+      }
+
+      @Override
+      public Date parse(String source, ParsePosition pos) {
+        return new Date(Long.parseLong(source));
+      }
+
+    }
+  };
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public String next() {
+    Entry<Key,Value> entry = iterator.next();
+    DateFormat timestampFormat = printTimestamps ? formatter.get() : null;
+
+    // If we expected this to be a protobuf, try to parse it, adding a message when it fails to parse
+    if (REPLICATION_COLFAMS.contains(entry.getKey().getColumnFamily())) {
+      Status status;
+      try {
+        status = Status.parseFrom(entry.getValue().get());
+      } catch (InvalidProtocolBufferException e) {
+        log.trace("Could not deserialize protocol buffer for {}", entry.getKey(), e);
+        status = null;
+      }
+
+      return formatEntry(entry.getKey(), status, timestampFormat);
+    } else {
+      // Otherwise, we're set on a table that contains other data too (e.g. accumulo.metadata)
+      // Just do the normal thing
+      return DefaultFormatter.formatEntry(entry, timestampFormat);
+    }
+  }
+
+  public String formatEntry(Key key, Status status, DateFormat timestampFormat) {
+    StringBuilder sb = new StringBuilder();
+    Text buffer = new Text();
+
+    // append row
+    key.getRow(buffer);
+    appendText(sb, buffer).append(" ");
+
+    // append column family
+    key.getColumnFamily(buffer);
+    appendText(sb, buffer).append(":");
+
+    // append column qualifier
+    key.getColumnQualifier(buffer);
+    appendText(sb, buffer).append(" ");
+
+    // append visibility expression
+    key.getColumnVisibility(buffer);
+    sb.append(new ColumnVisibility(buffer));
+
+    // append timestamp
+    if (timestampFormat != null) {
+      tmpDate.get().setTime(key.getTimestamp());
+      sb.append(" ").append(timestampFormat.format(tmpDate.get()));
+    }
+
+    sb.append("\t");
+    // append value
+    if (status != null) {
+      sb.append(ProtobufUtil.toString(status));
+    } else {
+      sb.append("Could not deserialize Status protocol buffer");
+    }
+
+    return sb.toString();
+  }
+
+  protected StringBuilder appendText(StringBuilder sb, Text t) {
+    return appendBytes(sb, t.getBytes(), 0, t.getLength());
+  }
+
+  protected String getValue(Value v) {
+    StringBuilder sb = new StringBuilder();
+    return appendBytes(sb, v.get(), 0, v.get().length).toString();
+  }
+
+  protected StringBuilder appendBytes(StringBuilder sb, byte ba[], int offset, int len) {
+    for (int i = 0; i < len; i++) {
+      int c = 0xff & ba[offset + i];
+      if (c == '\\')
+        sb.append("\\\\");
+      else if (c >= 32 && c <= 126)
+        sb.append((char) c);
+      else
+        sb.append("\\x").append(String.format("%02X", c));
+    }
+    return sb;
+  }
+
+  @Override
+  public void remove() {
+    iterator.remove();
+  }
+
+  @Override
+  public void initialize(Iterable<Entry<Key,Value>> scanner, boolean printTimestamps) {
+    this.iterator = scanner.iterator();
+    this.printTimestamps = printTimestamps;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
new file mode 100644
index 0000000..898e3d4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.proto.Replication.Status.Builder;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Helper methods to create Status protobuf messages
+ */
+public class StatusUtil {
+
+  private static final Status INF_END_REPLICATION_STATUS, CLOSED_STATUS;
+  private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
+
+  private static final Status.Builder CREATED_STATUS_BUILDER;
+
+  static {
+    CREATED_STATUS_BUILDER = Status.newBuilder();
+    CREATED_STATUS_BUILDER.setBegin(0);
+    CREATED_STATUS_BUILDER.setEnd(0);
+    CREATED_STATUS_BUILDER.setInfiniteEnd(false);
+    CREATED_STATUS_BUILDER.setClosed(false);
+
+    Builder builder = Status.newBuilder();
+    builder.setBegin(0);
+    builder.setEnd(0);
+    builder.setInfiniteEnd(true);
+    builder.setClosed(false);
+    INF_END_REPLICATION_STATUS = builder.build();
+    INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
+
+    builder = Status.newBuilder();
+    builder.setBegin(0);
+    builder.setEnd(0);
+    builder.setInfiniteEnd(true);
+    builder.setClosed(true);
+    CLOSED_STATUS = builder.build();
+    CLOSED_STATUS_VALUE = ProtobufUtil.toValue(CLOSED_STATUS);
+  }
+
+  /**
+   * Creates a {@link Status} for newly-created data that must be replicated
+   *
+   * @param recordsIngested
+   *          Offset of records which need to be replicated
+   * @return A {@link Status} tracking data that must be replicated
+   */
+  public static Status ingestedUntil(long recordsIngested) {
+    return ingestedUntil(Status.newBuilder(), recordsIngested);
+  }
+
+  public static Status ingestedUntil(Builder builder, long recordsIngested) {
+    return replicatedAndIngested(builder, 0, recordsIngested);
+  }
+
+  /**
+   * @param recordsReplicated
+   *          Offset of records which have been replicated
+   * @return A {@link Status} tracking data that must be replicated
+   */
+  public static Status replicated(long recordsReplicated) {
+    return replicated(Status.newBuilder(), recordsReplicated);
+  }
+
+  /**
+   * @param builder
+   *          Existing {@link Builder} to use
+   * @param recordsReplicated
+   *          Offset of records which have been replicated
+   * @return A {@link Status} tracking data that must be replicated
+   */
+  public static Status replicated(Status.Builder builder, long recordsReplicated) {
+    return replicatedAndIngested(builder, recordsReplicated, 0);
+  }
+
+  /**
+   * Creates a {@link Status} for a file which has new data and data which has been replicated
+   *
+   * @param recordsReplicated
+   *          Offset of records which have been replicated
+   * @param recordsIngested
+   *          Offset for records which need to be replicated
+   * @return A {@link Status} for the given parameters
+   */
+  public static Status replicatedAndIngested(long recordsReplicated, long recordsIngested) {
+    return replicatedAndIngested(Status.newBuilder(), recordsReplicated, recordsIngested);
+  }
+
+  /**
+   * Same as {@link #replicatedAndIngested(long, long)} but uses the provided {@link Builder}
+   *
+   * @param builder
+   *          An existing builder
+   * @param recordsReplicated
+   *          Offset of records which have been replicated
+   * @param recordsIngested
+   *          Offset of records which need to be replicated
+   * @return A {@link Status} for the given parameters using the builder
+   */
+  public static Status replicatedAndIngested(Status.Builder builder, long recordsReplicated, long recordsIngested) {
+    return builder.setBegin(recordsReplicated).setEnd(recordsIngested).setClosed(false).setInfiniteEnd(false).build();
+  }
+
+  /**
+   * @return A {@link Status} for a new file that was just created
+   */
+  public static synchronized Status fileCreated(long timeCreated) {
+    // We're using a shared builder, so we need to synchronize access on it until we make a Status (which is then immutable)
+    CREATED_STATUS_BUILDER.setCreatedTime(timeCreated);
+    return CREATED_STATUS_BUILDER.build();
+  }
+
+  /**
+   * @return A {@link Value} for a new file that was just created
+   */
+  public static Value fileCreatedValue(long timeCreated) {
+    return ProtobufUtil.toValue(fileCreated(timeCreated));
+  }
+
+  /**
+   * @return A Status representing a closed file
+   */
+  public static Status fileClosed() {
+    return CLOSED_STATUS;
+  }
+
+  /**
+   * @return A Value representing a closed file
+   */
+  public static Value fileClosedValue() {
+    return CLOSED_STATUS_VALUE;
+  }
+
+  /**
+   * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
+   */
+  public static Status openWithUnknownLength() {
+    return INF_END_REPLICATION_STATUS;
+  }
+
+  /**
+   * @return A {@link Value} for an open file of unspecified length, all of which needs replicating.
+   */
+  public static Value openWithUnknownLengthValue() {
+    return INF_END_REPLICATION_STATUS_VALUE;
+  }
+
+  /**
+   * @param v
+   *          Value with serialized Status
+   * @return A Status created from the Value
+   */
+  public static Status fromValue(Value v) throws InvalidProtocolBufferException {
+    return Status.parseFrom(v.get());
+  }
+
+  /**
+   * Is the given Status fully replicated and is its file ready for deletion on the source
+   *
+   * @param status
+   *          a Status protobuf
+   * @return True if the file this Status references can be deleted.
+   */
+  public static boolean isSafeForRemoval(Status status) {
+    return status.getClosed() && isFullyReplicated(status);
+  }
+
+  /**
+   * Is the given Status fully replicated but potentially not yet safe for deletion
+   *
+   * @param status
+   *          a Status protobuf
+   * @return True if the file this Status references is fully replicated so far
+   */
+  public static boolean isFullyReplicated(Status status) {
+    if (status.getInfiniteEnd()) {
+      return Long.MAX_VALUE == status.getBegin();
+    } else {
+      return status.getBegin() >= status.getEnd();
+    }
+  }
+
+  /**
+   * Given the {@link Status}, is there replication work to be done
+   *
+   * @param status
+   *          Status for a file
+   * @return true if replication work is required
+   */
+  public static boolean isWorkRequired(Status status) {
+    if (status.getInfiniteEnd()) {
+      return Long.MAX_VALUE != status.getBegin();
+    } else {
+      return status.getBegin() < status.getEnd();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/proto/Replication.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/proto/Replication.java b/server/base/src/main/java/org/apache/accumulo/server/replication/proto/Replication.java
new file mode 100644
index 0000000..b36958d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/proto/Replication.java
@@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/main/protobuf/replication.proto
+
+package org.apache.accumulo.server.replication.proto;
+
+@SuppressWarnings({"unused"}) public final class Replication {
+  private Replication() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface StatusOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 begin = 1 [default = 0];
+    /**
+     * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
+     */
+    boolean hasBegin();
+    /**
+     * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
+     */
+    long getBegin();
+
+    // optional int64 end = 2 [default = 0];
+    /**
+     * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
+     */
+    boolean hasEnd();
+    /**
+     * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
+     */
+    long getEnd();
+
+    // optional bool infiniteEnd = 3 [default = false];
+    /**
+     * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
+     */
+    boolean hasInfiniteEnd();
+    /**
+     * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
+     */
+    boolean getInfiniteEnd();
+
+    // optional bool closed = 4 [default = false];
+    /**
+     * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
+     */
+    boolean hasClosed();
+    /**
+     * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
+     */
+    boolean getClosed();
+
+    // optional int64 createdTime = 5 [default = 0];
+    /**
+     * <code>optional int64 createdTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file created?
+     * </pre>
+     */
+    boolean hasCreatedTime();
+    /**
+     * <code>optional int64 createdTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file created?
+     * </pre>
+     */
+    long getCreatedTime();
+  }
+  /**
+   * Protobuf type {@code Status}
+   */
+  public static final class Status extends
+      com.google.protobuf.GeneratedMessage
+      implements StatusOrBuilder {
+    // Use Status.newBuilder() to construct.
+    private Status(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private Status(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final Status defaultInstance;
+    public static Status getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public Status getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private Status(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              begin_ = input.readInt64();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              end_ = input.readInt64();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              infiniteEnd_ = input.readBool();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              closed_ = input.readBool();
+              break;
+            }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              createdTime_ = input.readInt64();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.accumulo.server.replication.proto.Replication.internal_static_Status_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.accumulo.server.replication.proto.Replication.internal_static_Status_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.accumulo.server.replication.proto.Replication.Status.class, org.apache.accumulo.server.replication.proto.Replication.Status.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<Status> PARSER =
+        new com.google.protobuf.AbstractParser<Status>() {
+      public Status parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new Status(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<Status> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional int64 begin = 1 [default = 0];
+    public static final int BEGIN_FIELD_NUMBER = 1;
+    private long begin_;
+    /**
+     * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
+     */
+    public boolean hasBegin() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
+     */
+    public long getBegin() {
+      return begin_;
+    }
+
+    // optional int64 end = 2 [default = 0];
+    public static final int END_FIELD_NUMBER = 2;
+    private long end_;
+    /**
+     * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
+     */
+    public boolean hasEnd() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
+     */
+    public long getEnd() {
+      return end_;
+    }
+
+    // optional bool infiniteEnd = 3 [default = false];
+    public static final int INFINITEEND_FIELD_NUMBER = 3;
+    private boolean infiniteEnd_;
+    /**
+     * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
+     */
+    public boolean hasInfiniteEnd() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
+     */
+    public boolean getInfiniteEnd() {
+      return infiniteEnd_;
+    }
+
+    // optional bool closed = 4 [default = false];
+    public static final int CLOSED_FIELD_NUMBER = 4;
+    private boolean closed_;
+    /**
+     * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
+     */
+    public boolean hasClosed() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
+     */
+    public boolean getClosed() {
+      return closed_;
+    }
+
+    // optional int64 createdTime = 5 [default = 0];
+    public static final int CREATEDTIME_FIELD_NUMBER = 5;
+    private long createdTime_;
+    /**
+     * <code>optional int64 createdTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file created?
+     * </pre>
+     */
+    public boolean hasCreatedTime() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional int64 createdTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file created?
+     * </pre>
+     */
+    public long getCreatedTime() {
+      return createdTime_;
+    }
+
+    private void initFields() {
+      begin_ = 0L;
+      end_ = 0L;
+      infiniteEnd_ = false;
+      closed_ = false;
+      createdTime_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, begin_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeInt64(2, end_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(3, infiniteEnd_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBool(4, closed_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(5, createdTime_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, begin_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, end_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, infiniteEnd_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, closed_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(5, createdTime_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.accumulo.server.replication.proto.Replication.Status parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.accumulo.server.replication.proto.Replication.Status prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code Status}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.accumulo.server.replication.proto.Replication.StatusOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.accumulo.server.replication.proto.Replication.internal_static_Status_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.accumulo.server.replication.proto.Replication.internal_static_Status_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.accumulo.server.replication.proto.Replication.Status.class, org.apache.accumulo.server.replication.proto.Replication.Status.Builder.class);
+      }
+
+      // Construct using org.apache.accumulo.server.replication.proto.Replication.Status.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        begin_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        end_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        infiniteEnd_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        closed_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        createdTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.accumulo.server.replication.proto.Replication.internal_static_Status_descriptor;
+      }
+
+      public org.apache.accumulo.server.replication.proto.Replication.Status getDefaultInstanceForType() {
+        return org.apache.accumulo.server.replication.proto.Replication.Status.getDefaultInstance();
+      }
+
+      public org.apache.accumulo.server.replication.proto.Replication.Status build() {
+        org.apache.accumulo.server.replication.proto.Replication.Status result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.accumulo.server.replication.proto.Replication.Status buildPartial() {
+        org.apache.accumulo.server.replication.proto.Replication.Status result = new org.apache.accumulo.server.replication.proto.Replication.Status(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.begin_ = begin_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.end_ = end_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.infiniteEnd_ = infiniteEnd_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.closed_ = closed_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.createdTime_ = createdTime_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.accumulo.server.replication.proto.Replication.Status) {
+          return mergeFrom((org.apache.accumulo.server.replication.proto.Replication.Status)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.accumulo.server.replication.proto.Replication.Status other) {
+        if (other == org.apache.accumulo.server.replication.proto.Replication.Status.getDefaultInstance()) return this;
+        if (other.hasBegin()) {
+          setBegin(other.getBegin());
+        }
+        if (other.hasEnd()) {
+          setEnd(other.getEnd());
+        }
+        if (other.hasInfiniteEnd()) {
+          setInfiniteEnd(other.getInfiniteEnd());
+        }
+        if (other.hasClosed()) {
+          setClosed(other.getClosed());
+        }
+        if (other.hasCreatedTime()) {
+          setCreatedTime(other.getCreatedTime());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.accumulo.server.replication.proto.Replication.Status parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.accumulo.server.replication.proto.Replication.Status) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional int64 begin = 1 [default = 0];
+      private long begin_ ;
+      /**
+       * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
+       */
+      public boolean hasBegin() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
+       */
+      public long getBegin() {
+        return begin_;
+      }
+      /**
+       * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
+       */
+      public Builder setBegin(long value) {
+        bitField0_ |= 0x00000001;
+        begin_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
+       */
+      public Builder clearBegin() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        begin_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 end = 2 [default = 0];
+      private long end_ ;
+      /**
+       * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
+       */
+      public boolean hasEnd() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
+       */
+      public long getEnd() {
+        return end_;
+      }
+      /**
+       * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
+       */
+      public Builder setEnd(long value) {
+        bitField0_ |= 0x00000002;
+        end_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
+       */
+      public Builder clearEnd() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        end_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bool infiniteEnd = 3 [default = false];
+      private boolean infiniteEnd_ ;
+      /**
+       * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
+       */
+      public boolean hasInfiniteEnd() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
+       */
+      public boolean getInfiniteEnd() {
+        return infiniteEnd_;
+      }
+      /**
+       * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
+       */
+      public Builder setInfiniteEnd(boolean value) {
+        bitField0_ |= 0x00000004;
+        infiniteEnd_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
+       */
+      public Builder clearInfiniteEnd() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        infiniteEnd_ = false;
+        onChanged();
+        return this;
+      }
+
+      // optional bool closed = 4 [default = false];
+      private boolean closed_ ;
+      /**
+       * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
+       */
+      public boolean hasClosed() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
+       */
+      public boolean getClosed() {
+        return closed_;
+      }
+      /**
+       * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
+       */
+      public Builder setClosed(boolean value) {
+        bitField0_ |= 0x00000008;
+        closed_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
+       */
+      public Builder clearClosed() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        closed_ = false;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 createdTime = 5 [default = 0];
+      private long createdTime_ ;
+      /**
+       * <code>optional int64 createdTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file created?
+       * </pre>
+       */
+      public boolean hasCreatedTime() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional int64 createdTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file created?
+       * </pre>
+       */
+      public long getCreatedTime() {
+        return createdTime_;
+      }
+      /**
+       * <code>optional int64 createdTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file created?
+       * </pre>
+       */
+      public Builder setCreatedTime(long value) {
+        bitField0_ |= 0x00000010;
+        createdTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 createdTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file created?
+       * </pre>
+       */
+      public Builder clearCreatedTime() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        createdTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:Status)
+    }
+
+    static {
+      defaultInstance = new Status(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:Status)
+  }
+
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_Status_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_Status_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n#src/main/protobuf/replication.proto\"u\n" +
+      "\006Status\022\020\n\005begin\030\001 \001(\003:\0010\022\016\n\003end\030\002 \001(\003:\001" +
+      "0\022\032\n\013infiniteEnd\030\003 \001(\010:\005false\022\025\n\006closed\030" +
+      "\004 \001(\010:\005false\022\026\n\013createdTime\030\005 \001(\003:\0010B0\n," +
+      "org.apache.accumulo.server.replication.p" +
+      "rotoH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          internal_static_Status_descriptor =
+            getDescriptor().getMessageTypes().get(0);
+          internal_static_Status_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_Status_descriptor,
+              new java.lang.String[] { "Begin", "End", "InfiniteEnd", "Closed", "CreatedTime", });
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 344e245..af02a8d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -42,12 +42,12 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusFormatter;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.replication.StatusCombiner;
+import org.apache.accumulo.server.replication.StatusFormatter;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/protobuf/replication.proto
----------------------------------------------------------------------
diff --git a/server/base/src/main/protobuf/replication.proto b/server/base/src/main/protobuf/replication.proto
new file mode 100644
index 0000000..96e4a1a
--- /dev/null
+++ b/server/base/src/main/protobuf/replication.proto
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+option java_package = "org.apache.accumulo.server.replication.proto";
+option optimize_for = SPEED;
+
+message Status {
+	optional int64 begin = 1 [default = 0]; // offset where replication should start
+	optional int64 end = 2 [default = 0]; // offset where data is ready for replication
+	optional bool infiniteEnd = 3 [default = false]; // do we have a discrete 'end'
+	optional bool closed = 4 [default = false]; // will more data be appended to the file
+	optional int64 createdTime = 5 [default = 0]; // when, in ms, was the file created?
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/scripts/generate-protobuf.sh
----------------------------------------------------------------------
diff --git a/server/base/src/main/scripts/generate-protobuf.sh b/server/base/src/main/scripts/generate-protobuf.sh
new file mode 100755
index 0000000..3fc70a1
--- /dev/null
+++ b/server/base/src/main/scripts/generate-protobuf.sh
@@ -0,0 +1,98 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script will regenerate the protobuf code for Accumulo
+
+# NOTES:
+#   To support this script being called by other modules, only edit the right side.
+#   In other scripts, set the variables that diverge from the defaults below, then call this script.
+#   Leave the BUILD_DIR and FINAL_DIR alone for Maven builds.
+# ========================================================================================================================
+[[ -z $REQUIRED_PROTOC_VERSION ]] && REQUIRED_PROTOC_VERSION='libprotoc 2.5.0'
+[[ -z $BUILD_DIR ]]               && BUILD_DIR='target/proto'
+[[ -z $FINAL_DIR ]]               && FINAL_DIR='src/main'
+# ========================================================================================================================
+
+fail() {
+  echo "$@"
+  exit 1
+}
+
+# Test to see if we have the required version of protoc installed
+VERSION=$(protoc --version 2>/dev/null | grep -F "${REQUIRED_PROTOC_VERSION}" |  wc -l)
+if [[ $VERSION -ne 1 ]] ; then
+  # Nope: bail
+  echo "****************************************************"
+  echo "*** protoc is not available"
+  echo "***   expecting 'protoc --version' to return ${REQUIRED_PROTOC_VERSION}"
+  echo "*** generated code will not be updated"
+  fail "****************************************************"
+fi
+
+# Start from an empty build directory (quoted so an overridden BUILD_DIR containing spaces is handled as one path)
+PROTOC_ARGS="--java_out=$BUILD_DIR"
+rm -rf "$BUILD_DIR"
+mkdir -p "$BUILD_DIR"
+
+protoc "${PROTOC_ARGS}" src/main/protobuf/*.proto || fail unable to generate Java protocol buffer classes
+
+# For all generated protobuf code, suppress "unused" warnings and add the LICENSE header
+s='@SuppressWarnings({"unused"})'
+find "$BUILD_DIR" -name '*.java' -print0 | xargs -0 sed -i.orig -e 's/\(public final class \)/'"$s"' \1/'
+
+PREFIX="/*
+"
+LINE_NOTATION=" *"
+SUFFIX="
+ */"
+FILE_SUFFIX=(.java)
+
+for file in "${FILE_SUFFIX[@]}"; do
+  for f in $(find "$BUILD_DIR/" -name "*$file"); do
+    cat - "$f" > "${f}-with-license" <<EOF
+${PREFIX}${LINE_NOTATION} Licensed to the Apache Software Foundation (ASF) under one or more
+${LINE_NOTATION} contributor license agreements.  See the NOTICE file distributed with
+${LINE_NOTATION} this work for additional information regarding copyright ownership.
+${LINE_NOTATION} The ASF licenses this file to You under the Apache License, Version 2.0
+${LINE_NOTATION} (the "License"); you may not use this file except in compliance with
+${LINE_NOTATION} the License.  You may obtain a copy of the License at
+${LINE_NOTATION}
+${LINE_NOTATION}     http://www.apache.org/licenses/LICENSE-2.0
+${LINE_NOTATION}
+${LINE_NOTATION} Unless required by applicable law or agreed to in writing, software
+${LINE_NOTATION} distributed under the License is distributed on an "AS IS" BASIS,
+${LINE_NOTATION} WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+${LINE_NOTATION} See the License for the specific language governing permissions and
+${LINE_NOTATION} limitations under the License.${SUFFIX}
+EOF
+  done
+done
+
+# For every generated java file, compare it with the version-controlled one, and copy the ones that have changed into place
+SDIR="${BUILD_DIR}/org/apache/accumulo/server/replication/proto"
+DDIR="${FINAL_DIR}/java/org/apache/accumulo/server/replication/proto"
+FILE_SUFFIX=(.java)
+mkdir -p "$DDIR"
+for file in "${FILE_SUFFIX[@]}"; do
+  for f in $(find "$SDIR" -name "*$file"); do
+    DEST=$DDIR/$(basename "$f")
+    if ! cmp -s "${f}-with-license" "${DEST}" ; then
+      echo cp -f "${f}-with-license" "${DEST}"
+      cp -f "${f}-with-license" "${DEST}" || fail unable to copy files to java workspace
+    fi
+  done
+done

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
index 7428a19..f4d5a9b 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
@@ -33,16 +33,12 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- *
- */
 public class StatusCombinerTest {
 
   private StatusCombiner combiner;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/test/java/org/apache/accumulo/server/replication/StatusUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusUtilTest.java
new file mode 100644
index 0000000..8478395
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusUtilTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StatusUtilTest {
+
+  @Test
+  public void newFileIsNotCompletelyReplicated() {
+    Assert.assertFalse(StatusUtil.isSafeForRemoval(StatusUtil.fileCreated(0L)));
+  }
+
+  @Test
+  public void openFileIsNotCompletelyReplicated() {
+    Assert.assertFalse(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(false).setBegin(0).setEnd(1000).setInfiniteEnd(false).build()));
+  }
+
+  @Test
+  public void closedFileWithDifferentBeginEndIsNotCompletelyReplicated() {
+    Assert.assertFalse(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setBegin(0).setEnd(1000).setInfiniteEnd(false).build()));
+  }
+
+  @Test
+  public void closedFileWithInfEndAndNonMaxBeginIsNotCompletelyReplicated() {
+    Assert.assertFalse(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setInfiniteEnd(true).setBegin(10000).build()));
+  }
+
+  @Test
+  public void closedFileWithInfEndAndMaxBeginIsCompletelyReplicated() {
+    Assert.assertTrue(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setInfiniteEnd(true).setBegin(Long.MAX_VALUE).build()));
+  }
+
+  @Test
+  public void closeFileWithEqualBeginEndIsCompletelyReplicated() {
+    Assert.assertTrue(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setEnd(100000).setBegin(100000).build()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/test/java/org/apache/accumulo/server/replication/proto/StatusTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/proto/StatusTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/proto/StatusTest.java
new file mode 100644
index 0000000..b2ce5ef
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/proto/StatusTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication.proto;
+
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link Status} messages which differ only in their {@code begin} offset are not equal.
+ */
+public class StatusTest {
+
+  @Test
+  public void equality() {
+    Status replicated = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
+    Status unreplicated = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
+
+    Assert.assertFalse(replicated.equals(unreplicated));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 355fa42..b1010c2 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -50,10 +50,10 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.replication.StatusCombiner;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
@@ -61,9 +61,6 @@ import org.easymock.IAnswer;
 import org.junit.Assert;
 import org.junit.Test;
 
-/**
- *
- */
 public class ReplicationTableUtilTest {
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 35c60d6..1735c0d 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -46,8 +46,6 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -61,6 +59,8 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index f375328..672c7ff 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -38,11 +38,11 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
index 41ac204..540d0c1 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
@@ -33,7 +33,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 35005d8..81d8a55 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -62,7 +62,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Sc
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.trace.CountSampler;
@@ -90,6 +89,7 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.rpc.RpcWrapper;
 import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
 import org.apache.accumulo.server.rpc.TServerUtils;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 9b60c88..3a32727 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -41,8 +41,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -52,6 +50,8 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 4c8ed70..5801faa 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -55,11 +55,11 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
index 286723d..ed71e98 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
@@ -35,8 +35,8 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index ba68890..23db83a 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -56,13 +56,13 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index d10a7ad..a1ce1d4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -72,7 +72,7 @@ import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.master.state.MergeInfo;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 219144a..ad0598d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -143,7 +143,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportException;
@@ -151,6 +150,8 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
@@ -162,7 +163,7 @@ import com.google.common.collect.Iterables;
  */
 public class Master extends AccumuloServerContext implements LiveTServerSet.Listener, TableObserver, CurrentState {
 
-  final static Logger log = Logger.getLogger(Master.class);
+  final static Logger log = LoggerFactory.getLogger(Master.class);
 
   final static int ONE_SECOND = 1000;
   final private static Text METADATA_TABLE_ID = new Text(MetadataTable.ID);
@@ -423,7 +424,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
         perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
         haveUpgradedZooKeeper = true;
       } catch (Exception ex) {
-        log.fatal("Error performing upgrade", ex);
+        log.error("Error performing upgrade", ex);
         System.exit(1);
       }
     }
@@ -478,7 +479,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
               log.info("Upgrade complete");
               waitForMetadataUpgrade.countDown();
             } catch (Exception ex) {
-              log.fatal("Error performing upgrade", ex);
+              log.error("Error performing upgrade", ex);
               System.exit(1);
             }
 
@@ -1000,7 +1001,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
           if (connection != null)
             connection.fastHalt(masterLock);
         } catch (TException e) {
-          log.error(e, e);
+          log.error(e.getMessage(), e);
         }
         tserverSet.remove(instance);
       }
@@ -1267,7 +1268,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
       Halt.halt(-1, new Runnable() {
         @Override
         public void run() {
-          log.fatal("No longer able to monitor master lock node", e);
+          log.error("No longer able to monitor master lock node", e);
         }
       });
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 3809a29..f73c236 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.master;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -27,6 +28,7 @@ import java.util.Set;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
@@ -44,6 +46,7 @@ import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -55,8 +58,12 @@ import org.apache.accumulo.core.master.thrift.TabletSplit;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
@@ -74,6 +81,8 @@ import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
 import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
 import org.apache.accumulo.server.util.NamespacePropUtil;
 import org.apache.accumulo.server.util.SystemPropUtil;
@@ -82,10 +91,12 @@ import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+
+import com.google.protobuf.InvalidProtocolBufferException;
 
 class MasterClientServiceHandler extends FateServiceHandler implements MasterClientService.Iface {
 
@@ -475,4 +486,103 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
       throw new TException(e.getMessage());
     }
   }
+  /** Reports whether every watched log of {@code tableName} is fully replicated, checking the metadata table first and then the replication table. */
+  @Override
+  public boolean drainReplicationTable(TInfo tfino, TCredentials credentials, String tableName, Set<String> logsToWatch) throws TException {
+    Connector conn;
+    try {
+      conn = master.getConnector();
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      throw new RuntimeException("Failed to obtain connector", e);
+    }
+    // NOTE(review): the credentials argument is never read in this method; both scans go through the master's own connector.
+    final Text tableId = new Text(getTableId(master.getInstance(), tableName));
+
+    log.trace("Waiting for {} to be replicated for {}", logsToWatch, tableId);
+
+    log.trace("Reading from metadata table");
+    final Set<Range> range = Collections.singleton(new Range(ReplicationSection.getRange()));
+    BatchScanner bs;
+    try {
+      bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException("Could not read metadata table", e);
+    }
+    bs.setRanges(range);
+    bs.fetchColumnFamily(ReplicationSection.COLF);
+    try {
+      // Return false right away if the metadata table holds a watched WAL record that is not fully replicated
+      if (!allReferencesReplicated(bs, tableId, logsToWatch)) {
+        return false;
+      }
+    } finally {
+      bs.close();
+    }
+
+    log.trace("reading from replication table");
+    try {
+      bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException("Replication table was not found", e);
+    }
+    bs.setRanges(Collections.singleton(new Range()));
+    try {
+      // Nothing unreplicated was found in metadata, so the replication table scan decides the result
+      return allReferencesReplicated(bs, tableId, logsToWatch);
+    } finally {
+      bs.close();
+    }
+  }
+  /** Obtains the id for {@code tableName} by delegating to {@code ClientServiceHandler.checkTableId}, passing {@code null} as its third argument. */
+  protected String getTableId(Instance instance, String tableName) throws ThriftTableOperationException {
+    return ClientServiceHandler.checkTableId(instance, tableName, null);
+  }
+
+  /**
+   * @return false as soon as an entry for {@code tableId} whose file is in {@code relevantLogs} has a Status that is not fully replicated; true otherwise
+   */
+  protected boolean allReferencesReplicated(BatchScanner bs, Text tableId, Set<String> relevantLogs) {
+    Text rowHolder = new Text(), colfHolder = new Text();
+    for (Entry<Key,Value> entry : bs) {
+      log.trace("Got key {}", entry.getKey().toStringNoTruncate());
+      // rowHolder is first borrowed to hold the column qualifier, which is compared against the table id
+      entry.getKey().getColumnQualifier(rowHolder);
+      if (tableId.equals(rowHolder)) {
+        entry.getKey().getRow(rowHolder);
+        entry.getKey().getColumnFamily(colfHolder);
+
+        String file;
+        if (colfHolder.equals(ReplicationSection.COLF)) {
+          file = rowHolder.toString();
+          file = file.substring(ReplicationSection.getRowPrefix().length());
+        } else if (colfHolder.equals(OrderSection.NAME)) {
+          file = OrderSection.getFile(entry.getKey(), rowHolder);
+          long timeClosed = OrderSection.getTimeClosed(entry.getKey(), rowHolder);
+          log.trace("Order section: {} and {}", timeClosed, file);
+        } else {
+          file = rowHolder.toString();
+        }
+
+        // Skip files that were not in the set observed when the drain started (new files/data)
+        if (!relevantLogs.contains(file)) {
+          log.trace("Found file that we didn't care about {}", file);
+          continue;
+        } else {
+          log.trace("Found file that we *do* care about {}", file);
+        }
+
+        try {
+          Status stat = Status.parseFrom(entry.getValue().get());
+          if (!StatusUtil.isFullyReplicated(stat)) {
+            log.trace("{} and {} is not fully replicated", entry.getKey().getRow(), ProtobufUtil.toString(stat));
+            return false;
+          }
+        } catch (InvalidProtocolBufferException e) { // an unparseable value is only traced; it does not make the result false
+          log.trace("Could not parse protobuf for {}", entry.getKey(), e);
+        }
+      }
+    }
+
+    return true;
+  }
 }


Mime
View raw message