Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D50C811A7C for ; Mon, 7 Jul 2014 01:50:21 +0000 (UTC) Received: (qmail 83169 invoked by uid 500); 7 Jul 2014 01:50:21 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 83117 invoked by uid 500); 7 Jul 2014 01:50:21 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 83107 invoked by uid 99); 7 Jul 2014 01:50:21 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Jul 2014 01:50:21 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 39DA49A0883; Mon, 7 Jul 2014 01:50:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeffreyz@apache.org To: commits@hbase.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: HBase-11315: Keeping MVCC for configurable longer time Date: Mon, 7 Jul 2014 01:50:21 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/branch-1 738bf63f1 -> cc4f54770 HBase-11315: Keeping MVCC for configurable longer time Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cc4f5477 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cc4f5477 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cc4f5477 Branch: refs/heads/branch-1 Commit: cc4f54770bc9b517f561a6fefcca087ba15c79c2 Parents: 738bf63 Author: Jeffrey Zhong Authored: Sun Jul 6 18:25:18 2014 -0700 Committer: Jeffrey Zhong Committed: Sun Jul 6 18:51:48 2014 -0700 ---------------------------------------------------------------------- .../ipc/TestPayloadCarryingRpcController.java | 6 + .../main/java/org/apache/hadoop/hbase/Cell.java | 11 ++ .../java/org/apache/hadoop/hbase/CellUtil.java | 4 +- .../org/apache/hadoop/hbase/HConstants.java | 5 + .../java/org/apache/hadoop/hbase/KeyValue.java | 24 ++-- .../org/apache/hadoop/hbase/KeyValueUtil.java | 4 +- .../io/encoding/BufferedDataBlockEncoder.java | 10 ++ .../hbase/io/encoding/EncodedDataBlock.java | 4 +- .../hadoop/hbase/codec/TestCellCodec.java | 2 +- .../util/TestByteRangeWithKVSerialization.java | 4 +- .../codec/prefixtree/decode/PrefixTreeCell.java | 5 + .../data/TestRowDataDifferentTimestamps.java | 10 +- .../hbase/protobuf/generated/WALProtos.java | 116 ++++++++++++++++--- hbase-protocol/src/main/protobuf/WAL.proto | 1 + .../hadoop/hbase/io/hfile/HFileReaderV2.java | 2 +- .../hbase/protobuf/ReplicationProtbufUtil.java | 3 + .../hbase/regionserver/DefaultMemStore.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 57 +++++++-- .../MultiVersionConsistencyControl.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 28 +++-- .../hbase/regionserver/StoreFileScanner.java | 9 -- .../regionserver/compactions/Compactor.java | 31 +++-- .../compactions/DefaultCompactor.java | 7 +- .../compactions/StripeCompactor.java | 7 +- .../hbase/regionserver/wal/FSWALEntry.java | 4 +- .../hadoop/hbase/regionserver/wal/HLogKey.java | 23 ++++ .../hbase/regionserver/wal/HLogSplitter.java | 20 ++-- .../hbase/security/access/AccessController.java | 2 +- .../visibility/VisibilityController.java | 2 +- .../hadoop/hbase/HBaseTestingUtility.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 2 +- .../hbase/regionserver/TestDefaultMemStore.java | 26 ++--- .../hadoop/hbase/regionserver/TestHRegion.java | 2 + .../regionserver/TestReversibleScanners.java | 2 +- .../regionserver/TestSeekOptimizations.java | 28 ++++- 35 files changed, 359 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java index 249cc42..088e609 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java @@ -144,6 +144,12 @@ public class TestPayloadCarryingRpcController { } @Override + public long getSequenceId() { + // unused + return 0; + } + + @Override public byte[] getValueArray() { return Bytes.toBytes(this.i); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java index 27b9345..f7e9272 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java @@ -142,14 +142,25 @@ public interface Cell { //6) MvccVersion /** + * @deprecated as of 1.0, use {@link Cell#getSequenceId()} + * * Internal use only. A region-specific sequence ID given to each operation. It always exists for * cells in the memstore but is not retained forever. It may survive several flushes, but * generally becomes irrelevant after the cell's row is no longer involved in any operations that * require strict consistency. * @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists */ + @Deprecated long getMvccVersion(); + /** + * A region-specific unique monotonically increasing sequence ID given to each Cell. It always + * exists for cells in the memstore but is not retained forever. It will be kept for + * {@link HConstants#KEEP_SEQID_PERIOD} days, but generally becomes irrelevant after the cell's + * row is no longer involved in any operations that require strict consistency. + * @return seqId (always > 0 if exists), or 0 if it no longer exists + */ + long getSequenceId(); //7) Value http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 376e073..d6564c2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -167,7 +167,7 @@ public final class CellUtil { final long timestamp, final byte type, final byte[] value, final long memstoreTS) { KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp, KeyValue.Type.codeToType(type), value); - keyValue.setMvccVersion(memstoreTS); + keyValue.setSequenceId(memstoreTS); return keyValue; } @@ -175,7 +175,7 @@ public final class CellUtil { final long timestamp, final byte type, final byte[] value, byte[] tags, final long memstoreTS) { KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp, KeyValue.Type.codeToType(type), value, tags); - keyValue.setMvccVersion(memstoreTS); + keyValue.setSequenceId(memstoreTS); return keyValue; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c2709f5..93209fd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -353,6 +353,11 @@ public final class HConstants { /** Default value for cluster ID */ public static final String CLUSTER_ID_DEFAULT = "default-cluster"; + + /** Parameter name for # days to keep MVCC values during a major compaction */ + public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period"; + /** At least to keep MVCC values in hfiles for 5 days */ + public static final int MIN_KEEP_SEQID_PERIOD = 5; // Always store the location of the root table's HRegion. // This HRegion is never split. http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 887946e..002642a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -284,15 +284,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // used to achieve atomic operations in the memstore. @Override public long getMvccVersion() { - return mvcc; + return this.getSequenceId(); } - public void setMvccVersion(long mvccVersion){ - this.mvcc = mvccVersion; + /** + * used to achieve atomic operations in the memstore. + */ + @Override + public long getSequenceId() { + return seqId; + } + + public void setSequenceId(long seqId) { + this.seqId = seqId; } // multi-version concurrency control version. default value is 0, aka do not care. - private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles) + private long seqId = 0; /** Dragon time over, return to normal business */ @@ -1083,7 +1091,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { // Important to clone the memstoreTS as well - otherwise memstore's // update-in-place methods (eg increment) will end up creating // new entries - ret.setMvccVersion(mvcc); + ret.setSequenceId(seqId); return ret; } @@ -1094,7 +1102,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { */ public KeyValue shallowCopy() { KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length); - shallowCopy.setMvccVersion(this.mvcc); + shallowCopy.setSequenceId(this.seqId); return shallowCopy; } @@ -1108,8 +1116,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable { if (this.bytes == null || this.bytes.length == 0) { return "empty"; } - return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + - "/vlen=" + getValueLength() + "/mvcc=" + mvcc; + return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen=" + + getValueLength() + "/seqid=" + seqId; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 4c1f345..c2a8826 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -75,7 +75,7 @@ public class KeyValueUtil { public static KeyValue copyToNewKeyValue(final Cell cell) { byte[] bytes = copyToNewByteArray(cell); KeyValue kvCell = new KeyValue(bytes, 0, bytes.length); - kvCell.setMvccVersion(cell.getMvccVersion()); + kvCell.setSequenceId(cell.getMvccVersion()); return kvCell; } @@ -175,7 +175,7 @@ public class KeyValueUtil { keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength); if (includesMvccVersion) { long mvccVersion = ByteBufferUtils.readVLong(bb); - keyValue.setMvccVersion(mvccVersion); + keyValue.setSequenceId(mvccVersion); } return keyValue; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index fe019d1..b1b384a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -232,6 +232,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } @Override + public long getSequenceId() { + return memstoreTS; + } + + @Override public byte[] getValueArray() { return currentBuffer.array(); } @@ -422,6 +427,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } @Override + public long getSequenceId() { + return memstoreTS; + } + + @Override public byte[] getValueArray() { return currentBuffer.array(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java index ce7356c..d71d1a4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java @@ -125,7 +125,7 @@ public class EncodedDataBlock { (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen)); if (meta.isIncludesMvcc()) { long mvccVersion = ByteBufferUtils.readVLong(decompressedData); - kv.setMvccVersion(mvccVersion); + kv.setSequenceId(mvccVersion); } return kv; } @@ -244,7 +244,7 @@ public class EncodedDataBlock { } kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize( klength, vlength, tagsLength)); - kv.setMvccVersion(memstoreTS); + kv.setSequenceId(memstoreTS); this.dataBlockEncoder.encode(kv, encodingCtx, out); } BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java index c27b91e..bca57d9 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java @@ -70,7 +70,7 @@ public class TestCellCodec { Codec.Encoder encoder = codec.getEncoder(dos); final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")); - kv.setMvccVersion(Long.MAX_VALUE); + kv.setSequenceId(Long.MAX_VALUE); encoder.write(kv); encoder.flush(); dos.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java index e2af966..d60aba9 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java @@ -50,7 +50,7 @@ public class TestByteRangeWithKVSerialization { long mvcc = pbr.getVLong(); KeyValue kv = new KeyValue(pbr.getBytes(), kvStartPos, (int) KeyValue.getKeyValueDataStructureSize(keyLen, valLen, tagsLen)); - kv.setMvccVersion(mvcc); + kv.setSequenceId(mvcc); return kv; } @@ -65,7 +65,7 @@ public class TestByteRangeWithKVSerialization { Tag[] tags = new Tag[] { new Tag((byte) 1, "tag1") }; for (int i = 0; i < kvCount; i++) { KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, VALUE, tags); - kv.setMvccVersion(i); + kv.setSequenceId(i); kvs.add(kv); totalSize += kv.getLength() + Bytes.SIZEOF_LONG; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java ---------------------------------------------------------------------- diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java index 740a08e..7763c6a 100644 --- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java +++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java @@ -123,6 +123,11 @@ public class PrefixTreeCell implements Cell, Comparable { } @Override + public long getSequenceId() { + return getMvccVersion(); + } + + @Override public int getValueLength() { return valueLength; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java ---------------------------------------------------------------------- diff --git a/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java b/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java index 8b729bc..2668f2a 100644 --- a/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java +++ b/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java @@ -44,21 +44,21 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{ static List d = Lists.newArrayList(); static{ KeyValue kv0 = new KeyValue(Arow, cf, cq0, 0L, v0); - kv0.setMvccVersion(123456789L); + kv0.setSequenceId(123456789L); d.add(kv0); KeyValue kv1 = new KeyValue(Arow, cf, cq1, 1L, v0); - kv1.setMvccVersion(3L); + kv1.setSequenceId(3L); d.add(kv1); KeyValue kv2 = new KeyValue(Brow, cf, cq0, 12345678L, v0); - kv2.setMvccVersion(65537L); + kv2.setSequenceId(65537L); d.add(kv2); //watch out... Long.MAX_VALUE comes back as 1332221664203, even with other encoders // d.add(new KeyValue(Brow, cf, cq1, Long.MAX_VALUE, v0)); KeyValue kv3 = new KeyValue(Brow, cf, cq1, Long.MAX_VALUE-1, v0); - kv3.setMvccVersion(1L); + kv3.setSequenceId(1L); d.add(kv3); KeyValue kv4 = new KeyValue(Brow, cf, cq1, 999999999, v0); @@ -66,7 +66,7 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{ d.add(kv4); KeyValue kv5 = new KeyValue(Brow, cf, cq1, 12345, v0); - kv5.setMvccVersion(0L); + kv5.setSequenceId(0L); d.add(kv5); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 19afcb2..efea2ba 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -897,6 +897,16 @@ public final class WALProtos { * optional uint64 nonce = 10; */ long getNonce(); + + // optional uint64 orig_sequence_number = 11; + /** + * optional uint64 orig_sequence_number = 11; + */ + boolean hasOrigSequenceNumber(); + /** + * optional uint64 orig_sequence_number = 11; + */ + long getOrigSequenceNumber(); } /** * Protobuf type {@code WALKey} @@ -1017,6 +1027,11 @@ public final class WALProtos { nonce_ = input.readUInt64(); break; } + case 88: { + bitField0_ |= 0x00000100; + origSequenceNumber_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -1323,6 +1338,22 @@ public final class WALProtos { return nonce_; } + // optional uint64 orig_sequence_number = 11; + public static final int ORIG_SEQUENCE_NUMBER_FIELD_NUMBER = 11; + private long origSequenceNumber_; + /** + * optional uint64 orig_sequence_number = 11; + */ + public boolean hasOrigSequenceNumber() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional uint64 orig_sequence_number = 11; + */ + public long getOrigSequenceNumber() { + return origSequenceNumber_; + } + private void initFields() { encodedRegionName_ = com.google.protobuf.ByteString.EMPTY; tableName_ = com.google.protobuf.ByteString.EMPTY; @@ -1334,6 +1365,7 @@ public final class WALProtos { clusterIds_ = java.util.Collections.emptyList(); nonceGroup_ = 0L; nonce_ = 0L; + origSequenceNumber_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1411,6 +1443,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000080) == 0x00000080)) { output.writeUInt64(10, nonce_); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeUInt64(11, origSequenceNumber_); + } getUnknownFields().writeTo(output); } @@ -1460,6 +1495,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(10, nonce_); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(11, origSequenceNumber_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1527,6 +1566,11 @@ public final class WALProtos { result = result && (getNonce() == other.getNonce()); } + result = result && (hasOrigSequenceNumber() == other.hasOrigSequenceNumber()); + if (hasOrigSequenceNumber()) { + result = result && (getOrigSequenceNumber() + == other.getOrigSequenceNumber()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1580,6 +1624,10 @@ public final class WALProtos { hash = (37 * hash) + NONCE_FIELD_NUMBER; hash = (53 * hash) + hashLong(getNonce()); } + if (hasOrigSequenceNumber()) { + hash = (37 * hash) + ORIG_SEQUENCE_NUMBER_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getOrigSequenceNumber()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1728,6 +1776,8 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000100); nonce_ = 0L; bitField0_ = (bitField0_ & ~0x00000200); + origSequenceNumber_ = 0L; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -1810,6 +1860,10 @@ public final class WALProtos { to_bitField0_ |= 0x00000080; } result.nonce_ = nonce_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000100; + } + result.origSequenceNumber_ = origSequenceNumber_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1902,6 +1956,9 @@ public final class WALProtos { if (other.hasNonce()) { setNonce(other.getNonce()); } + if (other.hasOrigSequenceNumber()) { + setOrigSequenceNumber(other.getOrigSequenceNumber()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2977,6 +3034,39 @@ public final class WALProtos { return this; } + // optional uint64 orig_sequence_number = 11; + private long origSequenceNumber_ ; + /** + * optional uint64 orig_sequence_number = 11; + */ + public boolean hasOrigSequenceNumber() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional uint64 orig_sequence_number = 11; + */ + public long getOrigSequenceNumber() { + return origSequenceNumber_; + } + /** + * optional uint64 orig_sequence_number = 11; + */ + public Builder setOrigSequenceNumber(long value) { + bitField0_ |= 0x00000400; + origSequenceNumber_ = value; + onChanged(); + return this; + } + /** + * optional uint64 orig_sequence_number = 11; + */ + public Builder clearOrigSequenceNumber() { + bitField0_ = (bitField0_ & ~0x00000400); + origSequenceNumber_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALKey) } @@ -5176,24 +5266,24 @@ public final class WALProtos { java.lang.String[] descriptorData = { "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" + "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" + - "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" + + "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" + "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" + "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" + " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" + " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" + "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" + "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" + - "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil", - "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" + - "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" + - " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" + - "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" + - "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" + - "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" + - "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" + - "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" + - "\001\001" + "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number", + "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" + + "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" + + "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" + + "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" + + " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" + + "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" + + "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" + + "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" + + "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" + + "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5211,7 +5301,7 @@ public final class WALProtos { internal_static_WALKey_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALKey_descriptor, - new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", }); + new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", "OrigSequenceNumber", }); internal_static_FamilyScope_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_FamilyScope_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-protocol/src/main/protobuf/WAL.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 8c7b84b..0ae65ec 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -54,6 +54,7 @@ message WALKey { optional uint64 nonceGroup = 9; optional uint64 nonce = 10; + optional uint64 orig_sequence_number = 11; /* optional CustomEntryType custom_entry_type = 9; http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 0bfefaa..2f6ea39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -765,7 +765,7 @@ public class HFileReaderV2 extends AbstractHFileReader { KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position(), getCellBufSize()); if (this.reader.shouldIncludeMemstoreTS()) { - ret.setMvccVersion(currMemstoreTS); + ret.setSequenceId(currMemstoreTS); } return ret; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 2dc2388..348b6ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -106,6 +106,9 @@ public class ReplicationProtbufUtil { uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); keyBuilder.addClusterIds(uuidBuilder.build()); } + if(key.getOrigLogSeqNum() > 0) { + keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum()); + } WALEdit edit = entry.getEdit(); NavigableMap scopes = key.getScopes(); if (scopes != null && !scopes.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index ad084a5..d90357b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -271,7 +271,7 @@ public class DefaultMemStore implements MemStore { assert alloc.getBytes() != null; alloc.put(0, kv.getBuffer(), kv.getOffset(), len); KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setMvccVersion(kv.getMvccVersion()); + newKv.setSequenceId(kv.getMvccVersion()); return newKv; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2429ed5..93eada8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2155,6 +2155,7 @@ public class HRegion implements HeapSize { // , Writable{ /** This method is potentially expensive and should only be used for non-replay CP path. */ public abstract Mutation[] getMutationsForCoprocs(); public abstract boolean isInReplay(); + public abstract long getReplaySequenceId(); public boolean isDone() { return nextIndexToProcess == operations.length; @@ -2194,11 +2195,18 @@ public class HRegion implements HeapSize { // , Writable{ public boolean isInReplay() { return false; } + + @Override + public long getReplaySequenceId() { + return 0; + } } private static class ReplayBatch extends BatchOperationInProgress { - public ReplayBatch(MutationReplay[] operations) { + private long replaySeqId = 0; + public ReplayBatch(MutationReplay[] operations, long seqId) { super(operations); + this.replaySeqId = seqId; } @Override @@ -2226,6 +2234,11 @@ public class HRegion implements HeapSize { // , Writable{ public boolean isInReplay() { return true; } + + @Override + public long getReplaySequenceId() { + return this.replaySeqId; + } } /** @@ -2252,13 +2265,14 @@ public class HRegion implements HeapSize { // , Writable{ /** * Replay a batch of mutations. * @param mutations mutations to replay. + * @param replaySeqId SeqId for current mutations * @return an array of OperationStatus which internally contains the * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations) + public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId) throws IOException { - return batchMutate(new ReplayBatch(mutations)); + return batchMutate(new ReplayBatch(mutations, replaySeqId)); } /** @@ -2475,7 +2489,7 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------------------ // STEP 2. Update any LATEST_TIMESTAMP timestamps // ---------------------------------- - for (int i = firstIndex; i < lastIndexExclusive; i++) { + for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { // skip invalid if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) continue; @@ -2485,16 +2499,18 @@ public class HRegion implements HeapSize { // , Writable{ updateKVTimestamps(familyMaps[i].values(), byteNow); noOfPuts++; } else { - if (!isInReplay) { - prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); - } + prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); noOfDeletes++; } } lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + if(isInReplay) { + mvccNum = batchOp.getReplaySequenceId(); + } else { + mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + } // // ------------------------------------ // Acquire the latest mvcc number @@ -2591,6 +2607,9 @@ public class HRegion implements HeapSize { // , Writable{ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce); + if(isInReplay) { + walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); + } txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, getSequenceId(), true, memstoreCells); } @@ -2952,7 +2971,7 @@ public class HRegion implements HeapSize { // , Writable{ Store store = getStore(family); for (Cell cell: cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kv.setMvccVersion(mvccNum); + kv.setSequenceId(mvccNum); Pair ret = store.add(kv); size += ret.getFirst(); memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); @@ -3213,6 +3232,7 @@ public class HRegion implements HeapSize { // , Writable{ try { reader = HLogFactory.createReader(fs, edits, conf); long currentEditSeqId = -1; + long currentReplaySeqId = -1; long firstSeqIdInLog = -1; long skippedEdits = 0; long editsCount = 0; @@ -3275,6 +3295,8 @@ public class HRegion implements HeapSize { // , Writable{ firstSeqIdInLog = key.getLogSeqNum(); } currentEditSeqId = key.getLogSeqNum(); + currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? + key.getOrigLogSeqNum() : currentEditSeqId; boolean flush = false; for (KeyValue kv: val.getKeyValues()) { // Check this edit is for me. Also, guard against writing the special @@ -3309,6 +3331,7 @@ public class HRegion implements HeapSize { // , Writable{ skippedEdits++; continue; } + kv.setSequenceId(currentReplaySeqId); // Once we are over the limit, restoreEdit will keep returning true to // flush -- but don't flush until we've played all the kvs that make up // the WALEdit. @@ -4922,7 +4945,7 @@ public class HRegion implements HeapSize { // , Writable{ writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); // 6. Apply to memstore for (KeyValue kv : mutations) { - kv.setMvccVersion(mvccNum); + kv.setSequenceId(mvccNum); Store store = getStore(kv); if (store == null) { checkFamily(CellUtil.cloneFamily(kv)); @@ -5168,7 +5191,7 @@ public class HRegion implements HeapSize { // , Writable{ // so only need to update the timestamp to 'now' newKV.updateLatestStamp(Bytes.toBytes(now)); } - newKV.setMvccVersion(mvccNum); + newKV.setSequenceId(mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( @@ -5382,7 +5405,7 @@ public class HRegion implements HeapSize { // , Writable{ System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(), newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen); } - newKV.setMvccVersion(mvccNum); + newKV.setSequenceId(mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( @@ -6220,4 +6243,14 @@ public class HRegion implements HeapSize { // , Writable{ WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); return key; } + + /** + * Explictly sync wal + * @throws IOException + */ + public void syncWal() throws IOException { + if(this.log != null) { + this.log.sync(); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 2d247e9..0b5f5d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -246,7 +246,7 @@ public class MultiVersionConsistencyControl { public static class WriteEntry { private long writeNumber; - private boolean completed = false; + private volatile boolean completed = false; WriteEntry(long writeNumber) { this.writeNumber = writeNumber; http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9a0e8a4..b84f9a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -633,12 +633,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. * @param region * @param mutations + * @param replaySeqId * @return an array of OperationStatus which internally contains the OperationStatusCode and the * exceptionMessage if any * @throws IOException */ private OperationStatus [] doReplayBatchOp(final HRegion region, - final List mutations) throws IOException { + final List mutations, long replaySeqId) throws IOException { HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()]; long before = EnvironmentEdgeManager.currentTimeMillis(); @@ -657,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!region.getRegionInfo().isMetaTable()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } - return region.batchReplay(mArray); + return region.batchReplay(mArray, replaySeqId); } finally { if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTimeMillis(); @@ -1330,7 +1331,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, entries.get(0).getKey().getEncodedRegionName().toStringUtf8()); RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); List> walEntries = new ArrayList>(); - List mutations = new ArrayList(); // when tag is enabled, we need tag replay edits with log sequence number boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3); for (WALEntry entry : entries) { @@ -1354,18 +1354,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } walEntries.add(walEntry); } - mutations.addAll(edits); - } - - if (!mutations.isEmpty()) { - OperationStatus[] result = doReplayBatchOp(region, mutations); - // check if it's a partial success - for (int i = 0; result != null && i < result.length; i++) { - if (result[i] != OperationStatus.SUCCESS) { - throw new IOException(result[i].getExceptionMsg()); + if(edits!=null && !edits.isEmpty()) { + long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? + entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); + OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId); + // check if it's a partial success + for (int i = 0; result != null && i < result.length; i++) { + if (result[i] != OperationStatus.SUCCESS) { + throw new IOException(result[i].getExceptionMsg()); + } } } } + + //sync wal at the end because ASYNC_WAL is used above + region.syncWal(); + if (coprocessorHost != null) { for (Pair wal : walEntries) { coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(), http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index f0f92ae..1b07594 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -211,15 +211,6 @@ public class StoreFileScanner implements KeyValueScanner { return false; } - // For the optimisation in HBASE-4346, we set the KV's memstoreTS to - // 0, if it is older than all the scanners' read points. It is possible - // that a newer KV's memstoreTS was reset to 0. But, there is an - // older KV which was not reset to 0 (because it was - // not old enough during flush). Make sure that we set it correctly now, - // so that the comparision order does not change. - if (cur.getMvccVersion() <= readPt) { - KeyValueUtil.ensureKeyValue(cur).setMvccVersion(0); - } return true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 9e792c4..a1d629a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -58,6 +58,9 @@ public abstract class Compactor { private int compactionKVMax; protected Compression.Algorithm compactionCompression; + + /** specify how many days to keep MVCC values during major compaction **/ + protected int keepSeqIdPeriod; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(final Configuration conf, final Store store) { @@ -67,6 +70,8 @@ public abstract class Compactor { this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); this.compactionCompression = (this.store.getFamily() == null) ? Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression(); + this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, + HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD); } /** @@ -92,19 +97,30 @@ public abstract class Compactor { public long maxMVCCReadpoint = 0; /** Max tags length**/ public int maxTagsLength = 0; + /** Min SeqId to keep during a major compaction **/ + public long minSeqIdToKeep = 0; } /** * Extracts some details about the files to compact that are commonly needed by compactors. * @param filesToCompact Files. - * @param calculatePutTs Whether earliest put TS is needed. + * @param allFiles Whether all files are included for compaction * @return The result. */ protected FileDetails getFileDetails( - Collection filesToCompact, boolean calculatePutTs) throws IOException { + Collection filesToCompact, boolean allFiles) throws IOException { FileDetails fd = new FileDetails(); + long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - + (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod); for (StoreFile file : filesToCompact) { + if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) { + // when isAllFiles is true, all files are compacted so we can calculate the smallest + // MVCC value to keep + if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) { + fd.minSeqIdToKeep = file.getMaxMemstoreTS(); + } + } long seqNum = file.getMaxSequenceId(); fd.maxSeqId = Math.max(fd.maxSeqId, seqNum); StoreFile.Reader r = file.getReader(); @@ -130,7 +146,7 @@ public abstract class Compactor { // If required, calculate the earliest put timestamp of all involved storefiles. // This is used to remove family delete marker during compaction. long earliestPutTs = 0; - if (calculatePutTs) { + if (allFiles) { tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS); if (tmp == null) { // There's a file with no information, must be an old one @@ -148,7 +164,7 @@ public abstract class Compactor { ", size=" + StringUtils.humanReadableInt(r.length()) + ", encoding=" + r.getHFileReader().getDataBlockEncoding() + ", seqNum=" + seqNum + - (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: "")); + (allFiles ? ", earliestPutTs=" + earliestPutTs: "")); } } return fd; @@ -202,10 +218,11 @@ public abstract class Compactor { * @param scanner Where to read from. * @param writer Where to write to. * @param smallestReadPoint Smallest read point. + * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @return Whether compaction ended; false if it was interrupted for some reason. */ protected boolean performCompaction(InternalScanner scanner, - CellSink writer, long smallestReadPoint) throws IOException { + CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException { int bytesWritten = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. @@ -218,8 +235,8 @@ public abstract class Compactor { // output to writer: for (Cell c : kvs) { KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { - kv.setMvccVersion(0); + if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { + kv.setSequenceId(0); } writer.append(kv); ++progress.currentCompactedKVs; http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 3e8523d..d5b2b63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -54,6 +54,7 @@ public class DefaultCompactor extends Compactor { StoreFile.Writer writer = null; List newFiles = new ArrayList(); + boolean cleanSeqId = false; try { InternalScanner scanner = null; try { @@ -71,9 +72,13 @@ public class DefaultCompactor extends Compactor { } // Create the writer even if no kv(Empty store file is also ok), // because we need record the max seq id for the store file, see HBASE-6059 + if(fd.minSeqIdToKeep > 0) { + smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); + cleanSeqId = true; + } writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); - boolean finished = performCompaction(scanner, writer, smallestReadPoint); + boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId); if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 11556e5..487ff46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -90,6 +90,7 @@ public class StripeCompactor extends Compactor { boolean finished = false; InternalScanner scanner = null; + boolean cleanSeqId = false; try { // Get scanner to use. ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES; @@ -108,6 +109,10 @@ public class StripeCompactor extends Compactor { } // Create the writer factory for compactions. + if(fd.minSeqIdToKeep > 0) { + smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); + cleanSeqId = true; + } final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint; final Compression.Algorithm compression = store.getFamily().getCompactionCompression(); StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() { @@ -122,7 +127,7 @@ public class StripeCompactor extends Compactor { // It is ok here if storeScanner is null. StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; mw.init(storeScanner, factory, store.getComparator()); - finished = performCompaction(scanner, mw, smallestReadPoint); + finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId); if (!finished) { throw new InterruptedIOException( "Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 1e9472a..a9c2055 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -91,9 +91,9 @@ class FSWALEntry extends HLog.Entry { */ long stampRegionSequenceId() { long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); - if(memstoreKVs != null && !memstoreKVs.isEmpty()) { + if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) { for(KeyValue kv : this.memstoreKVs){ - kv.setMvccVersion(regionSequenceId); + kv.setSequenceId(regionSequenceId); } } HLogKey key = getKey(); http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index ad1c001..5019ff5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -119,6 +119,7 @@ public class HLogKey implements WritableComparable, SequenceNumber { private byte [] encodedRegionName; private TableName tablename; private long logSeqNum; + private long origLogSeqNum = 0; private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1); // Time at which this edit was written. private long writeTime; @@ -256,6 +257,22 @@ public class HLogKey implements WritableComparable, SequenceNumber { } /** + * Used to set original seq Id for HLogKey during wal replay + * @param seqId + */ + public void setOrigLogSeqNum(final long seqId) { + this.origLogSeqNum = seqId; + } + + /** + * Return a positive long if current HLogKey is created from a replay edit + * @return original sequence number of the WALEdit + */ + public long getOrigLogSeqNum() { + return this.origLogSeqNum; + } + + /** * Wait for sequence number is assigned & return the assigned value * @return long the new assigned sequence number * @throws InterruptedException @@ -536,6 +553,9 @@ public class HLogKey implements WritableComparable, SequenceNumber { } builder.setLogSequenceNumber(this.logSeqNum); builder.setWriteTime(writeTime); + if(this.origLogSeqNum > 0) { + builder.setOrigSequenceNumber(this.origLogSeqNum); + } if (this.nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); } @@ -599,5 +619,8 @@ public class HLogKey implements WritableComparable, SequenceNumber { } this.logSeqNum = walKey.getLogSequenceNumber(); this.writeTime = walKey.getWriteTime(); + if(walKey.hasOrigSequenceNumber()) { + this.origLogSeqNum = walKey.getOrigSequenceNumber(); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 0ce4a64..873e863 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Mutation; @@ -1863,6 +1864,10 @@ public class HLogSplitter { public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) { this.type = type; this.mutation = mutation; + if(this.mutation.getDurability() != Durability.SKIP_WAL) { + // using ASYNC_WAL for relay + this.mutation.setDurability(Durability.ASYNC_WAL); + } this.nonceGroup = nonceGroup; this.nonce = nonce; } @@ -1875,10 +1880,10 @@ public class HLogSplitter { /** * Tag original sequence number for each edit to be replayed - * @param entry + * @param seqId * @param cell */ - private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) { + private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) { // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet boolean needAddRecoveryTag = true; if (cell.getTagsLength() > 0) { @@ -1891,8 +1896,7 @@ public class HLogSplitter { } if (needAddRecoveryTag) { List newTags = new ArrayList(); - Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey() - .getLogSequenceNumber())); + Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId)); newTags.add(replayTag); return KeyValue.cloneAndAddTags(cell, newTags); } @@ -1918,6 +1922,8 @@ public class HLogSplitter { return new ArrayList(); } + long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? + entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); int count = entry.getAssociatedCellCount(); List mutations = new ArrayList(); Cell previousCell = null; @@ -1958,7 +1964,7 @@ public class HLogSplitter { } else { Cell tmpNewCell = cell; if (addLogReplayTag) { - tmpNewCell = tagReplayLogSequenceNumber(entry, cell); + tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell); } ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell)); } @@ -1973,8 +1979,8 @@ public class HLogSplitter { clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); } key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey - .getTableName().toByteArray()), walKey.getLogSequenceNumber(), - walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce()); + .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds, + walKey.getNonceGroup(), walKey.getNonce()); logEntry.setFirst(key); logEntry.setSecond(val); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 77734a7..8f73431 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -1906,7 +1906,7 @@ public class AccessController extends BaseRegionObserver newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags); // Preserve mvcc data - rewriteKv.setMvccVersion(newKv.getMvccVersion()); + rewriteKv.setSequenceId(newKv.getMvccVersion()); return rewriteKv; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index 9cd21d8..39f65db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -1255,7 +1255,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()), newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags); // Preserve mvcc data - rewriteKv.setMvccVersion(newKv.getMvccVersion()); + rewriteKv.setSequenceId(newKv.getMvccVersion()); return rewriteKv; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 86c30d1..fde40ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -3055,7 +3055,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } - private static String safeGetAsStr(List lst, int i) { + public static String safeGetAsStr(List lst, int i) { if (0 <= i && i < lst.size()) { return lst.get(i).toString(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 37456a8..09561cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -176,7 +176,7 @@ public class TestHFileBlock { totalSize += kv.getLength(); if (includesMemstoreTS) { long memstoreTS = randomizer.nextLong(); - kv.setMvccVersion(memstoreTS); + kv.setSequenceId(memstoreTS); totalSize += WritableUtils.getVIntSize(memstoreTS); } hbw.write(kv); http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index ebe95b1..3743fdd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -241,7 +241,7 @@ public class TestDefaultMemStore extends TestCase { mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); - kv1.setMvccVersion(w.getWriteNumber()); + kv1.setSequenceId(w.getWriteNumber()); memstore.add(kv1); KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); @@ -254,7 +254,7 @@ public class TestDefaultMemStore extends TestCase { w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv2 = new KeyValue(row, f, q2, v); - kv2.setMvccVersion(w.getWriteNumber()); + kv2.setSequenceId(w.getWriteNumber()); memstore.add(kv2); s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); @@ -285,11 +285,11 @@ public class TestDefaultMemStore extends TestCase { mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMvccVersion(w.getWriteNumber()); + kv11.setSequenceId(w.getWriteNumber()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMvccVersion(w.getWriteNumber()); + kv12.setSequenceId(w.getWriteNumber()); memstore.add(kv12); mvcc.completeMemstoreInsert(w); @@ -300,11 +300,11 @@ public class TestDefaultMemStore extends TestCase { // START INSERT 2: Write both columns val2 w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv21 = new KeyValue(row, f, q1, v2); - kv21.setMvccVersion(w.getWriteNumber()); + kv21.setSequenceId(w.getWriteNumber()); memstore.add(kv21); KeyValue kv22 = new KeyValue(row, f, q2, v2); - kv22.setMvccVersion(w.getWriteNumber()); + kv22.setSequenceId(w.getWriteNumber()); memstore.add(kv22); // BEFORE COMPLETING INSERT 2, SEE FIRST KVS @@ -337,11 +337,11 @@ public class TestDefaultMemStore extends TestCase { mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); - kv11.setMvccVersion(w.getWriteNumber()); + kv11.setSequenceId(w.getWriteNumber()); memstore.add(kv11); KeyValue kv12 = new KeyValue(row, f, q2, v1); - kv12.setMvccVersion(w.getWriteNumber()); + kv12.setSequenceId(w.getWriteNumber()); memstore.add(kv12); mvcc.completeMemstoreInsert(w); @@ -353,7 +353,7 @@ public class TestDefaultMemStore extends TestCase { w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); - kvDel.setMvccVersion(w.getWriteNumber()); + kvDel.setSequenceId(w.getWriteNumber()); memstore.add(kvDel); // BEFORE COMPLETING DELETE, SEE FIRST KVS @@ -414,7 +414,7 @@ public class TestDefaultMemStore extends TestCase { byte[] v = Bytes.toBytes(i); KeyValue kv = new KeyValue(row, f, q1, i, v); - kv.setMvccVersion(w.getWriteNumber()); + kv.setSequenceId(w.getWriteNumber()); memstore.add(kv); mvcc.completeMemstoreInsert(w); @@ -827,7 +827,7 @@ public class TestDefaultMemStore extends TestCase { KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); - kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1); + kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1); l.add(kv1); l.add(kv2); l.add(kv3); this.memstore.upsert(l, 2);// readpoint is 2 @@ -835,7 +835,7 @@ public class TestDefaultMemStore extends TestCase { assert(newSize > oldSize); KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); - kv4.setMvccVersion(1); + kv4.setSequenceId(1); l.clear(); l.add(kv4); this.memstore.upsert(l, 3); assertEquals(newSize, this.memstore.size.get()); @@ -877,7 +877,7 @@ public class TestDefaultMemStore extends TestCase { // test the case that the timeOfOldestEdit is updated after a KV upsert List l = new ArrayList(); KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); - kv1.setMvccVersion(100); + kv1.setSequenceId(100); l.add(kv1); memstore.upsert(l, 1000); t = memstore.timeOfOldestEdit(); http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index fd944f9..8a588e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -528,6 +528,7 @@ public class TestHRegion { } long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); + region.getMVCC().initialize(seqId); Get get = new Get(row); Result result = region.get(get); for (long i = minSeqId; i <= maxSeqId; i += 10) { @@ -579,6 +580,7 @@ public class TestHRegion { } long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); + region.getMVCC().initialize(seqId); Get get = new Get(row); Result result = region.get(get); for (long i = minSeqId; i <= maxSeqId; i += 10) { http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 2cd5e3a..c71f4f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -689,7 +689,7 @@ public class TestReversibleScanners { private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) { KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS, VALUES[rowNum % VALUESIZE]); - kv.setMvccVersion(makeMVCC(rowNum, cqNum)); + kv.setSequenceId(makeMVCC(rowNum, cqNum)); return kv; } http://git-wip-us.apache.org/repos/asf/hbase/blob/cc4f5477/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java index 988d82f..450dd82 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -449,5 +448,32 @@ public class TestSeekOptimizations { } + public void assertKVListsEqual(String additionalMsg, + final List expected, + final List actual) { + final int eLen = expected.size(); + final int aLen = actual.size(); + final int minLen = Math.min(eLen, aLen); + + int i; + for (i = 0; i < minLen + && KeyValue.COMPARATOR.compareOnlyKeyPortion(expected.get(i), actual.get(i)) == 0; + ++i) {} + + if (additionalMsg == null) { + additionalMsg = ""; + } + if (!additionalMsg.isEmpty()) { + additionalMsg = ". " + additionalMsg; + } + + if (eLen != aLen || i != minLen) { + throw new AssertionError( + "Expected and actual KV arrays differ at position " + i + ": " + + HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " + + HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg); + } + } + }