accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/4] accumulo git commit: ACCUMULO-3650 Push protobuf-java dependency into server-base.
Date Wed, 11 Mar 2015 21:36:50 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
deleted file mode 100644
index a7cd3f5..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.replication.proto.Replication.Status.Builder;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * Helper methods to create Status protobuf messages
- */
-public class StatusUtil {
-
-  private static final Status INF_END_REPLICATION_STATUS, CLOSED_STATUS;
-  private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
-
-  private static final Status.Builder CREATED_STATUS_BUILDER;
-
-  static {
-    CREATED_STATUS_BUILDER = Status.newBuilder();
-    CREATED_STATUS_BUILDER.setBegin(0);
-    CREATED_STATUS_BUILDER.setEnd(0);
-    CREATED_STATUS_BUILDER.setInfiniteEnd(false);
-    CREATED_STATUS_BUILDER.setClosed(false);
-
-    Builder builder = Status.newBuilder();
-    builder.setBegin(0);
-    builder.setEnd(0);
-    builder.setInfiniteEnd(true);
-    builder.setClosed(false);
-    INF_END_REPLICATION_STATUS = builder.build();
-    INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
-
-    builder = Status.newBuilder();
-    builder.setBegin(0);
-    builder.setEnd(0);
-    builder.setInfiniteEnd(true);
-    builder.setClosed(true);
-    CLOSED_STATUS = builder.build();
-    CLOSED_STATUS_VALUE = ProtobufUtil.toValue(CLOSED_STATUS);
-  }
-
-  /**
-   * Creates a {@link Status} for newly-created data that must be replicated
-   *
-   * @param recordsIngested
-   *          Offset of records which need to be replicated
-   * @return A {@link Status} tracking data that must be replicated
-   */
-  public static Status ingestedUntil(long recordsIngested) {
-    return ingestedUntil(Status.newBuilder(), recordsIngested);
-  }
-
-  public static Status ingestedUntil(Builder builder, long recordsIngested) {
-    return replicatedAndIngested(builder, 0, recordsIngested);
-  }
-
-  /**
-   * @param recordsReplicated
-   *          Offset of records which have been replicated
-   * @return A {@link Status} tracking data that must be replicated
-   */
-  public static Status replicated(long recordsReplicated) {
-    return replicated(Status.newBuilder(), recordsReplicated);
-  }
-
-  /**
-   * @param builder
-   *          Existing {@link Builder} to use
-   * @param recordsReplicated
-   *          Offset of records which have been replicated
-   * @return A {@link Status} tracking data that must be replicated
-   */
-  public static Status replicated(Status.Builder builder, long recordsReplicated) {
-    return replicatedAndIngested(builder, recordsReplicated, 0);
-  }
-
-  /**
-   * Creates a {@link Status} for a file which has new data and data which has been replicated
-   *
-   * @param recordsReplicated
-   *          Offset of records which have been replicated
-   * @param recordsIngested
-   *          Offset for records which need to be replicated
-   * @return A {@link Status} for the given parameters
-   */
-  public static Status replicatedAndIngested(long recordsReplicated, long recordsIngested) {
-    return replicatedAndIngested(Status.newBuilder(), recordsReplicated, recordsIngested);
-  }
-
-  /**
-   * Same as {@link #replicatedAndIngested(long, long)} but uses the provided {@link Builder}
-   *
-   * @param builder
-   *          An existing builder
-   * @param recordsReplicated
-   *          Offset of records which have been replicated
-   * @param recordsIngested
-   *          Offset of records which need to be replicated
-   * @return A {@link Status} for the given parameters using the builder
-   */
-  public static Status replicatedAndIngested(Status.Builder builder, long recordsReplicated, long recordsIngested) {
-    return builder.setBegin(recordsReplicated).setEnd(recordsIngested).setClosed(false).setInfiniteEnd(false).build();
-  }
-
-  /**
-   * @return A {@link Status} for a new file that was just created
-   */
-  public static synchronized Status fileCreated(long timeCreated) {
-    // We're using a shared builder, so we need to synchronize access on it until we make a Status (which is then immutable)
-    CREATED_STATUS_BUILDER.setCreatedTime(timeCreated);
-    return CREATED_STATUS_BUILDER.build();
-  }
-
-  /**
-   * @return A {@link Value} for a new file that was just created
-   */
-  public static Value fileCreatedValue(long timeCreated) {
-    return ProtobufUtil.toValue(fileCreated(timeCreated));
-  }
-
-  /**
-   * @return A Status representing a closed file
-   */
-  public static Status fileClosed() {
-    return CLOSED_STATUS;
-  }
-
-  /**
-   * @return A Value representing a closed file
-   */
-  public static Value fileClosedValue() {
-    return CLOSED_STATUS_VALUE;
-  }
-
-  /**
-   * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
-   */
-  public static Status openWithUnknownLength() {
-    return INF_END_REPLICATION_STATUS;
-  }
-
-  /**
-   * @return A {@link Value} for an open file of unspecified length, all of which needs replicating.
-   */
-  public static Value openWithUnknownLengthValue() {
-    return INF_END_REPLICATION_STATUS_VALUE;
-  }
-
-  /**
-   * @param v
-   *          Value with serialized Status
-   * @return A Status created from the Value
-   */
-  public static Status fromValue(Value v) throws InvalidProtocolBufferException {
-    return Status.parseFrom(v.get());
-  }
-
-  /**
-   * Is the given Status fully replicated and is its file ready for deletion on the source
-   *
-   * @param status
-   *          a Status protobuf
-   * @return True if the file this Status references can be deleted.
-   */
-  public static boolean isSafeForRemoval(Status status) {
-    return status.getClosed() && isFullyReplicated(status);
-  }
-
-  /**
-   * Is the given Status fully replicated but potentially not yet safe for deletion
-   *
-   * @param status
-   *          a Status protobuf
-   * @return True if the file this Status references is fully replicated so far
-   */
-  public static boolean isFullyReplicated(Status status) {
-    if (status.getInfiniteEnd()) {
-      return Long.MAX_VALUE == status.getBegin();
-    } else {
-      return status.getBegin() >= status.getEnd();
-    }
-  }
-
-  /**
-   * Given the {@link Status}, is there replication work to be done
-   *
-   * @param status
-   *          Status for a file
-   * @return true if replication work is required
-   */
-  public static boolean isWorkRequired(Status status) {
-    if (status.getInfiniteEnd()) {
-      return Long.MAX_VALUE != status.getBegin();
-    } else {
-      return status.getBegin() < status.getEnd();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java b/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
deleted file mode 100644
index 030f4ec..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
+++ /dev/null
@@ -1,949 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// Generated by the protocol buffer compiler.  DO NOT EDIT!
-// source: src/main/protobuf/replication.proto
-
-package org.apache.accumulo.core.replication.proto;
-
-@SuppressWarnings({"unused"}) public final class Replication {
-  private Replication() {}
-  public static void registerAllExtensions(
-      com.google.protobuf.ExtensionRegistry registry) {
-  }
-  public interface StatusOrBuilder
-      extends com.google.protobuf.MessageOrBuilder {
-
-    // optional int64 begin = 1 [default = 0];
-    /**
-     * <code>optional int64 begin = 1 [default = 0];</code>
-     *
-     * <pre>
-     * offset where replication should start
-     * </pre>
-     */
-    boolean hasBegin();
-    /**
-     * <code>optional int64 begin = 1 [default = 0];</code>
-     *
-     * <pre>
-     * offset where replication should start
-     * </pre>
-     */
-    long getBegin();
-
-    // optional int64 end = 2 [default = 0];
-    /**
-     * <code>optional int64 end = 2 [default = 0];</code>
-     *
-     * <pre>
-     * offset where data is ready for replication
-     * </pre>
-     */
-    boolean hasEnd();
-    /**
-     * <code>optional int64 end = 2 [default = 0];</code>
-     *
-     * <pre>
-     * offset where data is ready for replication
-     * </pre>
-     */
-    long getEnd();
-
-    // optional bool infiniteEnd = 3 [default = false];
-    /**
-     * <code>optional bool infiniteEnd = 3 [default = false];</code>
-     *
-     * <pre>
-     * do we have a discrete 'end'
-     * </pre>
-     */
-    boolean hasInfiniteEnd();
-    /**
-     * <code>optional bool infiniteEnd = 3 [default = false];</code>
-     *
-     * <pre>
-     * do we have a discrete 'end'
-     * </pre>
-     */
-    boolean getInfiniteEnd();
-
-    // optional bool closed = 4 [default = false];
-    /**
-     * <code>optional bool closed = 4 [default = false];</code>
-     *
-     * <pre>
-     * will more data be appended to the file
-     * </pre>
-     */
-    boolean hasClosed();
-    /**
-     * <code>optional bool closed = 4 [default = false];</code>
-     *
-     * <pre>
-     * will more data be appended to the file
-     * </pre>
-     */
-    boolean getClosed();
-
-    // optional int64 createdTime = 5 [default = 0];
-    /**
-     * <code>optional int64 createdTime = 5 [default = 0];</code>
-     *
-     * <pre>
-     * when, in ms, was the file created?
-     * </pre>
-     */
-    boolean hasCreatedTime();
-    /**
-     * <code>optional int64 createdTime = 5 [default = 0];</code>
-     *
-     * <pre>
-     * when, in ms, was the file created?
-     * </pre>
-     */
-    long getCreatedTime();
-  }
-  /**
-   * Protobuf type {@code Status}
-   */
-  public static final class Status extends
-      com.google.protobuf.GeneratedMessage
-      implements StatusOrBuilder {
-    // Use Status.newBuilder() to construct.
-    private Status(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
-      super(builder);
-      this.unknownFields = builder.getUnknownFields();
-    }
-    private Status(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
-
-    private static final Status defaultInstance;
-    public static Status getDefaultInstance() {
-      return defaultInstance;
-    }
-
-    public Status getDefaultInstanceForType() {
-      return defaultInstance;
-    }
-
-    private final com.google.protobuf.UnknownFieldSet unknownFields;
-    @java.lang.Override
-    public final com.google.protobuf.UnknownFieldSet
-        getUnknownFields() {
-      return this.unknownFields;
-    }
-    private Status(
-        com.google.protobuf.CodedInputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      initFields();
-      int mutable_bitField0_ = 0;
-      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
-          com.google.protobuf.UnknownFieldSet.newBuilder();
-      try {
-        boolean done = false;
-        while (!done) {
-          int tag = input.readTag();
-          switch (tag) {
-            case 0:
-              done = true;
-              break;
-            default: {
-              if (!parseUnknownField(input, unknownFields,
-                                     extensionRegistry, tag)) {
-                done = true;
-              }
-              break;
-            }
-            case 8: {
-              bitField0_ |= 0x00000001;
-              begin_ = input.readInt64();
-              break;
-            }
-            case 16: {
-              bitField0_ |= 0x00000002;
-              end_ = input.readInt64();
-              break;
-            }
-            case 24: {
-              bitField0_ |= 0x00000004;
-              infiniteEnd_ = input.readBool();
-              break;
-            }
-            case 32: {
-              bitField0_ |= 0x00000008;
-              closed_ = input.readBool();
-              break;
-            }
-            case 40: {
-              bitField0_ |= 0x00000010;
-              createdTime_ = input.readInt64();
-              break;
-            }
-          }
-        }
-      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-        throw e.setUnfinishedMessage(this);
-      } catch (java.io.IOException e) {
-        throw new com.google.protobuf.InvalidProtocolBufferException(
-            e.getMessage()).setUnfinishedMessage(this);
-      } finally {
-        this.unknownFields = unknownFields.build();
-        makeExtensionsImmutable();
-      }
-    }
-    public static final com.google.protobuf.Descriptors.Descriptor
-        getDescriptor() {
-      return org.apache.accumulo.core.replication.proto.Replication.internal_static_Status_descriptor;
-    }
-
-    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
-        internalGetFieldAccessorTable() {
-      return org.apache.accumulo.core.replication.proto.Replication.internal_static_Status_fieldAccessorTable
-          .ensureFieldAccessorsInitialized(
-              org.apache.accumulo.core.replication.proto.Replication.Status.class, org.apache.accumulo.core.replication.proto.Replication.Status.Builder.class);
-    }
-
-    public static com.google.protobuf.Parser<Status> PARSER =
-        new com.google.protobuf.AbstractParser<Status>() {
-      public Status parsePartialFrom(
-          com.google.protobuf.CodedInputStream input,
-          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-          throws com.google.protobuf.InvalidProtocolBufferException {
-        return new Status(input, extensionRegistry);
-      }
-    };
-
-    @java.lang.Override
-    public com.google.protobuf.Parser<Status> getParserForType() {
-      return PARSER;
-    }
-
-    private int bitField0_;
-    // optional int64 begin = 1 [default = 0];
-    public static final int BEGIN_FIELD_NUMBER = 1;
-    private long begin_;
-    /**
-     * <code>optional int64 begin = 1 [default = 0];</code>
-     *
-     * <pre>
-     * offset where replication should start
-     * </pre>
-     */
-    public boolean hasBegin() {
-      return ((bitField0_ & 0x00000001) == 0x00000001);
-    }
-    /**
-     * <code>optional int64 begin = 1 [default = 0];</code>
-     *
-     * <pre>
-     * offset where replication should start
-     * </pre>
-     */
-    public long getBegin() {
-      return begin_;
-    }
-
-    // optional int64 end = 2 [default = 0];
-    public static final int END_FIELD_NUMBER = 2;
-    private long end_;
-    /**
-     * <code>optional int64 end = 2 [default = 0];</code>
-     *
-     * <pre>
-     * offset where data is ready for replication
-     * </pre>
-     */
-    public boolean hasEnd() {
-      return ((bitField0_ & 0x00000002) == 0x00000002);
-    }
-    /**
-     * <code>optional int64 end = 2 [default = 0];</code>
-     *
-     * <pre>
-     * offset where data is ready for replication
-     * </pre>
-     */
-    public long getEnd() {
-      return end_;
-    }
-
-    // optional bool infiniteEnd = 3 [default = false];
-    public static final int INFINITEEND_FIELD_NUMBER = 3;
-    private boolean infiniteEnd_;
-    /**
-     * <code>optional bool infiniteEnd = 3 [default = false];</code>
-     *
-     * <pre>
-     * do we have a discrete 'end'
-     * </pre>
-     */
-    public boolean hasInfiniteEnd() {
-      return ((bitField0_ & 0x00000004) == 0x00000004);
-    }
-    /**
-     * <code>optional bool infiniteEnd = 3 [default = false];</code>
-     *
-     * <pre>
-     * do we have a discrete 'end'
-     * </pre>
-     */
-    public boolean getInfiniteEnd() {
-      return infiniteEnd_;
-    }
-
-    // optional bool closed = 4 [default = false];
-    public static final int CLOSED_FIELD_NUMBER = 4;
-    private boolean closed_;
-    /**
-     * <code>optional bool closed = 4 [default = false];</code>
-     *
-     * <pre>
-     * will more data be appended to the file
-     * </pre>
-     */
-    public boolean hasClosed() {
-      return ((bitField0_ & 0x00000008) == 0x00000008);
-    }
-    /**
-     * <code>optional bool closed = 4 [default = false];</code>
-     *
-     * <pre>
-     * will more data be appended to the file
-     * </pre>
-     */
-    public boolean getClosed() {
-      return closed_;
-    }
-
-    // optional int64 createdTime = 5 [default = 0];
-    public static final int CREATEDTIME_FIELD_NUMBER = 5;
-    private long createdTime_;
-    /**
-     * <code>optional int64 createdTime = 5 [default = 0];</code>
-     *
-     * <pre>
-     * when, in ms, was the file created?
-     * </pre>
-     */
-    public boolean hasCreatedTime() {
-      return ((bitField0_ & 0x00000010) == 0x00000010);
-    }
-    /**
-     * <code>optional int64 createdTime = 5 [default = 0];</code>
-     *
-     * <pre>
-     * when, in ms, was the file created?
-     * </pre>
-     */
-    public long getCreatedTime() {
-      return createdTime_;
-    }
-
-    private void initFields() {
-      begin_ = 0L;
-      end_ = 0L;
-      infiniteEnd_ = false;
-      closed_ = false;
-      createdTime_ = 0L;
-    }
-    private byte memoizedIsInitialized = -1;
-    public final boolean isInitialized() {
-      byte isInitialized = memoizedIsInitialized;
-      if (isInitialized != -1) return isInitialized == 1;
-
-      memoizedIsInitialized = 1;
-      return true;
-    }
-
-    public void writeTo(com.google.protobuf.CodedOutputStream output)
-                        throws java.io.IOException {
-      getSerializedSize();
-      if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeInt64(1, begin_);
-      }
-      if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeInt64(2, end_);
-      }
-      if (((bitField0_ & 0x00000004) == 0x00000004)) {
-        output.writeBool(3, infiniteEnd_);
-      }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
-        output.writeBool(4, closed_);
-      }
-      if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        output.writeInt64(5, createdTime_);
-      }
-      getUnknownFields().writeTo(output);
-    }
-
-    private int memoizedSerializedSize = -1;
-    public int getSerializedSize() {
-      int size = memoizedSerializedSize;
-      if (size != -1) return size;
-
-      size = 0;
-      if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeInt64Size(1, begin_);
-      }
-      if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeInt64Size(2, end_);
-      }
-      if (((bitField0_ & 0x00000004) == 0x00000004)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBoolSize(3, infiniteEnd_);
-      }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBoolSize(4, closed_);
-      }
-      if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeInt64Size(5, createdTime_);
-      }
-      size += getUnknownFields().getSerializedSize();
-      memoizedSerializedSize = size;
-      return size;
-    }
-
-    private static final long serialVersionUID = 0L;
-    @java.lang.Override
-    protected java.lang.Object writeReplace()
-        throws java.io.ObjectStreamException {
-      return super.writeReplace();
-    }
-
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(
-        com.google.protobuf.ByteString data)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(
-        com.google.protobuf.ByteString data,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data, extensionRegistry);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(byte[] data)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(
-        byte[] data,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws com.google.protobuf.InvalidProtocolBufferException {
-      return PARSER.parseFrom(data, extensionRegistry);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(java.io.InputStream input)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(
-        java.io.InputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input, extensionRegistry);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseDelimitedFrom(java.io.InputStream input)
-        throws java.io.IOException {
-      return PARSER.parseDelimitedFrom(input);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseDelimitedFrom(
-        java.io.InputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseDelimitedFrom(input, extensionRegistry);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(
-        com.google.protobuf.CodedInputStream input)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input);
-    }
-    public static org.apache.accumulo.core.replication.proto.Replication.Status parseFrom(
-        com.google.protobuf.CodedInputStream input,
-        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-        throws java.io.IOException {
-      return PARSER.parseFrom(input, extensionRegistry);
-    }
-
-    public static Builder newBuilder() { return Builder.create(); }
-    public Builder newBuilderForType() { return newBuilder(); }
-    public static Builder newBuilder(org.apache.accumulo.core.replication.proto.Replication.Status prototype) {
-      return newBuilder().mergeFrom(prototype);
-    }
-    public Builder toBuilder() { return newBuilder(this); }
-
-    @java.lang.Override
-    protected Builder newBuilderForType(
-        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
-      Builder builder = new Builder(parent);
-      return builder;
-    }
-    /**
-     * Protobuf type {@code Status}
-     */
-    public static final class Builder extends
-        com.google.protobuf.GeneratedMessage.Builder<Builder>
-       implements org.apache.accumulo.core.replication.proto.Replication.StatusOrBuilder {
-      public static final com.google.protobuf.Descriptors.Descriptor
-          getDescriptor() {
-        return org.apache.accumulo.core.replication.proto.Replication.internal_static_Status_descriptor;
-      }
-
-      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
-          internalGetFieldAccessorTable() {
-        return org.apache.accumulo.core.replication.proto.Replication.internal_static_Status_fieldAccessorTable
-            .ensureFieldAccessorsInitialized(
-                org.apache.accumulo.core.replication.proto.Replication.Status.class, org.apache.accumulo.core.replication.proto.Replication.Status.Builder.class);
-      }
-
-      // Construct using org.apache.accumulo.core.replication.proto.Replication.Status.newBuilder()
-      private Builder() {
-        maybeForceBuilderInitialization();
-      }
-
-      private Builder(
-          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
-        super(parent);
-        maybeForceBuilderInitialization();
-      }
-      private void maybeForceBuilderInitialization() {
-        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
-        }
-      }
-      private static Builder create() {
-        return new Builder();
-      }
-
-      public Builder clear() {
-        super.clear();
-        begin_ = 0L;
-        bitField0_ = (bitField0_ & ~0x00000001);
-        end_ = 0L;
-        bitField0_ = (bitField0_ & ~0x00000002);
-        infiniteEnd_ = false;
-        bitField0_ = (bitField0_ & ~0x00000004);
-        closed_ = false;
-        bitField0_ = (bitField0_ & ~0x00000008);
-        createdTime_ = 0L;
-        bitField0_ = (bitField0_ & ~0x00000010);
-        return this;
-      }
-
-      public Builder clone() {
-        return create().mergeFrom(buildPartial());
-      }
-
-      public com.google.protobuf.Descriptors.Descriptor
-          getDescriptorForType() {
-        return org.apache.accumulo.core.replication.proto.Replication.internal_static_Status_descriptor;
-      }
-
-      public org.apache.accumulo.core.replication.proto.Replication.Status getDefaultInstanceForType() {
-        return org.apache.accumulo.core.replication.proto.Replication.Status.getDefaultInstance();
-      }
-
-      public org.apache.accumulo.core.replication.proto.Replication.Status build() {
-        org.apache.accumulo.core.replication.proto.Replication.Status result = buildPartial();
-        if (!result.isInitialized()) {
-          throw newUninitializedMessageException(result);
-        }
-        return result;
-      }
-
-      public org.apache.accumulo.core.replication.proto.Replication.Status buildPartial() {
-        org.apache.accumulo.core.replication.proto.Replication.Status result = new org.apache.accumulo.core.replication.proto.Replication.Status(this);
-        int from_bitField0_ = bitField0_;
-        int to_bitField0_ = 0;
-        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
-          to_bitField0_ |= 0x00000001;
-        }
-        result.begin_ = begin_;
-        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
-          to_bitField0_ |= 0x00000002;
-        }
-        result.end_ = end_;
-        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
-          to_bitField0_ |= 0x00000004;
-        }
-        result.infiniteEnd_ = infiniteEnd_;
-        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
-          to_bitField0_ |= 0x00000008;
-        }
-        result.closed_ = closed_;
-        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
-          to_bitField0_ |= 0x00000010;
-        }
-        result.createdTime_ = createdTime_;
-        result.bitField0_ = to_bitField0_;
-        onBuilt();
-        return result;
-      }
-
-      public Builder mergeFrom(com.google.protobuf.Message other) {
-        if (other instanceof org.apache.accumulo.core.replication.proto.Replication.Status) {
-          return mergeFrom((org.apache.accumulo.core.replication.proto.Replication.Status)other);
-        } else {
-          super.mergeFrom(other);
-          return this;
-        }
-      }
-
-      public Builder mergeFrom(org.apache.accumulo.core.replication.proto.Replication.Status other) {
-        if (other == org.apache.accumulo.core.replication.proto.Replication.Status.getDefaultInstance()) return this;
-        if (other.hasBegin()) {
-          setBegin(other.getBegin());
-        }
-        if (other.hasEnd()) {
-          setEnd(other.getEnd());
-        }
-        if (other.hasInfiniteEnd()) {
-          setInfiniteEnd(other.getInfiniteEnd());
-        }
-        if (other.hasClosed()) {
-          setClosed(other.getClosed());
-        }
-        if (other.hasCreatedTime()) {
-          setCreatedTime(other.getCreatedTime());
-        }
-        this.mergeUnknownFields(other.getUnknownFields());
-        return this;
-      }
-
-      public final boolean isInitialized() {
-        return true;
-      }
-
-      public Builder mergeFrom(
-          com.google.protobuf.CodedInputStream input,
-          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
-          throws java.io.IOException {
-        org.apache.accumulo.core.replication.proto.Replication.Status parsedMessage = null;
-        try {
-          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
-        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
-          parsedMessage = (org.apache.accumulo.core.replication.proto.Replication.Status) e.getUnfinishedMessage();
-          throw e;
-        } finally {
-          if (parsedMessage != null) {
-            mergeFrom(parsedMessage);
-          }
-        }
-        return this;
-      }
-      private int bitField0_;
-
-      // optional int64 begin = 1 [default = 0];
-      private long begin_ ;
-      /**
-       * <code>optional int64 begin = 1 [default = 0];</code>
-       *
-       * <pre>
-       * offset where replication should start
-       * </pre>
-       */
-      public boolean hasBegin() {
-        return ((bitField0_ & 0x00000001) == 0x00000001);
-      }
-      /**
-       * <code>optional int64 begin = 1 [default = 0];</code>
-       *
-       * <pre>
-       * offset where replication should start
-       * </pre>
-       */
-      public long getBegin() {
-        return begin_;
-      }
-      /**
-       * <code>optional int64 begin = 1 [default = 0];</code>
-       *
-       * <pre>
-       * offset where replication should start
-       * </pre>
-       */
-      public Builder setBegin(long value) {
-        bitField0_ |= 0x00000001;
-        begin_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional int64 begin = 1 [default = 0];</code>
-       *
-       * <pre>
-       * offset where replication should start
-       * </pre>
-       */
-      public Builder clearBegin() {
-        bitField0_ = (bitField0_ & ~0x00000001);
-        begin_ = 0L;
-        onChanged();
-        return this;
-      }
-
-      // optional int64 end = 2 [default = 0];
-      private long end_ ;
-      /**
-       * <code>optional int64 end = 2 [default = 0];</code>
-       *
-       * <pre>
-       * offset where data is ready for replication
-       * </pre>
-       */
-      public boolean hasEnd() {
-        return ((bitField0_ & 0x00000002) == 0x00000002);
-      }
-      /**
-       * <code>optional int64 end = 2 [default = 0];</code>
-       *
-       * <pre>
-       * offset where data is ready for replication
-       * </pre>
-       */
-      public long getEnd() {
-        return end_;
-      }
-      /**
-       * <code>optional int64 end = 2 [default = 0];</code>
-       *
-       * <pre>
-       * offset where data is ready for replication
-       * </pre>
-       */
-      public Builder setEnd(long value) {
-        bitField0_ |= 0x00000002;
-        end_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional int64 end = 2 [default = 0];</code>
-       *
-       * <pre>
-       * offset where data is ready for replication
-       * </pre>
-       */
-      public Builder clearEnd() {
-        bitField0_ = (bitField0_ & ~0x00000002);
-        end_ = 0L;
-        onChanged();
-        return this;
-      }
-
-      // optional bool infiniteEnd = 3 [default = false];
-      private boolean infiniteEnd_ ;
-      /**
-       * <code>optional bool infiniteEnd = 3 [default = false];</code>
-       *
-       * <pre>
-       * do we have a discrete 'end'
-       * </pre>
-       */
-      public boolean hasInfiniteEnd() {
-        return ((bitField0_ & 0x00000004) == 0x00000004);
-      }
-      /**
-       * <code>optional bool infiniteEnd = 3 [default = false];</code>
-       *
-       * <pre>
-       * do we have a discrete 'end'
-       * </pre>
-       */
-      public boolean getInfiniteEnd() {
-        return infiniteEnd_;
-      }
-      /**
-       * <code>optional bool infiniteEnd = 3 [default = false];</code>
-       *
-       * <pre>
-       * do we have a discrete 'end'
-       * </pre>
-       */
-      public Builder setInfiniteEnd(boolean value) {
-        bitField0_ |= 0x00000004;
-        infiniteEnd_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional bool infiniteEnd = 3 [default = false];</code>
-       *
-       * <pre>
-       * do we have a discrete 'end'
-       * </pre>
-       */
-      public Builder clearInfiniteEnd() {
-        bitField0_ = (bitField0_ & ~0x00000004);
-        infiniteEnd_ = false;
-        onChanged();
-        return this;
-      }
-
-      // optional bool closed = 4 [default = false];
-      private boolean closed_ ;
-      /**
-       * <code>optional bool closed = 4 [default = false];</code>
-       *
-       * <pre>
-       * will more data be appended to the file
-       * </pre>
-       */
-      public boolean hasClosed() {
-        return ((bitField0_ & 0x00000008) == 0x00000008);
-      }
-      /**
-       * <code>optional bool closed = 4 [default = false];</code>
-       *
-       * <pre>
-       * will more data be appended to the file
-       * </pre>
-       */
-      public boolean getClosed() {
-        return closed_;
-      }
-      /**
-       * <code>optional bool closed = 4 [default = false];</code>
-       *
-       * <pre>
-       * will more data be appended to the file
-       * </pre>
-       */
-      public Builder setClosed(boolean value) {
-        bitField0_ |= 0x00000008;
-        closed_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional bool closed = 4 [default = false];</code>
-       *
-       * <pre>
-       * will more data be appended to the file
-       * </pre>
-       */
-      public Builder clearClosed() {
-        bitField0_ = (bitField0_ & ~0x00000008);
-        closed_ = false;
-        onChanged();
-        return this;
-      }
-
-      // optional int64 createdTime = 5 [default = 0];
-      private long createdTime_ ;
-      /**
-       * <code>optional int64 createdTime = 5 [default = 0];</code>
-       *
-       * <pre>
-       * when, in ms, was the file created?
-       * </pre>
-       */
-      public boolean hasCreatedTime() {
-        return ((bitField0_ & 0x00000010) == 0x00000010);
-      }
-      /**
-       * <code>optional int64 createdTime = 5 [default = 0];</code>
-       *
-       * <pre>
-       * when, in ms, was the file created?
-       * </pre>
-       */
-      public long getCreatedTime() {
-        return createdTime_;
-      }
-      /**
-       * <code>optional int64 createdTime = 5 [default = 0];</code>
-       *
-       * <pre>
-       * when, in ms, was the file created?
-       * </pre>
-       */
-      public Builder setCreatedTime(long value) {
-        bitField0_ |= 0x00000010;
-        createdTime_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional int64 createdTime = 5 [default = 0];</code>
-       *
-       * <pre>
-       * when, in ms, was the file created?
-       * </pre>
-       */
-      public Builder clearCreatedTime() {
-        bitField0_ = (bitField0_ & ~0x00000010);
-        createdTime_ = 0L;
-        onChanged();
-        return this;
-      }
-
-      // @@protoc_insertion_point(builder_scope:Status)
-    }
-
-    static {
-      defaultInstance = new Status(true);
-      defaultInstance.initFields();
-    }
-
-    // @@protoc_insertion_point(class_scope:Status)
-  }
-
-  private static com.google.protobuf.Descriptors.Descriptor
-    internal_static_Status_descriptor;
-  private static
-    com.google.protobuf.GeneratedMessage.FieldAccessorTable
-      internal_static_Status_fieldAccessorTable;
-
-  public static com.google.protobuf.Descriptors.FileDescriptor
-      getDescriptor() {
-    return descriptor;
-  }
-  private static com.google.protobuf.Descriptors.FileDescriptor
-      descriptor;
-  static {
-    java.lang.String[] descriptorData = {
-      "\n#src/main/protobuf/replication.proto\"u\n" +
-      "\006Status\022\020\n\005begin\030\001 \001(\003:\0010\022\016\n\003end\030\002 \001(\003:\001" +
-      "0\022\032\n\013infiniteEnd\030\003 \001(\010:\005false\022\025\n\006closed\030" +
-      "\004 \001(\010:\005false\022\026\n\013createdTime\030\005 \001(\003:\0010B.\n*" +
-      "org.apache.accumulo.core.replication.pro" +
-      "toH\001"
-    };
-    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
-      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
-        public com.google.protobuf.ExtensionRegistry assignDescriptors(
-            com.google.protobuf.Descriptors.FileDescriptor root) {
-          descriptor = root;
-          internal_static_Status_descriptor =
-            getDescriptor().getMessageTypes().get(0);
-          internal_static_Status_fieldAccessorTable = new
-            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
-              internal_static_Status_descriptor,
-              new java.lang.String[] { "Begin", "End", "InfiniteEnd", "Closed", "CreatedTime", });
-          return null;
-        }
-      };
-    com.google.protobuf.Descriptors.FileDescriptor
-      .internalBuildGeneratedFileFrom(descriptorData,
-        new com.google.protobuf.Descriptors.FileDescriptor[] {
-        }, assigner);
-  }
-
-  // @@protoc_insertion_point(outer_class_scope)
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/protobuf/replication.proto
----------------------------------------------------------------------
diff --git a/core/src/main/protobuf/replication.proto b/core/src/main/protobuf/replication.proto
deleted file mode 100644
index 7feda58..0000000
--- a/core/src/main/protobuf/replication.proto
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-option java_package = "org.apache.accumulo.core.replication.proto";
-option optimize_for = SPEED;
-
-message Status {
-	optional int64 begin = 1 [default = 0]; // offset where replication should start
-	optional int64 end = 2 [default = 0]; // offset where data is ready for replication
-	optional bool infiniteEnd = 3 [default = false]; // do we have a discrete 'end'
-	optional bool closed = 4 [default = false]; // will more data be appended to the file
-	optional int64 createdTime = 5 [default = 0]; // when, in ms, was the file created?
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/scripts/generate-protobuf.sh
----------------------------------------------------------------------
diff --git a/core/src/main/scripts/generate-protobuf.sh b/core/src/main/scripts/generate-protobuf.sh
deleted file mode 100755
index a9d886e..0000000
--- a/core/src/main/scripts/generate-protobuf.sh
+++ /dev/null
@@ -1,98 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This script will regenerate the protobuf code for Accumulo
-
-# NOTES:
-#   To support this script being called by other modules, only edit the right side.
-#   In other scripts, set the variables that diverge from the defaults below, then call this script.
-#   Leave the BUILD_DIR and FINAL_DIR alone for Maven builds.
-# ========================================================================================================================
-[[ -z $REQUIRED_PROTOC_VERSION ]] && REQUIRED_PROTOC_VERSION='libprotoc 2.5.0'
-[[ -z $BUILD_DIR ]]               && BUILD_DIR='target/proto'
-[[ -z $FINAL_DIR ]]               && FINAL_DIR='src/main'
-# ========================================================================================================================
-
-fail() {
-  echo "$@"
-  exit 1
-}
-
-# Test to see if we have thrift installed
-VERSION=$(protoc --version 2>/dev/null | grep -F "${REQUIRED_PROTOC_VERSION}" |  wc -l)
-if [[ $VERSION != 1 ]] ; then
-  # Nope: bail
-  echo "****************************************************"
-  echo "*** protoc is not available"
-  echo "***   expecting 'protoc --version' to return ${REQUIRED_PROTOC_VERSION}"
-  echo "*** generated code will not be updated"
-  fail "****************************************************"
-fi
-
-# Ensure output directories are created
-PROTOC_ARGS="--java_out=$BUILD_DIR"
-rm -rf $BUILD_DIR
-mkdir -p $BUILD_DIR
-
-protoc ${PROTOC_ARGS} src/main/protobuf/*.proto || fail unable to generate Java protocol buffer classes
-
-# For all generated thrift code, suppress all warnings and add the LICENSE header
-s='@SuppressWarnings({"unused"})'
-find $BUILD_DIR -name '*.java' -print0 | xargs -0 sed -i.orig -e 's/\(public final class \)/'"$s"' \1/'
-
-PREFIX="/*
-"
-LINE_NOTATION=" *"
-SUFFIX="
- */"
-FILE_SUFFIX=(.java)
-
-for file in "${FILE_SUFFIX[@]}"; do
-  for f in $(find $BUILD_DIR/ -name "*$file"); do
-    cat - "$f" > "${f}-with-license" <<EOF
-${PREFIX}${LINE_NOTATION} Licensed to the Apache Software Foundation (ASF) under one or more
-${LINE_NOTATION} contributor license agreements.  See the NOTICE file distributed with
-${LINE_NOTATION} this work for additional information regarding copyright ownership.
-${LINE_NOTATION} The ASF licenses this file to You under the Apache License, Version 2.0
-${LINE_NOTATION} (the "License"); you may not use this file except in compliance with
-${LINE_NOTATION} the License.  You may obtain a copy of the License at
-${LINE_NOTATION}
-${LINE_NOTATION}     http://www.apache.org/licenses/LICENSE-2.0
-${LINE_NOTATION}
-${LINE_NOTATION} Unless required by applicable law or agreed to in writing, software
-${LINE_NOTATION} distributed under the License is distributed on an "AS IS" BASIS,
-${LINE_NOTATION} WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-${LINE_NOTATION} See the License for the specific language governing permissions and
-${LINE_NOTATION} limitations under the License.${SUFFIX}
-EOF
-  done
-done
-
-# For every generated java file, compare it with the version-controlled one, and copy the ones that have changed into place
-SDIR="${BUILD_DIR}/org/apache/accumulo/core/replication/proto"
-DDIR="${FINAL_DIR}/java/org/apache/accumulo/core/replication/proto"
-FILE_SUFFIX=(.java)
-mkdir -p "$DDIR"
-for file in "${FILE_SUFFIX[@]}"; do
-  for f in $(find $SDIR -name *$file); do
-    DEST=$DDIR/$(basename "$f")
-    if ! cmp -s "${f}-with-license" "${DEST}" ; then
-      echo cp -f "${f}-with-license" "${DEST}"
-      cp -f "${f}-with-license" "${DEST}" || fail unable to copy files to java workspace
-    fi
-  done
-done

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/main/thrift/master.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/master.thrift b/core/src/main/thrift/master.thrift
index 8a83438..f7a1dc9 100644
--- a/core/src/main/thrift/master.thrift
+++ b/core/src/main/thrift/master.thrift
@@ -176,4 +176,7 @@ service MasterClientService extends FateService {
 
   // Delegation token request
   security.TDelegationToken getDelegationToken(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:security.TDelegationTokenConfig cfg) throws (1:client.ThriftSecurityException sec)
+
+  // Determine when all provided logs are replicated
+  bool drainReplicationTable(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:string tableName, 4:set<string> logsToWatch)
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
deleted file mode 100644
index df9816e..0000000
--- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import java.util.Arrays;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.impl.ClientContext;
-import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class ReplicationOperationsImplTest {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplTest.class);
-
-  private MockInstance inst;
-
-  @Rule
-  public TestName test = new TestName();
-
-  @Before
-  public void setup() {
-    inst = new MockInstance(test.getMethodName());
-  }
-
-  @Test
-  public void waitsUntilEntriesAreReplicated() throws Exception {
-    Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create("foo");
-    Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
-
-    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
-
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-
-    Mutation m = new Mutation(file1);
-    StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-
-    bw.close();
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
-
-    bw.addMutation(m);
-
-    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
-    m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
-
-    bw.close();
-
-    final AtomicBoolean done = new AtomicBoolean(false);
-    final AtomicBoolean exception = new AtomicBoolean(false);
-    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
-    final ReplicationOperationsImpl roi = new ReplicationOperationsImpl(context);
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          roi.drain("foo");
-        } catch (Exception e) {
-          log.error("Got error", e);
-          exception.set(true);
-        }
-        done.set(true);
-      }
-    });
-
-    t.start();
-
-    // With the records, we shouldn't be drained
-    Assert.assertFalse(done.get());
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.putDelete(ReplicationSection.COLF, tableId);
-    bw.addMutation(m);
-    bw.flush();
-
-    Assert.assertFalse(done.get());
-
-    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
-    m.putDelete(ReplicationSection.COLF, tableId);
-    bw.addMutation(m);
-    bw.flush();
-    bw.close();
-
-    // Removing metadata entries doesn't change anything
-    Assert.assertFalse(done.get());
-
-    // Remove the replication entries too
-    bw = ReplicationTable.getBatchWriter(conn);
-    m = new Mutation(file1);
-    m.putDelete(StatusSection.NAME, tableId);
-    bw.addMutation(m);
-    bw.flush();
-
-    Assert.assertFalse(done.get());
-
-    m = new Mutation(file2);
-    m.putDelete(StatusSection.NAME, tableId);
-    bw.addMutation(m);
-    bw.flush();
-
-    try {
-      t.join(5000);
-    } catch (InterruptedException e) {
-      Assert.fail("ReplicationOperations.drain did not complete");
-    }
-
-    // After both metadata and replication
-    Assert.assertTrue(done.get());
-    Assert.assertFalse(exception.get());
-  }
-
-  @Test
-  public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
-    Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create("foo");
-    conn.tableOperations().create("bar");
-
-    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
-    Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar"));
-
-    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
-
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-
-    Mutation m = new Mutation(file1);
-    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-
-    bw.close();
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
-
-    bw.addMutation(m);
-
-    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
-    m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat));
-
-    bw.close();
-
-    final AtomicBoolean done = new AtomicBoolean(false);
-    final AtomicBoolean exception = new AtomicBoolean(false);
-    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
-    final ReplicationOperationsImpl roi = new ReplicationOperationsImpl(context);
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          roi.drain("foo");
-        } catch (Exception e) {
-          log.error("Got error", e);
-          exception.set(true);
-        }
-        done.set(true);
-      }
-    });
-
-    t.start();
-
-    // With the records, we shouldn't be drained
-    Assert.assertFalse(done.get());
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.putDelete(ReplicationSection.COLF, tableId1);
-    bw.addMutation(m);
-    bw.flush();
-
-    // Removing metadata entries doesn't change anything
-    Assert.assertFalse(done.get());
-
-    // Remove the replication entries too
-    bw = ReplicationTable.getBatchWriter(conn);
-    m = new Mutation(file1);
-    m.putDelete(StatusSection.NAME, tableId1);
-    bw.addMutation(m);
-    bw.flush();
-
-    try {
-      t.join(5000);
-    } catch (InterruptedException e) {
-      Assert.fail("ReplicationOperations.drain did not complete");
-    }
-
-    // After both metadata and replication
-    Assert.assertTrue(done.get());
-    Assert.assertFalse(exception.get());
-  }
-
-  @Test
-  public void inprogressReplicationRecordsBlockExecution() throws Exception {
-    Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create("foo");
-
-    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
-
-    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
-
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-
-    Mutation m = new Mutation(file1);
-    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-    bw.close();
-
-    LogEntry logEntry = new LogEntry();
-    logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
-    logEntry.server = "tserver";
-    logEntry.filename = file1;
-    logEntry.tabletId = 1;
-    logEntry.logSet = Arrays.asList(file1);
-    logEntry.timestamp = System.currentTimeMillis();
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-
-    m = new Mutation(logEntry.getRow());
-    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
-    bw.addMutation(m);
-
-    bw.close();
-
-    final AtomicBoolean done = new AtomicBoolean(false);
-    final AtomicBoolean exception = new AtomicBoolean(false);
-    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
-    final ReplicationOperationsImpl roi = new ReplicationOperationsImpl(context);
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          roi.drain("foo");
-        } catch (Exception e) {
-          log.error("Got error", e);
-          exception.set(true);
-        }
-        done.set(true);
-      }
-    });
-
-    t.start();
-
-    // With the records, we shouldn't be drained
-    Assert.assertFalse(done.get());
-
-    Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build();
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus));
-    bw.addMutation(m);
-    bw.flush();
-
-    // Removing metadata entries doesn't change anything
-    Assert.assertFalse(done.get());
-
-    // Remove the replication entries too
-    bw = ReplicationTable.getBatchWriter(conn);
-    m = new Mutation(file1);
-    m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
-    bw.addMutation(m);
-    bw.flush();
-
-    try {
-      t.join(5000);
-    } catch (InterruptedException e) {
-      Assert.fail("ReplicationOperations.drain did not complete");
-    }
-
-    // New records, but not fully replicated ones don't cause it to complete
-    Assert.assertFalse(done.get());
-    Assert.assertFalse(exception.get());
-  }
-
-  @Test
-  public void laterCreatedLogsDontBlockExecution() throws Exception {
-    Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create("foo");
-
-    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
-
-    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
-
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    Mutation m = new Mutation(file1);
-    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-    bw.close();
-
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-
-    bw.close();
-
-    System.out.println("Reading metadata first time");
-    for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-      System.out.println(e.getKey());
-    }
-
-    final AtomicBoolean done = new AtomicBoolean(false);
-    final AtomicBoolean exception = new AtomicBoolean(false);
-    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken("")), new ClientConfiguration());
-    final ReplicationOperationsImpl roi = new ReplicationOperationsImpl(context);
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          roi.drain("foo");
-        } catch (Exception e) {
-          log.error("Got error", e);
-          exception.set(true);
-        }
-        done.set(true);
-      }
-    });
-
-    t.start();
-
-    // We need to wait long enough for the table to read once
-    Thread.sleep(2000);
-
-    // Write another file, but also delete the old files
-    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID());
-    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
-    m.putDelete(ReplicationSection.COLF, tableId1);
-    bw.addMutation(m);
-    bw.close();
-
-    System.out.println("Reading metadata second time");
-    for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
-      System.out.println(e.getKey());
-    }
-
-    bw = ReplicationTable.getBatchWriter(conn);
-    m = new Mutation(file1);
-    m.putDelete(StatusSection.NAME, tableId1);
-    bw.addMutation(m);
-    bw.close();
-
-    try {
-      t.join(5000);
-    } catch (InterruptedException e) {
-      Assert.fail("ReplicationOperations.drain did not complete");
-    }
-
-    // We should pass immediately because we aren't waiting on both files to be deleted (just the one that we did)
-    Assert.assertTrue(done.get());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java b/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
deleted file mode 100644
index 024e05f..0000000
--- a/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class StatusUtilTest {
-
-  @Test
-  public void newFileIsNotCompletelyReplicated() {
-    Assert.assertFalse(StatusUtil.isSafeForRemoval(StatusUtil.fileCreated(0l)));
-  }
-
-  @Test
-  public void openFileIsNotCompletelyReplicated() {
-    Assert.assertFalse(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(false).setBegin(0).setEnd(1000).setInfiniteEnd(false).build()));
-  }
-
-  @Test
-  public void closedFileWithDifferentBeginEndIsNotCompletelyReplicated() {
-    Assert.assertFalse(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setBegin(0).setEnd(1000).setInfiniteEnd(false).build()));
-  }
-
-  @Test
-  public void closedFileWithInfEndAndNonMaxBeginIsNotCompletelyReplicated() {
-    Assert.assertFalse(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setInfiniteEnd(true).setBegin(10000).build()));
-  }
-
-  @Test
-  public void closedFileWithInfEndAndMaxBeginIsCompletelyReplicated() {
-    Assert.assertTrue(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setInfiniteEnd(true).setBegin(Long.MAX_VALUE).build()));
-  }
-
-  @Test
-  public void closeFileWithEqualBeginEndIsCompletelyReplicated() {
-    Assert.assertTrue(StatusUtil.isSafeForRemoval(Status.newBuilder().setClosed(true).setEnd(100000).setBegin(100000).build()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/core/src/test/java/org/apache/accumulo/core/replication/proto/StatusTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/proto/StatusTest.java b/core/src/test/java/org/apache/accumulo/core/replication/proto/StatusTest.java
deleted file mode 100644
index d59a04f..0000000
--- a/core/src/test/java/org/apache/accumulo/core/replication/proto/StatusTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication.proto;
-
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- *
- */
-public class StatusTest {
-
-  @Test
-  public void equality() {
-    Status replicated = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
-    Status unreplicated = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
-
-    Assert.assertFalse(replicated.equals(unreplicated));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/pom.xml
----------------------------------------------------------------------
diff --git a/server/base/pom.xml b/server/base/pom.xml
index fbface8..94850b9 100644
--- a/server/base/pom.xml
+++ b/server/base/pom.xml
@@ -41,6 +41,10 @@
       <artifactId>guava</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
@@ -124,4 +128,29 @@
       </testResource>
     </testResources>
   </build>
+  <profiles>
+    <profile>
+      <id>protobuf</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>generate-protobuf</id>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+                <phase>generate-sources</phase>
+                <configuration>
+                  <executable>${basedir}/src/main/scripts/generate-protobuf.sh</executable>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 1dcc6aa..36938b7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -29,14 +29,14 @@ import java.util.TreeMap;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.server.zookeeper.ZooLock;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 495e0e0..aad6fc2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -88,6 +88,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityUtil;
@@ -207,7 +208,7 @@ public class Initialize implements KeywordExecutable {
     }
     initialReplicationTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), Joiner.on(",").join(ReplicationTable.LOCALITY_GROUPS.keySet()));
     // add formatter to replication table
-    initialReplicationTableConf.put(Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
+    initialReplicationTableConf.put(Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationUtil.STATUS_FORMATTER_CLASS_NAME);
   }
 
   static boolean checkInit(Configuration conf, VolumeManager fs, SiteConfiguration sconf) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/PrintReplicationRecords.java b/server/base/src/main/java/org/apache/accumulo/server/replication/PrintReplicationRecords.java
new file mode 100644
index 0000000..2e28f60
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/PrintReplicationRecords.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import java.io.PrintStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ *
+ */
+public class PrintReplicationRecords implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(PrintReplicationRecords.class);
+
+  private Connector conn;
+  private PrintStream out;
+  private SimpleDateFormat sdf;
+
+  public PrintReplicationRecords(Connector conn, PrintStream out) {
+    this.conn = conn;
+    this.out = out;
+    this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+  }
+
+  @Override
+  public void run() {
+    Scanner s;
+
+    out.println(sdf.format(new Date()) + " Replication entries from metadata table");
+    out.println("------------------------------------------------------------------");
+    try {
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    } catch (TableNotFoundException e) {
+      log.error("Metadata table does not exist");
+      return;
+    }
+
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    for (Entry<Key,Value> entry : s) {
+      try {
+        out.println(entry.getKey().toStringNoTruncate() + "=" + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+      } catch (InvalidProtocolBufferException e) {
+        out.println(entry.getKey().toStringNoTruncate() + "= Could not deserialize Status message");
+      }
+    }
+
+    out.println();
+    out.println(sdf.format(new Date()) + " Replication entries from replication table");
+    out.println("--------------------------------------------------------------------");
+
+    try {
+      s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
+    } catch (TableNotFoundException e) {
+      log.error("Replication table does not exist");
+      return;
+    }
+
+    for (Entry<Key,Value> entry : s) {
+      try {
+        out.println(entry.getKey().toStringNoTruncate() + "=" + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+      } catch (InvalidProtocolBufferException e) {
+        out.println(entry.getKey().toStringNoTruncate() + "= Could not deserialize Status message");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystem.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystem.java
new file mode 100644
index 0000000..2ed75b4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystem.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Encapsulation of a remote system which Accumulo can replicate data to
+ */
+public interface ReplicaSystem {
+
+  /**
+   * Replicate the given status to the target peer
+   *
+   * @param p
+   *          Path to the resource we're reading from
+   * @param status
+   *          Information to replicate
+   * @param target
+   *          The peer
+   * @param helper
+   *          Instance of ReplicaSystemHelper
+   * @return A new Status for the progress that was made
+   */
+  public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper);
+
+  /**
+   * Configure the implementation with necessary information from the system configuration
+   * <p>
+   * For example, we only need one implementation for Accumulo, but, for each peer, we have a ZK quorum and instance name
+   */
+  public void configure(String configuration);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
new file mode 100644
index 0000000..6d0876b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public class ReplicaSystemFactory {
+  private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class);
+
+  private ReplicaSystemFactory() {}
+
+  /**
+   * @param value
+   *          {@link ReplicaSystem} implementation class name
+   * @return A {@link ReplicaSystem} object from the given name
+   */
+  public static ReplicaSystem get(String value) {
+    Preconditions.checkNotNull(value);
+
+    int index = value.indexOf(',');
+    if (-1 == index) {
+      throw new IllegalArgumentException("Expected comma separator between replication system name and configuration");
+    }
+
+    String name = value.substring(0, index);
+    String configuration = value.substring(index + 1);
+
+    try {
+      Class<?> clz = Class.forName(name);
+
+      if (ReplicaSystem.class.isAssignableFrom(clz)) {
+        Object o = clz.newInstance();
+        ReplicaSystem rs = (ReplicaSystem) o;
+        rs.configure(configuration);
+        return rs;
+      }
+
+      throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + name);
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      log.error("Error creating ReplicaSystem object", e);
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Generate the configuration value for a {@link ReplicaSystem} in the instance properties
+   *
+   * @param system
+   *          The desired ReplicaSystem to use
+   * @param configuration
+   *          Configuration string for the desired ReplicaSystem
+   * @return Value to set for peer configuration in the instance
+   */
+  public static String getPeerConfigurationValue(Class<? extends ReplicaSystem> system, String configuration) {
+    String systemName = system.getName() + ",";
+    if (null == configuration) {
+      return systemName;
+    }
+
+    return systemName + configuration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
new file mode 100644
index 0000000..3d08a3c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicaSystemHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ReplicaSystemHelper {
+  private static final Logger log = LoggerFactory.getLogger(ReplicaSystemHelper.class);
+
+  private Instance inst;
+  private Credentials creds;
+
+  public ReplicaSystemHelper(Instance inst, Credentials creds) {
+    this.inst = inst;
+    this.creds = creds;
+  }
+
+  /**
+   * Record the updated Status for this file and target
+   *
+   * @param filePath
+   *          Path to file being replicated
+   * @param status
+   *          Updated Status after replication
+   * @param target
+   *          Peer that was replicated to
+   */
+  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    try {
+      log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString(status));
+      Mutation m = new Mutation(filePath.toString());
+      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
+      bw.addMutation(m);
+    } finally {
+      bw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ded955e9/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index e1bbe3d..c5661a8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -33,8 +33,6 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.Tables;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -46,11 +44,10 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -62,6 +59,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 
 public class ReplicationUtil {
   private static final Logger log = LoggerFactory.getLogger(ReplicationUtil.class);
+  public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName();
 
   private ZooCache zooCache;
 


Mime
View raw message