hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject git commit: HBase-11315: Keeping MVCC for configurable longer time
Date Mon, 07 Jul 2014 01:42:20 GMT
Repository: hbase
Updated Branches:
  refs/heads/master a04e0b703 -> d07bc87cd


HBase-11315: Keeping MVCC for configurable longer time


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d07bc87c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d07bc87c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d07bc87c

Branch: refs/heads/master
Commit: d07bc87cd656b510d7ac610a41bc4195282a006b
Parents: a04e0b7
Author: Jeffrey Zhong <jeffreyz@apache.org>
Authored: Sun Jul 6 18:25:18 2014 -0700
Committer: Jeffrey Zhong <jeffreyz@apache.org>
Committed: Sun Jul 6 18:25:18 2014 -0700

----------------------------------------------------------------------
 .../ipc/TestPayloadCarryingRpcController.java   |   6 +
 .../main/java/org/apache/hadoop/hbase/Cell.java |  11 ++
 .../java/org/apache/hadoop/hbase/CellUtil.java  |   4 +-
 .../org/apache/hadoop/hbase/HConstants.java     |   5 +
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  24 ++--
 .../org/apache/hadoop/hbase/KeyValueUtil.java   |   4 +-
 .../io/encoding/BufferedDataBlockEncoder.java   |  10 ++
 .../hbase/io/encoding/EncodedDataBlock.java     |   4 +-
 .../hadoop/hbase/codec/TestCellCodec.java       |   2 +-
 .../util/TestByteRangeWithKVSerialization.java  |   4 +-
 .../codec/prefixtree/decode/PrefixTreeCell.java |   5 +
 .../data/TestRowDataDifferentTimestamps.java    |  10 +-
 .../hbase/protobuf/generated/WALProtos.java     | 116 ++++++++++++++++---
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../hadoop/hbase/io/hfile/HFileReaderV2.java    |   2 +-
 .../hbase/protobuf/ReplicationProtbufUtil.java  |   3 +
 .../hbase/regionserver/DefaultMemStore.java     |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  57 +++++++--
 .../MultiVersionConsistencyControl.java         |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |  28 +++--
 .../hbase/regionserver/StoreFileScanner.java    |   9 --
 .../regionserver/compactions/Compactor.java     |  31 +++--
 .../compactions/DefaultCompactor.java           |   7 +-
 .../compactions/StripeCompactor.java            |   7 +-
 .../hbase/regionserver/wal/FSWALEntry.java      |   4 +-
 .../hadoop/hbase/regionserver/wal/HLogKey.java  |  23 ++++
 .../hbase/regionserver/wal/HLogSplitter.java    |  20 ++--
 .../hbase/security/access/AccessController.java |   2 +-
 .../visibility/VisibilityController.java        |   2 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |   2 +-
 .../hadoop/hbase/io/hfile/TestHFileBlock.java   |   2 +-
 .../hbase/regionserver/TestDefaultMemStore.java |  26 ++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |   2 +
 .../regionserver/TestReversibleScanners.java    |   2 +-
 .../regionserver/TestSeekOptimizations.java     |  28 ++++-
 35 files changed, 359 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
index 249cc42..088e609 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
@@ -144,6 +144,12 @@ public class TestPayloadCarryingRpcController {
               }
 
               @Override
+              public long getSequenceId() {
+                // unused
+                return 0;
+              }
+
+              @Override
               public byte[] getValueArray() {
                 return Bytes.toBytes(this.i);
               }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
index 27b9345..f7e9272 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
@@ -142,14 +142,25 @@ public interface Cell {
   //6) MvccVersion
 
   /**
+   * @deprecated as of 1.0, use {@link Cell#getSequenceId()}
+   * 
    * Internal use only. A region-specific sequence ID given to each operation. It always exists for
    * cells in the memstore but is not retained forever. It may survive several flushes, but
    * generally becomes irrelevant after the cell's row is no longer involved in any operations that
    * require strict consistency.
    * @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
    */
+  @Deprecated
   long getMvccVersion();
 
+  /**
+   * A region-specific unique monotonically increasing sequence ID given to each Cell. It always
+   * exists for cells in the memstore but is not retained forever. It will be kept for
+   * {@link HConstants#KEEP_SEQID_PERIOD} days, but generally becomes irrelevant after the cell's
+   * row is no longer involved in any operations that require strict consistency.
+   * @return seqId (always > 0 if exists), or 0 if it no longer exists
+   */
+  long getSequenceId();
 
   //7) Value
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 376e073..d6564c2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -167,7 +167,7 @@ public final class CellUtil {
       final long timestamp, final byte type, final byte[] value, final long memstoreTS) {
     KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
         KeyValue.Type.codeToType(type), value);
-    keyValue.setMvccVersion(memstoreTS);
+    keyValue.setSequenceId(memstoreTS);
     return keyValue;
   }
 
@@ -175,7 +175,7 @@ public final class CellUtil {
       final long timestamp, final byte type, final byte[] value, byte[] tags, final long memstoreTS) {
     KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
         KeyValue.Type.codeToType(type), value, tags);
