accumulo-commits mailing list archives

From els...@apache.org
Subject [06/50] [abbrv] git commit: ACCUMULO-2819 Introduce closedTime and an OrderSection to the replication table.
Date Wed, 21 May 2014 01:59:25 GMT
ACCUMULO-2819 Introduce closedTime and an OrderSection to the replication table.

We need a global ordering for the WALs used by a table. With this ordering, we can make guarantees
about which data is safe to replicate concurrently and which data must wait to be replicated.
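
As a rough illustration of that ordering guarantee, the sketch below (not part of the commit; ULongLexicoder and Text are the real classes from accumulo-core and hadoop-common, the timestamps are invented) shows why prefixing the row with a lexicoder-encoded close time works: byte-wise row order agrees with numeric time order, so a scan over the new OrderSection returns the earliest-closed WALs first.

    import org.apache.accumulo.core.client.lexicoder.ULongLexicoder;
    import org.apache.hadoop.io.Text;

    public class OrderingSketch {
      public static void main(String[] args) {
        ULongLexicoder enc = new ULongLexicoder();
        // Two hypothetical close times, one millisecond apart
        Text earlier = new Text(enc.encode(1400182400000L));
        Text later = new Text(enc.encode(1400182400001L));
        // Byte-wise comparison of the encoded prefixes matches numeric order of the timestamps,
        // which is what lets the OrderSection row key impose a global order on closed WALs.
        System.out.println(earlier.compareTo(later) < 0); // prints true
      }
    }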


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f97f13ab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f97f13ab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f97f13ab

Branch: refs/heads/ACCUMULO-378
Commit: f97f13ab172169d1adf246ccc84768cf3cb551e3
Parents: 18432fe
Author: Josh Elser <elserj@apache.org>
Authored: Thu May 15 21:03:50 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu May 15 21:03:50 2014 -0400

----------------------------------------------------------------------
 .../core/replication/ReplicationSchema.java     | 130 +++++++++-
 .../accumulo/core/replication/StatusUtil.java   |  28 ++-
 .../core/replication/proto/Replication.java     | 250 ++++++++++++++++++-
 core/src/main/protobuf/replication.proto        |   9 +-
 .../apache/accumulo/server/fs/VolumeUtil.java   |   2 +-
 .../server/replication/StatusCombinerTest.java  |   4 +-
 .../server/util/ReplicationTableUtilTest.java   |   2 +-
 .../CloseWriteAheadLogReferences.java           |   2 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |   5 +-
 .../master/replication/StatusMaker.java         |  64 ++++-
 .../accumulo/master/replication/WorkMaker.java  |   1 -
 .../master/replication/StatusMakerTest.java     |  64 +++++
 .../master/replication/WorkMakerTest.java       |  22 +-
 .../org/apache/accumulo/tserver/Tablet.java     |  15 +-
 14 files changed, 535 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 57be449..725758e 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -16,7 +16,10 @@
  */
 package org.apache.accumulo.core.replication;
 
+import java.nio.charset.CharacterCodingException;
+
 import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.lexicoder.ULongLexicoder;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -34,7 +37,7 @@ public class ReplicationSchema {
   /**
    * Portion of a file that must be replication to the given target: peer and some identifying location on that peer, e.g. remote table ID
    * <p>
-   * <code>hdfs://localhost:8020/accumulo/wal/tserver+port/WAL work:serialized_ReplicationTarget [] -> Protobuf</code>
+   * <code>hdfs://localhost:8020/accumulo/wal/tserver+port/WAL work:serialized_ReplicationTarget [] -> Status Protobuf</code>
    */
   public static class WorkSection {
     public static final Text NAME = new Text("work");
@@ -75,14 +78,14 @@ public class ReplicationSchema {
   /**
    * Holds replication markers tracking status for files
    * <p>
-   * <code>hdfs://localhost:8020/accumulo/wal/tserver+port/WAL repl:local_table_id [] -> Protobuf</code>
+   * <code>hdfs://localhost:8020/accumulo/wal/tserver+port/WAL repl:local_table_id [] -> Status Protobuf</code>
    */
   public static class StatusSection {
     public static final Text NAME = new Text("repl");
-    private static final ByteSequence BYTE_SEQ_NAME = new ArrayByteSequence("repl");
+    private static final ByteSequence BYTE_SEQ_NAME = new ArrayByteSequence("repl"); 
 
     /**
-     * Extract the table ID from the colfam (inefficiently if called repeatedly)
+     * Extract the table ID from the key (inefficiently if called repeatedly)
      * @param k Key to extract from
      * @return The table ID
      * @see #getTableId(Key,Text) 
@@ -94,7 +97,7 @@ public class ReplicationSchema {
     }
 
     /**
-     * Extract the table ID from the colfam into the given {@link Text}
+     * Extract the table ID from the key into the given {@link Text}
      * @param k Key to extract from
      * @param buff Text to place table ID into
      */
@@ -119,7 +122,7 @@ public class ReplicationSchema {
     }
 
     /**
-     * Limit the scanner to only return ingest records
+     * Limit the scanner to only return Status records
      * @param scanner
      */
     public static void limit(ScannerBase scanner) {
@@ -132,6 +135,121 @@ public class ReplicationSchema {
     }
   }
 
+  /**
+   * Holds the order in which files needed for replication were closed. The intent is to be able to guarantee
+   * that files which were closed earlier were replicated first and we don't replay data in the wrong order on our peers
+   * <p>
+   * <code>encodedTimeOfClosure_hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code>
+   */
+  public static class OrderSection {
+    public static final Text NAME = new Text("order");
+    public static final String ROW_SEPARATOR = "_";
+    private static final ULongLexicoder longEncoder = new ULongLexicoder();
+
+    /**
+     * Extract the table ID from the given key (inefficiently if called repeatedly)
+     * @param k OrderSection Key
+     * @return source table id
+     */
+    public static String getTableId(Key k) {
+      Text buff = new Text();
+      getTableId(k, buff);
+      return buff.toString();
+    }
+
+    /**
+     * Extract the table ID from the given key
+     * @param k OrderSection key
+     * @param buff Text to place table ID into
+     */
+    public static void getTableId(Key k, Text buff) {
+      Preconditions.checkNotNull(k);
+      Preconditions.checkNotNull(buff);
+
+      k.getColumnQualifier(buff);
+    }
+
+    /**
+     * Limit the scanner to only return Order records
+     * @param scanner
+     */
+    public static void limit(ScannerBase scanner) {
+      scanner.fetchColumnFamily(NAME);
+    }
+
+    /**
+     * Creates the Mutation for the Order section for the given file and time, adding the column
+     * as well using {@link OrderSection#add(Mutation, Text, Value)}
+     * @param file Filename
+     * @param timeInMillis Time in millis that the file was closed
+     * @param tableId Source table id
+     * @param v Serialized Status msg as a Value
+     * @return Mutation for the Order section
+     */
+    public static Mutation createMutation(String file, long timeInMillis, Text tableId, Value v) {
+      Preconditions.checkNotNull(file);
+      Preconditions.checkArgument(timeInMillis >= 0, "timeInMillis must be greater than zero");
+      Preconditions.checkNotNull(v);
+
+      // Encode the time so it sorts properly
+      byte[] rowPrefix = longEncoder.encode(timeInMillis);
+      Text row = new Text(rowPrefix);
+      // Append the file as a suffix to the row
+      row.append((ROW_SEPARATOR+file).getBytes(), 0, file.length() + ROW_SEPARATOR.length());
+
+      // Make the mutation and add the column update
+      Mutation m = new Mutation(row);
+      return add(m, tableId, v);
+    }
+
+    /**
+     * Add a column update to the given mutation with the provided tableId and value
+     * @param m Mutation for OrderSection
+     * @param tableId Source table id
+     * @param v Serialized Status msg
+     * @return The original Mutation
+     */
+    public static Mutation add(Mutation m, Text tableId, Value v) {
+      m.put(NAME, tableId, v);
+      return m;
+    }
+
+    public static long getTimeClosed(Key k) {
+      return getTimeClosed(k, new Text());
+    }
+
+    public static long getTimeClosed(Key k, Text buff) {
+      k.getRow(buff);
+      int offset = buff.find(ROW_SEPARATOR);
+      if (-1 == offset) {
+        throw new IllegalArgumentException("Row does not contain expected separator for OrderSection");
+      }
+
+      byte[] encodedLong = new byte[offset];
+      System.arraycopy(buff.getBytes(), 0, encodedLong, 0, offset);
+      return longEncoder.decode(encodedLong);
+    }
+
+    public static String getFile(Key k) {
+      Text buff = new Text();
+      return getFile(k, buff);
+    }
+
+    public static String getFile(Key k, Text buff) {
+      k.getRow(buff);
+      int offset = buff.find(ROW_SEPARATOR);
+      if (-1 == offset) {
+        throw new IllegalArgumentException("Row does not contain expected separator for OrderSection");
+      }
+
+      try {
+        return Text.decode(buff.getBytes(), offset + 1, buff.getLength() - (offset + 1));
+      } catch (CharacterCodingException e) {
+        throw new IllegalArgumentException("Could not decode file path", e);
+      }
+    }
+  }
+
   private static void _getFile(Key k, Text buff) {
     k.getRow(buff);
   }
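
For context, here is a hedged sketch of how the new OrderSection methods fit together; it is not taken from the commit, and the BatchWriter, Scanner, file path, and table id passed in are hypothetical.

    import java.util.Map;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.MutationsRejectedException;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
    import org.apache.accumulo.core.replication.StatusUtil;
    import org.apache.hadoop.io.Text;

    class OrderSectionSketch {
      // Write one order record for a closed WAL, then read the order records back.
      static void roundTrip(BatchWriter replicationWriter, Scanner replicationScanner,
          String file, String sourceTableId) throws MutationsRejectedException {
        long closedTime = System.currentTimeMillis();
        Value status = StatusUtil.fileClosedValue(closedTime);
        Mutation m = OrderSection.createMutation(file, closedTime, new Text(sourceTableId), status);
        replicationWriter.addMutation(m);
        replicationWriter.flush();

        // Restrict the scan to the "order" column family; rows sort by encoded close time, then file.
        OrderSection.limit(replicationScanner);
        for (Map.Entry<Key,Value> entry : replicationScanner) {
          long timeClosed = OrderSection.getTimeClosed(entry.getKey());
          String walFile = OrderSection.getFile(entry.getKey());
          String tableId = OrderSection.getTableId(entry.getKey());
          System.out.println(timeClosed + " " + tableId + " " + walFile);
        }
      }
    }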

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
index 842d945..94c60ab 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
@@ -28,8 +28,10 @@ import com.google.protobuf.InvalidProtocolBufferException;
  */
 public class StatusUtil {
 
-  private static final Status NEW_REPLICATION_STATUS, CLOSED_REPLICATION_STATUS, INF_END_REPLICATION_STATUS;
-  private static final Value NEW_REPLICATION_STATUS_VALUE, CLOSED_REPLICATION_STATUS_VALUE, INF_END_REPLICATION_STATUS_VALUE;
+  private static final Status NEW_REPLICATION_STATUS, INF_END_REPLICATION_STATUS;
+  private static final Value NEW_REPLICATION_STATUS_VALUE, INF_END_REPLICATION_STATUS_VALUE;
+
+  private static final Status.Builder CLOSED_STATUS_BUILDER;
 
   static {
     Status.Builder builder = Status.newBuilder();
@@ -40,13 +42,11 @@ public class StatusUtil {
     NEW_REPLICATION_STATUS = builder.build();
     NEW_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(NEW_REPLICATION_STATUS);
 
-    builder = Status.newBuilder();
-    builder.setBegin(0);
-    builder.setEnd(0);
-    builder.setInfiniteEnd(true);
-    builder.setClosed(true);
-    CLOSED_REPLICATION_STATUS = builder.build();
-    CLOSED_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(CLOSED_REPLICATION_STATUS);
+    CLOSED_STATUS_BUILDER = Status.newBuilder();
+    CLOSED_STATUS_BUILDER.setBegin(0);
+    CLOSED_STATUS_BUILDER.setEnd(0);
+    CLOSED_STATUS_BUILDER.setInfiniteEnd(true);
+    CLOSED_STATUS_BUILDER.setClosed(true);
 
     builder = Status.newBuilder();
     builder.setBegin(0);
@@ -137,15 +137,17 @@ public class StatusUtil {
   /**
    * @return A {@link Status} for a closed file of unspecified length, all of which needs replicating.
    */
-  public static Status fileClosed() {
-    return CLOSED_REPLICATION_STATUS;
+  public static synchronized Status fileClosed(long timeClosed) {
+    // We're using a shared builder, so we need to synchronize access on it until we make a Status (which is then immutable)
+    CLOSED_STATUS_BUILDER.setClosedTime(timeClosed);
+    return CLOSED_STATUS_BUILDER.build();
   }
 
   /**
    * @return A {@link Value} for a closed file of unspecified length, all of which needs replicating.
    */
-  public static Value fileClosedValue() {
-    return CLOSED_REPLICATION_STATUS_VALUE;
+  public static Value fileClosedValue(long timeClosed) {
+    return ProtobufUtil.toValue(fileClosed(timeClosed));
   }
 
   /**

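A brief note on the design choice above: because fileClosed(long) now mutates a single shared Status.Builder, the method is synchronized so concurrent callers cannot interleave updates on the builder; the Status it returns is immutable. A lock-free alternative, shown below purely as an assumption and not what this commit does, would keep an immutable prototype inside StatusUtil and derive a fresh builder per call with the generated toBuilder() method.

    // Hypothetical alternative for StatusUtil; not part of this commit.
    private static final Status CLOSED_PROTOTYPE =
        Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();

    public static Status fileClosed(long timeClosed) {
      // toBuilder() copies the prototype, so there is no shared mutable state to synchronize on.
      return CLOSED_PROTOTYPE.toBuilder().setClosedTime(timeClosed).build();
    }
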
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java b/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
index a7e80f8..d301028 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
@@ -30,42 +30,92 @@ package org.apache.accumulo.core.replication.proto;
     // optional int64 begin = 1 [default = 0];
     /**
      * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
      */
     boolean hasBegin();
     /**
      * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
      */
     long getBegin();
 
     // optional int64 end = 2 [default = 0];
     /**
      * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
      */
     boolean hasEnd();
     /**
      * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
      */
     long getEnd();
 
     // optional bool infiniteEnd = 3 [default = false];
     /**
      * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
      */
     boolean hasInfiniteEnd();
     /**
      * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
      */
     boolean getInfiniteEnd();
 
     // optional bool closed = 4 [default = false];
     /**
      * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
      */
     boolean hasClosed();
     /**
      * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
      */
     boolean getClosed();
+
+    // optional int64 closedTime = 5 [default = 0];
+    /**
+     * <code>optional int64 closedTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file closed?
+     * </pre>
+     */
+    boolean hasClosedTime();
+    /**
+     * <code>optional int64 closedTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file closed?
+     * </pre>
+     */
+    long getClosedTime();
   }
   /**
    * Protobuf type {@code Status}
@@ -138,6 +188,11 @@ package org.apache.accumulo.core.replication.proto;
               closed_ = input.readBool();
               break;
             }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              closedTime_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -183,12 +238,20 @@ package org.apache.accumulo.core.replication.proto;
     private long begin_;
     /**
      * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
      */
     public boolean hasBegin() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
      * <code>optional int64 begin = 1 [default = 0];</code>
+     *
+     * <pre>
+     * offset where replication should start
+     * </pre>
      */
     public long getBegin() {
       return begin_;
@@ -199,12 +262,20 @@ package org.apache.accumulo.core.replication.proto;
     private long end_;
     /**
      * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
      */
     public boolean hasEnd() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
      * <code>optional int64 end = 2 [default = 0];</code>
+     *
+     * <pre>
+     * offset where data is ready for replication
+     * </pre>
      */
     public long getEnd() {
       return end_;
@@ -215,12 +286,20 @@ package org.apache.accumulo.core.replication.proto;
     private boolean infiniteEnd_;
     /**
      * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
      */
     public boolean hasInfiniteEnd() {
       return ((bitField0_ & 0x00000004) == 0x00000004);
     }
     /**
      * <code>optional bool infiniteEnd = 3 [default = false];</code>
+     *
+     * <pre>
+     * do we have a discrete 'end'
+     * </pre>
      */
     public boolean getInfiniteEnd() {
       return infiniteEnd_;
@@ -231,22 +310,55 @@ package org.apache.accumulo.core.replication.proto;
     private boolean closed_;
     /**
      * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
      */
     public boolean hasClosed() {
       return ((bitField0_ & 0x00000008) == 0x00000008);
     }
     /**
      * <code>optional bool closed = 4 [default = false];</code>
+     *
+     * <pre>
+     * will more data be appended to the file
+     * </pre>
      */
     public boolean getClosed() {
       return closed_;
     }
 
+    // optional int64 closedTime = 5 [default = 0];
+    public static final int CLOSEDTIME_FIELD_NUMBER = 5;
+    private long closedTime_;
+    /**
+     * <code>optional int64 closedTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file closed?
+     * </pre>
+     */
+    public boolean hasClosedTime() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional int64 closedTime = 5 [default = 0];</code>
+     *
+     * <pre>
+     * when, in ms, was the file closed?
+     * </pre>
+     */
+    public long getClosedTime() {
+      return closedTime_;
+    }
+
     private void initFields() {
       begin_ = 0L;
       end_ = 0L;
       infiniteEnd_ = false;
       closed_ = false;
+      closedTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -272,6 +384,9 @@ package org.apache.accumulo.core.replication.proto;
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeBool(4, closed_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(5, closedTime_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -297,6 +412,10 @@ package org.apache.accumulo.core.replication.proto;
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(4, closed_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(5, closedTime_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -421,6 +540,8 @@ package org.apache.accumulo.core.replication.proto;
         bitField0_ = (bitField0_ & ~0x00000004);
         closed_ = false;
         bitField0_ = (bitField0_ & ~0x00000008);
+        closedTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -465,6 +586,10 @@ package org.apache.accumulo.core.replication.proto;
           to_bitField0_ |= 0x00000008;
         }
         result.closed_ = closed_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.closedTime_ = closedTime_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -493,6 +618,9 @@ package org.apache.accumulo.core.replication.proto;
         if (other.hasClosed()) {
           setClosed(other.getClosed());
         }
+        if (other.hasClosedTime()) {
+          setClosedTime(other.getClosedTime());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -524,18 +652,30 @@ package org.apache.accumulo.core.replication.proto;
       private long begin_ ;
       /**
        * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
        */
       public boolean hasBegin() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
        * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
        */
       public long getBegin() {
         return begin_;
       }
       /**
        * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
        */
       public Builder setBegin(long value) {
         bitField0_ |= 0x00000001;
@@ -545,6 +685,10 @@ package org.apache.accumulo.core.replication.proto;
       }
       /**
        * <code>optional int64 begin = 1 [default = 0];</code>
+       *
+       * <pre>
+       * offset where replication should start
+       * </pre>
        */
       public Builder clearBegin() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -557,18 +701,30 @@ package org.apache.accumulo.core.replication.proto;
       private long end_ ;
       /**
        * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
        */
       public boolean hasEnd() {
         return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       /**
        * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
        */
       public long getEnd() {
         return end_;
       }
       /**
        * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
        */
       public Builder setEnd(long value) {
         bitField0_ |= 0x00000002;
@@ -578,6 +734,10 @@ package org.apache.accumulo.core.replication.proto;
       }
       /**
        * <code>optional int64 end = 2 [default = 0];</code>
+       *
+       * <pre>
+       * offset where data is ready for replication
+       * </pre>
        */
       public Builder clearEnd() {
         bitField0_ = (bitField0_ & ~0x00000002);
@@ -590,18 +750,30 @@ package org.apache.accumulo.core.replication.proto;
       private boolean infiniteEnd_ ;
       /**
        * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
        */
       public boolean hasInfiniteEnd() {
         return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       /**
        * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
        */
       public boolean getInfiniteEnd() {
         return infiniteEnd_;
       }
       /**
        * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
        */
       public Builder setInfiniteEnd(boolean value) {
         bitField0_ |= 0x00000004;
@@ -611,6 +783,10 @@ package org.apache.accumulo.core.replication.proto;
       }
       /**
        * <code>optional bool infiniteEnd = 3 [default = false];</code>
+       *
+       * <pre>
+       * do we have a discrete 'end'
+       * </pre>
        */
       public Builder clearInfiniteEnd() {
         bitField0_ = (bitField0_ & ~0x00000004);
@@ -623,18 +799,30 @@ package org.apache.accumulo.core.replication.proto;
       private boolean closed_ ;
       /**
        * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
        */
       public boolean hasClosed() {
         return ((bitField0_ & 0x00000008) == 0x00000008);
       }
       /**
        * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
        */
       public boolean getClosed() {
         return closed_;
       }
       /**
        * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
        */
       public Builder setClosed(boolean value) {
         bitField0_ |= 0x00000008;
@@ -644,6 +832,10 @@ package org.apache.accumulo.core.replication.proto;
       }
       /**
        * <code>optional bool closed = 4 [default = false];</code>
+       *
+       * <pre>
+       * will more data be appended to the file
+       * </pre>
        */
       public Builder clearClosed() {
         bitField0_ = (bitField0_ & ~0x00000008);
@@ -652,6 +844,55 @@ package org.apache.accumulo.core.replication.proto;
         return this;
       }
 
+      // optional int64 closedTime = 5 [default = 0];
+      private long closedTime_ ;
+      /**
+       * <code>optional int64 closedTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file closed?
+       * </pre>
+       */
+      public boolean hasClosedTime() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional int64 closedTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file closed?
+       * </pre>
+       */
+      public long getClosedTime() {
+        return closedTime_;
+      }
+      /**
+       * <code>optional int64 closedTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file closed?
+       * </pre>
+       */
+      public Builder setClosedTime(long value) {
+        bitField0_ |= 0x00000010;
+        closedTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 closedTime = 5 [default = 0];</code>
+       *
+       * <pre>
+       * when, in ms, was the file closed?
+       * </pre>
+       */
+      public Builder clearClosedTime() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        closedTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:Status)
     }
 
@@ -677,11 +918,12 @@ package org.apache.accumulo.core.replication.proto;
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n#src/main/protobuf/replication.proto\"]\n" +
+      "\n#src/main/protobuf/replication.proto\"t\n" +
       "\006Status\022\020\n\005begin\030\001 \001(\003:\0010\022\016\n\003end\030\002 \001(\003:\001" +
       "0\022\032\n\013infiniteEnd\030\003 \001(\010:\005false\022\025\n\006closed\030" +
-      "\004 \001(\010:\005falseB.\n*org.apache.accumulo.core" +
-      ".replication.protoH\001"
+      "\004 \001(\010:\005false\022\025\n\nclosedTime\030\005 \001(\003:\0010B.\n*o" +
+      "rg.apache.accumulo.core.replication.prot" +
+      "oH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -693,7 +935,7 @@ package org.apache.accumulo.core.replication.proto;
           internal_static_Status_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Status_descriptor,
-              new java.lang.String[] { "Begin", "End", "InfiniteEnd", "Closed", });
+              new java.lang.String[] { "Begin", "End", "InfiniteEnd", "Closed", "ClosedTime", });
           return null;
         }
       };
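
For readers of the generated parser above, the new "case 40" follows directly from the protobuf wire format: a field's tag is its field number shifted left three bits, OR'd with the wire type, and int64 uses varint wire type 0.

    // tag for closedTime (field 5, varint wire type 0) in the generated switch
    int tag = (5 << 3) | 0; // == 40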

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/core/src/main/protobuf/replication.proto
----------------------------------------------------------------------
diff --git a/core/src/main/protobuf/replication.proto b/core/src/main/protobuf/replication.proto
index bdcda61..be801b0 100644
--- a/core/src/main/protobuf/replication.proto
+++ b/core/src/main/protobuf/replication.proto
@@ -18,8 +18,9 @@ option java_package = "org.apache.accumulo.core.replication.proto";
 option optimize_for = SPEED;
 
 message Status {
-	optional int64 begin = 1 [default = 0];
-	optional int64 end = 2 [default = 0];
-	optional bool infiniteEnd = 3 [default = false];
-	optional bool closed = 4 [default = false];
+	optional int64 begin = 1 [default = 0]; // offset where replication should start
+	optional int64 end = 2 [default = 0]; // offset where data is ready for replication
+	optional bool infiniteEnd = 3 [default = false]; // do we have a discrete 'end'
+	optional bool closed = 4 [default = false]; // will more data be appended to the file
+	optional int64 closedTime = 5 [default = 0]; // when, in ms, was the file closed?
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index f83fcbe..400156c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -232,7 +232,7 @@ public class VolumeUtil {
       MetadataTableUtil.updateTabletVolumes(extent, logsToRemove, logsToAdd, filesToRemove, filesToAdd, switchedDir, zooLock, creds);
       if (replicate) {
         // Before deleting these logs, we need to mark them for replication
-        ReplicationTableUtil.updateLogs(creds, extent, logsToRemove, StatusUtil.fileClosed());
+        ReplicationTableUtil.updateLogs(creds, extent, logsToRemove, StatusUtil.fileClosed(System.currentTimeMillis()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
index 0a5f5f5..8b8b72c 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
@@ -146,7 +146,7 @@ public class StatusCombinerTest {
 
   @Test
   public void commutativeWithClose() {
-    Status newFile = StatusUtil.newFile(), closed = StatusUtil.fileClosed(), secondSync = StatusUtil.ingestedUntil(200);
+    Status newFile = StatusUtil.newFile(), closed = StatusUtil.fileClosed(System.currentTimeMillis()), secondSync = StatusUtil.ingestedUntil(200);
 
     Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, closed, secondSync).iterator()), order2 = combiner.typedReduce(key,
         Arrays.asList(newFile, secondSync, closed).iterator());
@@ -156,7 +156,7 @@ public class StatusCombinerTest {
 
   @Test
   public void commutativeWithCloseSingleBuilder() {
-    Status newFile = StatusUtil.newFile(), closed = StatusUtil.fileClosed(), secondSync = StatusUtil.ingestedUntil(builder, 200);
+    Status newFile = StatusUtil.newFile(), closed = StatusUtil.fileClosed(System.currentTimeMillis()), secondSync = StatusUtil.ingestedUntil(builder, 200);
 
     Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, closed, secondSync).iterator()), order2 = combiner.typedReduce(key,
         Arrays.asList(newFile, secondSync, closed).iterator());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index d1a43c3..be8e7ed 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -107,7 +107,7 @@ public class ReplicationTableUtilTest {
   @Test
   public void replEntryMutation() {
     // We stopped using a WAL -- we need a reference that this WAL needs to be replicated completely
-    Status stat = StatusUtil.fileClosed();
+    Status stat = StatusUtil.fileClosed(System.currentTimeMillis());
     String file = "file:///accumulo/wal/127.0.0.1+9997" + UUID.randomUUID();
     Path filePath = new Path(file);
     Text row = new Text(filePath.toString());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index d649c3e..294883e 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -250,7 +250,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
   protected void closeWal(BatchWriter bw, Key k) throws MutationsRejectedException {
     log.debug("Closing unreferenced WAL ({}) in metadata table", k.toStringNoTruncate());
     Mutation m = new Mutation(k.getRow());
-    m.put(k.getColumnFamily(), k.getColumnQualifier(), StatusUtil.fileClosedValue());
+    m.put(k.getColumnFamily(), k.getColumnQualifier(), StatusUtil.fileClosedValue(System.currentTimeMillis()));
     bw.addMutation(m);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 1a3995b..2faa8a2 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -29,13 +29,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -341,7 +338,7 @@ public class GarbageCollectWriteAheadLogsTest {
     assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
 
     // The file is closed but not replicated, must be retained
-    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
+    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue(System.currentTimeMillis())));
     assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
 
     // File is closed and fully replicated, can be deleted

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index e44f184..8941a56 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -104,28 +105,39 @@ public class StatusMaker {
         MetadataSchema.ReplicationSection.getFile(entry.getKey(), row);
         MetadataSchema.ReplicationSection.getTableId(entry.getKey(), tableId);
 
-        String rowStr = row.toString();
-        rowStr = rowStr.substring(ReplicationSection.getRowPrefix().length());
+        String file = row.toString();
+        file = file.substring(ReplicationSection.getRowPrefix().length());
 
         Status status;
         try {
           status = Status.parseFrom(entry.getValue().get());
         } catch (InvalidProtocolBufferException e) {
-          log.warn("Could not deserialize protobuf for {}", rowStr);
+          log.warn("Could not deserialize protobuf for {}", file);
           continue;
         }
 
-        log.debug("Creating replication status record for {} on table {} with {}.", rowStr, tableId, ProtobufUtil.toString(status));
+        log.debug("Creating replication status record for {} on table {} with {}.", file, tableId, ProtobufUtil.toString(status));
 
         Span workSpan = Trace.start("createStatusMutations");
         try {
           // Create entries in the replication table from the metadata table
-          addStatusRecord(rowStr, tableId, entry.getValue());
+          if (!addStatusRecord(file, tableId, entry.getValue())) {
+            continue;
+          }
         } finally {
           workSpan.stop();
         }
 
         if (status.getClosed()) {
+          Span orderSpan = Trace.start("recordStatusOrder");
+          try {
+            if (!addOrderRecord(file, tableId, status, entry.getValue())) {
+              continue;
+            }
+          } finally {
+            orderSpan.stop();
+          }
+
           Span deleteSpan = Trace.start("deleteClosedStatus");
           try {
             deleteStatusRecord(entry.getKey());
@@ -149,8 +161,7 @@ public class StatusMaker {
    * @param tableId
    * @param v
    */
-  protected void addStatusRecord(String file, Text tableId, Value v) {
-    // TODO come up with something that tries to avoid creating a new BatchWriter all the time
+  protected boolean addStatusRecord(String file, Text tableId, Value v) {
     try {
       Mutation m = new Mutation(file);
       m.put(StatusSection.NAME, tableId, v);
@@ -159,14 +170,53 @@ public class StatusMaker {
         replicationWriter.addMutation(m);
       } catch (MutationsRejectedException e) {
         log.warn("Failed to write work mutations for replication, will retry", e);
+        return false;
       }
     } finally {
       try {
         replicationWriter.flush();
       } catch (MutationsRejectedException e) {
         log.warn("Failed to write work mutations for replication, will retry", e);
+        return false;
       }
     }
+
+    return true;
+  }
+
+  /**
+   * Create a record to track when the file was closed to ensure that replication preference
+   * is given to files that have been closed the longest and allow the work assigner to try to
+   * replicate in order that data was ingested (avoid replay in different order)
+   * @param file File being replicated
+   * @param tableId Table ID the file was used by
+   * @param stat Status msg
+   * @param value Serialized version of the Status msg
+   */
+  protected boolean addOrderRecord(String file, Text tableId, Status stat, Value value) {
+    try {
+      if (!stat.hasClosedTime()) {
+        log.warn("Status record ({}) for {} in table {} was written to metadata table which was closed but lacked closedTime", ProtobufUtil.toString(stat), file, tableId);
+      }
+
+      Mutation m = OrderSection.createMutation(file, stat.getClosedTime(), tableId, value);
+
+      try {
+        replicationWriter.addMutation(m);
+      } catch (MutationsRejectedException e) {
+        log.warn("Failed to write order mutation for replication, will retry", e);
+        return false;
+      }
+    } finally {
+      try {
+        replicationWriter.flush();
+      } catch (MutationsRejectedException e) {
+        log.warn("Failed to write order mutation for replication, will retry", e);
+        return false;
+      }
+    }
+
+    return true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 1aaae51..2dfddc2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -156,7 +156,6 @@ public class WorkMaker {
   }
 
   protected void addWorkRecord(Text file, Value v, Map<String,String> targets, String sourceTableId) {
-    // TODO come up with something that tries to avoid creating a new BatchWriter all the time
     log.info("Adding work records for " + file + " to targets " + targets);
     try {
       Mutation m = new Mutation(file);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
index 496318d..e0fc421 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.accumulo.master.replication;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -33,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -187,4 +191,64 @@ public class StatusMakerTest {
     
   }
 
+  @Test
+  public void closedMessagesCreateOrderRecords() throws Exception {
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    String sourceTable = "source";
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+    Map<String,Integer> fileToTableId = new HashMap<>();
+
+    Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
+
+    int index = 1;
+    long time = System.currentTimeMillis();
+    for (String file : files) {
+      statBuilder.setClosedTime(time++);
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
+      bw.addMutation(m);
+      fileToTableId.put(file, index);
+      index++;
+    }
+
+    bw.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+
+    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(0, Iterables.size(s));
+
+    s = ReplicationTable.getScanner(conn);
+    OrderSection.limit(s);
+    Iterator<Entry<Key,Value>> iter = s.iterator();
+    Assert.assertTrue("Found no order records in replication table", iter.hasNext());
+
+    Iterator<String> expectedFiles = files.iterator();
+    Text buff = new Text();
+    while (expectedFiles.hasNext() && iter.hasNext()) {
+      String file = expectedFiles.next();
+      Entry<Key,Value> entry  = iter.next();
+
+      Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
+      OrderSection.getTableId(entry.getKey(), buff);
+      Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
+    }
+
+    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+    Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index cd313f9..337aa12 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.master.replication;
 
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -84,8 +83,10 @@ public class WorkMakerTest {
     String tableId = conn.tableOperations().tableIdMap().get(table);
     String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
 
+    // Create a status record for a file
+    long timeClosed = System.currentTimeMillis();
     Mutation m = new Mutation(new Path(file).toString());
-    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue());
+    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue(timeClosed));
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     bw.addMutation(m);
     bw.flush();
@@ -97,26 +98,23 @@ public class WorkMakerTest {
 
     WorkMaker workMaker = new WorkMaker(conn);
 
+    // Invoke the addWorkRecord method to create a Work record from the Status record earlier
     ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4", tableId);
     workMaker.setBatchWriter(bw);
-    workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(), ImmutableMap.of("remote_cluster_1", "4"), tableId);
+    workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(timeClosed), ImmutableMap.of("remote_cluster_1", "4"), tableId);
 
+    // Scan over just the WorkSection
     s = ReplicationTable.getScanner(conn);
     WorkSection.limit(s);
 
-    Iterator<Entry<Key,Value>> iter = s.iterator();
-    Assert.assertTrue(iter.hasNext());
-
-    Entry<Key,Value> workEntry = iter.next();
+    Entry<Key,Value> workEntry = Iterables.getOnlyElement(s);
     Key workKey = workEntry.getKey();
     ReplicationTarget actual = ReplicationTarget.from(workKey.getColumnQualifier());
 
     Assert.assertEquals(file, workKey.getRow().toString());
     Assert.assertEquals(WorkSection.NAME, workKey.getColumnFamily());
     Assert.assertEquals(expected, actual);
-    Assert.assertEquals(workEntry.getValue(), StatusUtil.fileClosedValue());
-
-    Assert.assertFalse(iter.hasNext());
+    Assert.assertEquals(workEntry.getValue(), StatusUtil.fileClosedValue(timeClosed));
   }
 
   @Test
@@ -129,7 +127,7 @@ public class WorkMakerTest {
     String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
 
     Mutation m = new Mutation(new Path(file).toString());
-    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue());
+    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue(System.currentTimeMillis()));
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     bw.addMutation(m);
     bw.flush();
@@ -147,7 +145,7 @@ public class WorkMakerTest {
       expectedTargets.add(new ReplicationTarget(cluster.getKey(), cluster.getValue(), tableId));
     }
     workMaker.setBatchWriter(bw);
-    workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(), targetClusters, tableId);
+    workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(System.currentTimeMillis()), targetClusters, tableId);
 
     s = ReplicationTable.getScanner(conn);
     WorkSection.limit(s);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f97f13ab/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 43c6c34..2b9c326 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1378,13 +1378,6 @@ public class Tablet {
         for (FileRef ref : datafiles.keySet())
           absPaths.add(ref.path().toString());
 
-        // Ensure that we write a record marking each WAL as requiring replication to make sure we don't abandon the data
-        if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
-          for (LogEntry logEntry : logEntries) {
-            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, StatusUtil.fileClosed());
-          }
-        }
-
         tabletServer.recover(this.tabletServer.getFileSystem(), extent, acuTableConf, logEntries, absPaths, new MutationReceiver() {
           @Override
           public void receive(Mutation m) {
@@ -1407,6 +1400,14 @@ public class Tablet {
         }
         commitSession.updateMaxCommittedTime(tabletTime.getTime());
 
+        // Ensure that we write a record marking each WAL as requiring replication to make sure we don't abandon the data
+        if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
+          long timeClosed = System.currentTimeMillis();
+          for (LogEntry logEntry : logEntries) {
+            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, StatusUtil.fileClosed(timeClosed));
+          }
+        }
+
         if (count[0] == 0) {
           MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
           logEntries.clear();