-    keyValue.setMvccVersion(memstoreTS);
+    keyValue.setSequenceId(memstoreTS);
     return keyValue;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c2709f5..93209fd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -353,6 +353,11 @@ public final class HConstants {
 
   /** Default value for cluster ID */
   public static final String CLUSTER_ID_DEFAULT = "default-cluster";
+  
+  /** Parameter name for the number of days to keep MVCC values during a major compaction */
+  public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period";
+  /** Keep MVCC values in hfiles for at least 5 days */
+  public static final int MIN_KEEP_SEQID_PERIOD = 5;
 
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 887946e..002642a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -284,15 +284,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
   // used to achieve atomic operations in the memstore.
   @Override
   public long getMvccVersion() {
-    return mvcc;
+    return this.getSequenceId();
   }
 
-  public void setMvccVersion(long mvccVersion){
-    this.mvcc = mvccVersion;
+  /**
+   * Used to achieve atomic operations in the memstore.
+   */
+  @Override
+  public long getSequenceId() {
+    return seqId;
+  }
+
+  public void setSequenceId(long seqId) {
+    this.seqId = seqId;
   }
 
   // multi-version concurrency control version.  default value is 0, aka do not care.
-  private long mvcc = 0;  // this value is not part of a serialized KeyValue (not in HFiles)
+  private long seqId = 0;
 
   /** Dragon time over, return to normal business */
 
@@ -1083,7 +1091,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
     // Important to clone the memstoreTS as well - otherwise memstore's
     // update-in-place methods (eg increment) will end up creating
     // new entries
-    ret.setMvccVersion(mvcc);
+    ret.setSequenceId(seqId);
     return ret;
   }
 
@@ -1094,7 +1102,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
    */
   public KeyValue shallowCopy() {
     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
-    shallowCopy.setMvccVersion(this.mvcc);
+    shallowCopy.setSequenceId(this.seqId);
     return shallowCopy;
   }
 
@@ -1108,8 +1116,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
     if (this.bytes == null || this.bytes.length == 0) {
       return "empty";
     }
-    return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
-      "/vlen=" + getValueLength() + "/mvcc=" + mvcc;
+    return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
+      + getValueLength() + "/seqid=" + seqId;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 4c1f345..c2a8826 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -75,7 +75,7 @@ public class KeyValueUtil {
   public static KeyValue copyToNewKeyValue(final Cell cell) {
     byte[] bytes = copyToNewByteArray(cell);
     KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
-    kvCell.setMvccVersion(cell.getMvccVersion());
+    kvCell.setSequenceId(cell.getMvccVersion());
     return kvCell;
   }
 
@@ -175,7 +175,7 @@ public class KeyValueUtil {
     keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
     if (includesMvccVersion) {
       long mvccVersion = ByteBufferUtils.readVLong(bb);
-      keyValue.setMvccVersion(mvccVersion);
+      keyValue.setSequenceId(mvccVersion);
     }
     return keyValue;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index fe019d1..b1b384a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -232,6 +232,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     }
 
     @Override
+    public long getSequenceId() {
+      return memstoreTS;
+    }
+
+    @Override
     public byte[] getValueArray() {
       return currentBuffer.array();
     }
@@ -422,6 +427,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     }
 
     @Override
+    public long getSequenceId() {
+      return memstoreTS;
+    }
+
+    @Override
     public byte[] getValueArray() {
       return currentBuffer.array();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
index ce7356c..d71d1a4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
@@ -125,7 +125,7 @@ public class EncodedDataBlock {
             (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
         if (meta.isIncludesMvcc()) {
           long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
-          kv.setMvccVersion(mvccVersion);
+          kv.setSequenceId(mvccVersion);
         }
         return kv;
       }
@@ -244,7 +244,7 @@ public class EncodedDataBlock {
         }
         kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
             klength, vlength, tagsLength));
-        kv.setMvccVersion(memstoreTS);
+        kv.setSequenceId(memstoreTS);
         this.dataBlockEncoder.encode(kv, encodingCtx, out);
       }
       BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
index c27b91e..bca57d9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
@@ -70,7 +70,7 @@ public class TestCellCodec {
     Codec.Encoder encoder = codec.getEncoder(dos);
     final KeyValue kv =
       new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
-    kv.setMvccVersion(Long.MAX_VALUE);
+    kv.setSequenceId(Long.MAX_VALUE);
     encoder.write(kv);
     encoder.flush();
     dos.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
index e2af966..d60aba9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
@@ -50,7 +50,7 @@ public class TestByteRangeWithKVSerialization {
     long mvcc = pbr.getVLong();
     KeyValue kv = new KeyValue(pbr.getBytes(), kvStartPos,
         (int) KeyValue.getKeyValueDataStructureSize(keyLen, valLen, tagsLen));
-    kv.setMvccVersion(mvcc);
+    kv.setSequenceId(mvcc);
     return kv;
   }
 
@@ -65,7 +65,7 @@ public class TestByteRangeWithKVSerialization {
     Tag[] tags = new Tag[] { new Tag((byte) 1, "tag1") };
     for (int i = 0; i < kvCount; i++) {
       KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, VALUE, tags);
-      kv.setMvccVersion(i);
+      kv.setSequenceId(i);
       kvs.add(kv);
       totalSize += kv.getLength() + Bytes.SIZEOF_LONG;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
index 740a08e..7763c6a 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
@@ -123,6 +123,11 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
   }
 
   @Override
+  public long getSequenceId() {
+    return getMvccVersion();
+  }
+
+  @Override
   public int getValueLength() {
     return valueLength;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java b/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
index 8b729bc..2668f2a 100644
--- a/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
+++ b/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
@@ -44,21 +44,21 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
 	static List<KeyValue> d = Lists.newArrayList();
 	static{
 	  KeyValue kv0 = new KeyValue(Arow, cf, cq0, 0L, v0);
-	  kv0.setMvccVersion(123456789L);
+	  kv0.setSequenceId(123456789L);
 	  d.add(kv0);
 
 	  KeyValue kv1 = new KeyValue(Arow, cf, cq1, 1L, v0);
-    kv1.setMvccVersion(3L);
+    kv1.setSequenceId(3L);
     d.add(kv1);
 
 	  KeyValue kv2 = new KeyValue(Brow, cf, cq0, 12345678L, v0);
-    kv2.setMvccVersion(65537L);
+    kv2.setSequenceId(65537L);
     d.add(kv2);
 
 		//watch out... Long.MAX_VALUE comes back as 1332221664203, even with other encoders
 //		d.add(new KeyValue(Brow, cf, cq1, Long.MAX_VALUE, v0));
 	  KeyValue kv3 = new KeyValue(Brow, cf, cq1, Long.MAX_VALUE-1, v0);
-    kv3.setMvccVersion(1L);
+    kv3.setSequenceId(1L);
     d.add(kv3);
 
 	  KeyValue kv4 = new KeyValue(Brow, cf, cq1, 999999999, v0);
@@ -66,7 +66,7 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
 	  d.add(kv4);
 
 	  KeyValue kv5 = new KeyValue(Brow, cf, cq1, 12345, v0);
-    kv5.setMvccVersion(0L);
+    kv5.setSequenceId(0L);
     d.add(kv5);
 	}
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 19afcb2..efea2ba 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -897,6 +897,16 @@ public final class WALProtos {
      * <code>optional uint64 nonce = 10;</code>
      */
     long getNonce();
+
+    // optional uint64 orig_sequence_number = 11;
+    /**
+     * <code>optional uint64 orig_sequence_number = 11;</code>
+     */
+    boolean hasOrigSequenceNumber();
+    /**
+     * <code>optional uint64 orig_sequence_number = 11;</code>
+     */
+    long getOrigSequenceNumber();
   }
   /**
    * Protobuf type {@code WALKey}
@@ -1017,6 +1027,11 @@ public final class WALProtos {
               nonce_ = input.readUInt64();
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000100;
+              origSequenceNumber_ = input.readUInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1323,6 +1338,22 @@ public final class WALProtos {
       return nonce_;
     }
 
+    // optional uint64 orig_sequence_number = 11;
+    public static final int ORIG_SEQUENCE_NUMBER_FIELD_NUMBER = 11;
+    private long origSequenceNumber_;
+    /**
+     * <code>optional uint64 orig_sequence_number = 11;</code>
+     */
+    public boolean hasOrigSequenceNumber() {
+      return ((bitField0_ & 0x00000100) == 0x00000100);
+    }
+    /**
+     * <code>optional uint64 orig_sequence_number = 11;</code>
+     */
+    public long getOrigSequenceNumber() {
+      return origSequenceNumber_;
+    }
+
     private void initFields() {
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
       tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -1334,6 +1365,7 @@ public final class WALProtos {
       clusterIds_ = java.util.Collections.emptyList();
       nonceGroup_ = 0L;
       nonce_ = 0L;
+      origSequenceNumber_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1411,6 +1443,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000080) == 0x00000080)) {
         output.writeUInt64(10, nonce_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        output.writeUInt64(11, origSequenceNumber_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1460,6 +1495,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt64Size(10, nonce_);
       }
+      if (((bitField0_ & 0x00000100) == 0x00000100)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(11, origSequenceNumber_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1527,6 +1566,11 @@ public final class WALProtos {
         result = result && (getNonce()
             == other.getNonce());
       }
+      result = result && (hasOrigSequenceNumber() == other.hasOrigSequenceNumber());
+      if (hasOrigSequenceNumber()) {
+        result = result && (getOrigSequenceNumber()
+            == other.getOrigSequenceNumber());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1580,6 +1624,10 @@ public final class WALProtos {
         hash = (37 * hash) + NONCE_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getNonce());
       }
+      if (hasOrigSequenceNumber()) {
+        hash = (37 * hash) + ORIG_SEQUENCE_NUMBER_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getOrigSequenceNumber());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1728,6 +1776,8 @@ public final class WALProtos {
         bitField0_ = (bitField0_ & ~0x00000100);
         nonce_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000200);
+        origSequenceNumber_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000400);
         return this;
       }
 
@@ -1810,6 +1860,10 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000080;
         }
         result.nonce_ = nonce_;
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000100;
+        }
+        result.origSequenceNumber_ = origSequenceNumber_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1902,6 +1956,9 @@ public final class WALProtos {
         if (other.hasNonce()) {
           setNonce(other.getNonce());
         }
+        if (other.hasOrigSequenceNumber()) {
+          setOrigSequenceNumber(other.getOrigSequenceNumber());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2977,6 +3034,39 @@ public final class WALProtos {
         return this;
       }
 
+      // optional uint64 orig_sequence_number = 11;
+      private long origSequenceNumber_ ;
+      /**
+       * <code>optional uint64 orig_sequence_number = 11;</code>
+       */
+      public boolean hasOrigSequenceNumber() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <code>optional uint64 orig_sequence_number = 11;</code>
+       */
+      public long getOrigSequenceNumber() {
+        return origSequenceNumber_;
+      }
+      /**
+       * <code>optional uint64 orig_sequence_number = 11;</code>
+       */
+      public Builder setOrigSequenceNumber(long value) {
+        bitField0_ |= 0x00000400;
+        origSequenceNumber_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint64 orig_sequence_number = 11;</code>
+       */
+      public Builder clearOrigSequenceNumber() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        origSequenceNumber_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:WALKey)
     }
 
@@ -5176,24 +5266,24 @@ public final class WALProtos {
     java.lang.String[] descriptorData = {
       "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
       "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
-      "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
+      "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" +
       "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
       "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
       " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
       " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
       "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
       "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
-      "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
-      "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
-      "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
-      " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
-      "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
-      "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
-      "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
-      "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
-      "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
-      "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
-      "\001\001"
+      "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
+      "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
+      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
+      "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
+      "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
+      " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
+      "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
+      "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
+      "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
+      "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
+      "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5211,7 +5301,7 @@ public final class WALProtos {
           internal_static_WALKey_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALKey_descriptor,
-              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", });
+              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", "OrigSequenceNumber", });
           internal_static_FamilyScope_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_FamilyScope_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 8c7b84b..0ae65ec 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -54,6 +54,7 @@ message WALKey {
 
   optional uint64 nonceGroup = 9;
   optional uint64 nonce = 10;
+  optional uint64 orig_sequence_number = 11;
 
 /*
   optional CustomEntryType custom_entry_type = 9;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 0bfefaa..2f6ea39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -765,7 +765,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
       KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
           + blockBuffer.position(), getCellBufSize());
       if (this.reader.shouldIncludeMemstoreTS()) {
-        ret.setMvccVersion(currMemstoreTS);
+        ret.setSequenceId(currMemstoreTS);
       }
       return ret;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 2dc2388..348b6ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -106,6 +106,9 @@ public class ReplicationProtbufUtil {
         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
         keyBuilder.addClusterIds(uuidBuilder.build());
       }
+      if(key.getOrigLogSeqNum() > 0) {
+        keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
+      }
       WALEdit edit = entry.getEdit();
       NavigableMap<byte[], Integer> scopes = key.getScopes();
       if (scopes != null && !scopes.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index ad084a5..d90357b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -271,7 +271,7 @@ public class DefaultMemStore implements MemStore {
     assert alloc.getBytes() != null;
     alloc.put(0, kv.getBuffer(), kv.getOffset(), len);
     KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
-    newKv.setMvccVersion(kv.getMvccVersion());
+    newKv.setSequenceId(kv.getMvccVersion());
     return newKv;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2429ed5..93eada8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2155,6 +2155,7 @@ public class HRegion implements HeapSize { // , Writable{
     /** This method is potentially expensive and should only be used for non-replay CP path. */
     public abstract Mutation[] getMutationsForCoprocs();
     public abstract boolean isInReplay();
+    public abstract long getReplaySequenceId();
 
     public boolean isDone() {
       return nextIndexToProcess == operations.length;
@@ -2194,11 +2195,18 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return false;
     }
+    
+    @Override
+    public long getReplaySequenceId() {
+      return 0;
+    }
   }
 
   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
-    public ReplayBatch(MutationReplay[] operations) {
+    private long replaySeqId = 0;
+    public ReplayBatch(MutationReplay[] operations, long seqId) {
       super(operations);
+      this.replaySeqId = seqId;
     }
 
     @Override
@@ -2226,6 +2234,11 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return true;
     }
+    
+    @Override
+    public long getReplaySequenceId() {
+      return this.replaySeqId;
+    }
   }
 
   /**
@@ -2252,13 +2265,14 @@ public class HRegion implements HeapSize { // , Writable{
   /**
    * Replay a batch of mutations.
    * @param mutations mutations to replay.
+   * @param replaySeqId SeqId for current mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
+  public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
       throws IOException {
-    return batchMutate(new ReplayBatch(mutations));
+    return batchMutate(new ReplayBatch(mutations, replaySeqId));
   }
 
   /**
@@ -2475,7 +2489,7 @@ public class HRegion implements HeapSize { // , Writable{
       // ------------------------------------
       // STEP 2. Update any LATEST_TIMESTAMP timestamps
       // ----------------------------------
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
+      for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
         // skip invalid
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) continue;
@@ -2485,16 +2499,18 @@ public class HRegion implements HeapSize { // , Writable{
           updateKVTimestamps(familyMaps[i].values(), byteNow);
           noOfPuts++;
         } else {
-          if (!isInReplay) {
-            prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
-          }
+          prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
           noOfDeletes++;
         }
       }
 
       lock(this.updatesLock.readLock(), numReadyToWrite);
       locked = true;
-      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+      if(isInReplay) {
+        mvccNum = batchOp.getReplaySequenceId();
+      } else {
+        mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+      }
       //
       // ------------------------------------
       // Acquire the latest mvcc number
@@ -2591,6 +2607,9 @@ public class HRegion implements HeapSize { // , Writable{
         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
             this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
             mutation.getClusterIds(), currentNonceGroup, currentNonce);
+        if(isInReplay) {
+          walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
+        }
         txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
           getSequenceId(), true, memstoreCells);
       }
@@ -2952,7 +2971,7 @@ public class HRegion implements HeapSize { // , Writable{
       Store store = getStore(family);
       for (Cell cell: cells) {
         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-        kv.setMvccVersion(mvccNum);
+        kv.setSequenceId(mvccNum);
         Pair<Long, Cell> ret = store.add(kv);
         size += ret.getFirst();
         memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
@@ -3213,6 +3232,7 @@ public class HRegion implements HeapSize { // , Writable{
     try {
       reader = HLogFactory.createReader(fs, edits, conf);
       long currentEditSeqId = -1;
+      long currentReplaySeqId = -1;
       long firstSeqIdInLog = -1;
       long skippedEdits = 0;
       long editsCount = 0;
@@ -3275,6 +3295,8 @@ public class HRegion implements HeapSize { // , Writable{
             firstSeqIdInLog = key.getLogSeqNum();
           }
           currentEditSeqId = key.getLogSeqNum();
+          currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? 
+            key.getOrigLogSeqNum() : currentEditSeqId;
           boolean flush = false;
           for (KeyValue kv: val.getKeyValues()) {
             // Check this edit is for me. Also, guard against writing the special
@@ -3309,6 +3331,7 @@ public class HRegion implements HeapSize { // , Writable{
               skippedEdits++;
               continue;
             }
+            kv.setSequenceId(currentReplaySeqId);
             // Once we are over the limit, restoreEdit will keep returning true to
             // flush -- but don't flush until we've played all the kvs that make up
             // the WALEdit.
@@ -4922,7 +4945,7 @@ public class HRegion implements HeapSize { // , Writable{
           writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           // 6. Apply to memstore
           for (KeyValue kv : mutations) {
-            kv.setMvccVersion(mvccNum);
+            kv.setSequenceId(mvccNum);
             Store store = getStore(kv);
             if (store == null) {
               checkFamily(CellUtil.cloneFamily(kv));
@@ -5168,7 +5191,7 @@ public class HRegion implements HeapSize { // , Writable{
                 // so only need to update the timestamp to 'now'
                 newKV.updateLatestStamp(Bytes.toBytes(now));
              }
-              newKV.setMvccVersion(mvccNum);
+              newKV.setSequenceId(mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -5382,7 +5405,7 @@ public class HRegion implements HeapSize { // , Writable{
                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
               }
-              newKV.setMvccVersion(mvccNum);
+              newKV.setSequenceId(mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -6220,4 +6243,14 @@ public class HRegion implements HeapSize { // , Writable{
       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
     return key;
   }
+  
+  /**
+   * Explicitly sync wal
+   * @throws IOException
+   */
+  public void syncWal() throws IOException {
+    if(this.log != null) {
+      this.log.sync();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index 2d247e9..0b5f5d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -246,7 +246,7 @@ public class MultiVersionConsistencyControl {
 
   public static class WriteEntry {
     private long writeNumber;
-    private boolean completed = false;
+    private volatile boolean completed = false;
 
     WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9a0e8a4..b84f9a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -633,12 +633,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
    * @param region
    * @param mutations
+   * @param replaySeqId
    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
    *         exceptionMessage if any
    * @throws IOException
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
-      final List<HLogSplitter.MutationReplay> mutations) throws IOException {
+      final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
     HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
 
     long before = EnvironmentEdgeManager.currentTimeMillis();
@@ -657,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (!region.getRegionInfo().isMetaTable()) {
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.batchReplay(mArray);
+      return region.batchReplay(mArray, replaySeqId);
     } finally {
       if (regionServer.metricsRegionServer != null) {
         long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1330,7 +1331,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
       RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
       List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
-      List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
       // when tag is enabled, we need tag replay edits with log sequence number
       boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
       for (WALEntry entry : entries) {
@@ -1354,18 +1354,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
           walEntries.add(walEntry);
         }
-        mutations.addAll(edits);
-      }
-
-      if (!mutations.isEmpty()) {
-        OperationStatus[] result = doReplayBatchOp(region, mutations);
-        // check if it's a partial success
-        for (int i = 0; result != null && i < result.length; i++) {
-          if (result[i] != OperationStatus.SUCCESS) {
-            throw new IOException(result[i].getExceptionMsg());
+        if(edits!=null && !edits.isEmpty()) {
+          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
+          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
+          // check if it's a partial success
+          for (int i = 0; result != null && i < result.length; i++) {
+            if (result[i] != OperationStatus.SUCCESS) {
+              throw new IOException(result[i].getExceptionMsg());
+            }
           }
         }
       }
+      
+      //sync wal at the end because ASYNC_WAL is used above
+      region.syncWal();
+
       if (coprocessorHost != null) {
         for (Pair<HLogKey, WALEdit> wal : walEntries) {
           coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index f0f92ae..1b07594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -211,15 +211,6 @@ public class StoreFileScanner implements KeyValueScanner {
       return false;
     }
 
-    // For the optimisation in HBASE-4346, we set the KV's memstoreTS to
-    // 0, if it is older than all the scanners' read points. It is possible
-    // that a newer KV's memstoreTS was reset to 0. But, there is an
-    // older KV which was not reset to 0 (because it was
-    // not old enough during flush). Make sure that we set it correctly now,
-    // so that the comparision order does not change.
-    if (cur.getMvccVersion() <= readPt) {
-      KeyValueUtil.ensureKeyValue(cur).setMvccVersion(0);
-    }
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 9e792c4..a1d629a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -58,6 +58,9 @@ public abstract class Compactor {
 
   private int compactionKVMax;
   protected Compression.Algorithm compactionCompression;
+  
+  /** specify how many days to keep MVCC values during major compaction **/ 
+  protected int keepSeqIdPeriod;
 
   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(final Configuration conf, final Store store) {
@@ -67,6 +70,8 @@ public abstract class Compactor {
       this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
     this.compactionCompression = (this.store.getFamily() == null) ?
         Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
+    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, 
+      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
   }
 
   /**
@@ -92,19 +97,30 @@ public abstract class Compactor {
     public long maxMVCCReadpoint = 0;
     /** Max tags length**/
     public int maxTagsLength = 0;
+    /** Min SeqId to keep during a major compaction **/
+    public long minSeqIdToKeep = 0;
   }
 
   /**
    * Extracts some details about the files to compact that are commonly needed by compactors.
    * @param filesToCompact Files.
-   * @param calculatePutTs Whether earliest put TS is needed.
+   * @param allFiles Whether all files are included for compaction
    * @return The result.
    */
   protected FileDetails getFileDetails(
-      Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
+      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
     FileDetails fd = new FileDetails();
+    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - 
+      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);  
 
     for (StoreFile file : filesToCompact) {
+      if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
+        // when allFiles is true, all files are compacted so we can calculate the smallest
+        // MVCC value to keep
+        if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
+          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
+        }
+      }
       long seqNum = file.getMaxSequenceId();
       fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
       StoreFile.Reader r = file.getReader();
@@ -130,7 +146,7 @@ public abstract class Compactor {
       // If required, calculate the earliest put timestamp of all involved storefiles.
       // This is used to remove family delete marker during compaction.
       long earliestPutTs = 0;
-      if (calculatePutTs) {
+      if (allFiles) {
         tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
         if (tmp == null) {
           // There's a file with no information, must be an old one
@@ -148,7 +164,7 @@ public abstract class Compactor {
           ", size=" + StringUtils.humanReadableInt(r.length()) +
           ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
           ", seqNum=" + seqNum +
-          (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
+          (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
       }
     }
     return fd;
@@ -202,10 +218,11 @@ public abstract class Compactor {
    * @param scanner Where to read from.
    * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
+   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
   protected boolean performCompaction(InternalScanner scanner,
-      CellSink writer, long smallestReadPoint) throws IOException {
+      CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
     int bytesWritten = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
@@ -218,8 +235,8 @@ public abstract class Compactor {
       // output to writer:
       for (Cell c : kvs) {
         KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-        if (kv.getMvccVersion() <= smallestReadPoint) {
-          kv.setMvccVersion(0);
+        if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
+          kv.setSequenceId(0);
         }
         writer.append(kv);
         ++progress.currentCompactedKVs;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 3e8523d..d5b2b63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -54,6 +54,7 @@ public class DefaultCompactor extends Compactor {
 
     StoreFile.Writer writer = null;
     List<Path> newFiles = new ArrayList<Path>();
+    boolean cleanSeqId = false;
     try {
       InternalScanner scanner = null;
       try {
@@ -71,9 +72,13 @@ public class DefaultCompactor extends Compactor {
         }
         // Create the writer even if no kv(Empty store file is also ok),
         // because we need record the max seq id for the store file, see HBASE-6059
+        if(fd.minSeqIdToKeep > 0) {
+          smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+          cleanSeqId = true;
+        }
         writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
             fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
-        boolean finished = performCompaction(scanner, writer, smallestReadPoint);
+        boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
         if (!finished) {
           writer.close();
           store.getFileSystem().delete(writer.getPath(), false);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 11556e5..487ff46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -90,6 +90,7 @@ public class StripeCompactor extends Compactor {
 
     boolean finished = false;
     InternalScanner scanner = null;
+    boolean cleanSeqId = false;
     try {
       // Get scanner to use.
       ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
@@ -108,6 +109,10 @@ public class StripeCompactor extends Compactor {
       }
 
       // Create the writer factory for compactions.
+      if(fd.minSeqIdToKeep > 0) {
+        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+        cleanSeqId = true;
+      }
       final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
       final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
       StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@@ -122,7 +127,7 @@ public class StripeCompactor extends Compactor {
       // It is ok here if storeScanner is null.
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
       mw.init(storeScanner, factory, store.getComparator());
-      finished = performCompaction(scanner, mw, smallestReadPoint);
+      finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
       if (!finished) {
         throw new InterruptedIOException( "Aborting compaction of store " + store +
             " in region " + store.getRegionInfo().getRegionNameAsString() +

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 1e9472a..a9c2055 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -91,9 +91,9 @@ class FSWALEntry extends HLog.Entry {
    */
   long stampRegionSequenceId() {
     long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
+    if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) {
       for(KeyValue kv : this.memstoreKVs){
-        kv.setMvccVersion(regionSequenceId);
+        kv.setSequenceId(regionSequenceId);
       }
     }
     HLogKey key = getKey();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index ad1c001..5019ff5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -119,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
   private byte [] encodedRegionName;
   private TableName tablename;
   private long logSeqNum;
+  private long origLogSeqNum = 0;
   private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
   // Time at which this edit was written.
   private long writeTime;
@@ -256,6 +257,22 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
   }
   
   /**
+   * Used to set original seq Id for HLogKey during wal replay
+   * @param seqId
+   */
+  public void setOrigLogSeqNum(final long seqId) {
+    this.origLogSeqNum = seqId;
+  }
+  
+  /**
+   * Return a positive long if current HLogKey is created from a replay edit
+   * @return original sequence number of the WALEdit
+   */
+  public long getOrigLogSeqNum() {
+    return this.origLogSeqNum;
+  }
+  
+  /**
    * Wait for sequence number is assigned & return the assigned value
    * @return long the new assigned sequence number
    * @throws InterruptedException
@@ -536,6 +553,9 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
     }
     builder.setLogSequenceNumber(this.logSeqNum);
     builder.setWriteTime(writeTime);
+    if(this.origLogSeqNum > 0) {
+      builder.setOrigSequenceNumber(this.origLogSeqNum);
+    }
     if (this.nonce != HConstants.NO_NONCE) {
       builder.setNonce(nonce);
     }
@@ -599,5 +619,8 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
     }
     this.logSeqNum = walKey.getLogSequenceNumber();
     this.writeTime = walKey.getWriteTime();
+    if(walKey.hasOrigSequenceNumber()) {
+      this.origLogSeqNum = walKey.getOrigSequenceNumber();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 0ce4a64..873e863 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -1863,6 +1864,10 @@ public class HLogSplitter {
     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
       this.type = type;
       this.mutation = mutation;
+      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
+        // using ASYNC_WAL for replay
+        this.mutation.setDurability(Durability.ASYNC_WAL);
+      }
       this.nonceGroup = nonceGroup;
       this.nonce = nonce;
     }
@@ -1875,10 +1880,10 @@ public class HLogSplitter {
 
  /**
   * Tag original sequence number for each edit to be replayed
-  * @param entry
+  * @param seqId
   * @param cell
   */
-  private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
+  private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
     // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
     boolean needAddRecoveryTag = true;
     if (cell.getTagsLength() > 0) {
@@ -1891,8 +1896,7 @@ public class HLogSplitter {
     }
     if (needAddRecoveryTag) {
       List<Tag> newTags = new ArrayList<Tag>();
-      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
-          .getLogSequenceNumber()));
+      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
       newTags.add(replayTag);
       return KeyValue.cloneAndAddTags(cell, newTags);
     }
@@ -1918,6 +1922,8 @@ public class HLogSplitter {
       return new ArrayList<MutationReplay>();
     }
 
+    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
     int count = entry.getAssociatedCellCount();
     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
     Cell previousCell = null;
@@ -1958,7 +1964,7 @@ public class HLogSplitter {
       } else {
         Cell tmpNewCell = cell;
         if (addLogReplayTag) {
-          tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
+          tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
         }
         ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
       }
@@ -1973,8 +1979,8 @@ public class HLogSplitter {
         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
       }
       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
-              .getTableName().toByteArray()), walKey.getLogSequenceNumber(), 
-              walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
+              .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds, 
+              walKey.getNonceGroup(), walKey.getNonce());
       logEntry.setFirst(key);
       logEntry.setSecond(val);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 77734a7..8f73431 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -1906,7 +1906,7 @@ public class AccessController extends BaseRegionObserver
       newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
       tags);
     // Preserve mvcc data
-    rewriteKv.setMvccVersion(newKv.getMvccVersion());
+    rewriteKv.setSequenceId(newKv.getMvccVersion());
     return rewriteKv;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 9cd21d8..39f65db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -1255,7 +1255,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
     	newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
     	newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags);
     // Preserve mvcc data
-    rewriteKv.setMvccVersion(newKv.getMvccVersion());
+    rewriteKv.setSequenceId(newKv.getMvccVersion());
     return rewriteKv;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 86c30d1..fde40ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -3055,7 +3055,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     }
   }
 
-  private static <T> String safeGetAsStr(List<T> lst, int i) {
+  public static <T> String safeGetAsStr(List<T> lst, int i) {
     if (0 <= i && i < lst.size()) {
       return lst.get(i).toString();
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 37456a8..09561cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -176,7 +176,7 @@ public class TestHFileBlock {
       totalSize += kv.getLength();
       if (includesMemstoreTS) {
         long memstoreTS = randomizer.nextLong();
-        kv.setMvccVersion(memstoreTS);
+        kv.setSequenceId(memstoreTS);
         totalSize += WritableUtils.getVIntSize(memstoreTS);
       }
       hbw.write(kv);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index ebe95b1..3743fdd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -241,7 +241,7 @@ public class TestDefaultMemStore extends TestCase {
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv1 = new KeyValue(row, f, q1, v);
-    kv1.setMvccVersion(w.getWriteNumber());
+    kv1.setSequenceId(w.getWriteNumber());
     memstore.add(kv1);
 
     KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
@@ -254,7 +254,7 @@ public class TestDefaultMemStore extends TestCase {
 
     w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
     KeyValue kv2 = new KeyValue(row, f, q2, v);
-    kv2.setMvccVersion(w.getWriteNumber());
+    kv2.setSequenceId(w.getWriteNumber());
     memstore.add(kv2);
 
     s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
@@ -285,11 +285,11 @@ public class TestDefaultMemStore extends TestCase {
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
-    kv11.setMvccVersion(w.getWriteNumber());
+    kv11.setSequenceId(w.getWriteNumber());
     memstore.add(kv11);
 
     KeyValue kv12 = new KeyValue(row, f, q2, v1);
-    kv12.setMvccVersion(w.getWriteNumber());
+    kv12.setSequenceId(w.getWriteNumber());
     memstore.add(kv12);
     mvcc.completeMemstoreInsert(w);
 
@@ -300,11 +300,11 @@ public class TestDefaultMemStore extends TestCase {
     // START INSERT 2: Write both columns val2
     w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
     KeyValue kv21 = new KeyValue(row, f, q1, v2);
-    kv21.setMvccVersion(w.getWriteNumber());
+    kv21.setSequenceId(w.getWriteNumber());
     memstore.add(kv21);
 
     KeyValue kv22 = new KeyValue(row, f, q2, v2);
-    kv22.setMvccVersion(w.getWriteNumber());
+    kv22.setSequenceId(w.getWriteNumber());
     memstore.add(kv22);
 
     // BEFORE COMPLETING INSERT 2, SEE FIRST KVS
@@ -337,11 +337,11 @@ public class TestDefaultMemStore extends TestCase {
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
-    kv11.setMvccVersion(w.getWriteNumber());
+    kv11.setSequenceId(w.getWriteNumber());
     memstore.add(kv11);
 
     KeyValue kv12 = new KeyValue(row, f, q2, v1);
-    kv12.setMvccVersion(w.getWriteNumber());
+    kv12.setSequenceId(w.getWriteNumber());
     memstore.add(kv12);
     mvcc.completeMemstoreInsert(w);
 
@@ -353,7 +353,7 @@ public class TestDefaultMemStore extends TestCase {
     w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
     KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
         KeyValue.Type.DeleteColumn);
-    kvDel.setMvccVersion(w.getWriteNumber());
+    kvDel.setSequenceId(w.getWriteNumber());
     memstore.add(kvDel);
 
     // BEFORE COMPLETING DELETE, SEE FIRST KVS
@@ -414,7 +414,7 @@ public class TestDefaultMemStore extends TestCase {
         byte[] v = Bytes.toBytes(i);
 
         KeyValue kv = new KeyValue(row, f, q1, i, v);
-        kv.setMvccVersion(w.getWriteNumber());
+        kv.setSequenceId(w.getWriteNumber());
         memstore.add(kv);
         mvcc.completeMemstoreInsert(w);
 
@@ -827,7 +827,7 @@ public class TestDefaultMemStore extends TestCase {
     KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
     KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
 
-    kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1);
+    kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
     l.add(kv1); l.add(kv2); l.add(kv3);
 
     this.memstore.upsert(l, 2);// readpoint is 2
@@ -835,7 +835,7 @@ public class TestDefaultMemStore extends TestCase {
     assert(newSize > oldSize);
 
     KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
-    kv4.setMvccVersion(1);
+    kv4.setSequenceId(1);
     l.clear(); l.add(kv4);
     this.memstore.upsert(l, 3);
     assertEquals(newSize, this.memstore.size.get());
@@ -877,7 +877,7 @@ public class TestDefaultMemStore extends TestCase {
       // test the case that the timeOfOldestEdit is updated after a KV upsert
       List<Cell> l = new ArrayList<Cell>();
       KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
-      kv1.setMvccVersion(100);
+      kv1.setSequenceId(100);
       l.add(kv1);
       memstore.upsert(l, 1000);
       t = memstore.timeOfOldestEdit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index fd944f9..8a588e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -528,6 +528,7 @@ public class TestHRegion {
       }
       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
+      region.getMVCC().initialize(seqId);
       Get get = new Get(row);
       Result result = region.get(get);
       for (long i = minSeqId; i <= maxSeqId; i += 10) {
@@ -579,6 +580,7 @@ public class TestHRegion {
       }
       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
+      region.getMVCC().initialize(seqId);
       Get get = new Get(row);
       Result result = region.get(get);
       for (long i = minSeqId; i <= maxSeqId; i += 10) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 2cd5e3a..c71f4f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -689,7 +689,7 @@ public class TestReversibleScanners {
   private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
     KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
         VALUES[rowNum % VALUESIZE]);
-    kv.setMvccVersion(makeMVCC(rowNum, cqNum));
+    kv.setSequenceId(makeMVCC(rowNum, cqNum));
     return kv;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
index 988d82f..450dd82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -449,5 +448,32 @@ public class TestSeekOptimizations {
   }
 
 
+  public void assertKVListsEqual(String additionalMsg, // throws AssertionError unless the lists have equal size and compare equal pairwise
+      final List<? extends Cell> expected,
+      final List<? extends Cell> actual) {
+    final int eLen = expected.size();
+    final int aLen = actual.size();
+    final int minLen = Math.min(eLen, aLen); // only the common prefix can be compared index by index
+
+    int i; // declared outside the loop because it is read again after the loop ends
+    for (i = 0; i < minLen // stop at the end of the common prefix ...
+        && KeyValue.COMPARATOR.compareOnlyKeyPortion(expected.get(i), actual.get(i)) == 0; // ... or at the first pair that does not compare as 0
+        ++i) {} // empty body: the loop only positions i
+
+    if (additionalMsg == null) { // a null message is treated the same as an empty one
+      additionalMsg = "";
+    }
+    if (!additionalMsg.isEmpty()) { // a non-empty message is appended to the error text after ". "
+      additionalMsg = ". " + additionalMsg;
+    }
+
+    if (eLen != aLen || i != minLen) { // fail on a size mismatch or when the scan stopped before minLen
+      throw new AssertionError(
+          "Expected and actual KV arrays differ at position " + i + ": " +
+          HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " + // i can equal a list's size here; safeGetAsStr checks the index range before calling get(i)
+          HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
+    }
+  }
+
 }
 


Mime
View raw message