Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3C47D11037 for ; Sat, 7 Jun 2014 01:19:55 +0000 (UTC) Received: (qmail 56508 invoked by uid 500); 7 Jun 2014 01:19:55 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 56462 invoked by uid 500); 7 Jun 2014 01:19:55 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 56355 invoked by uid 99); 7 Jun 2014 01:19:55 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 07 Jun 2014 01:19:55 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BD7E28BB616; Sat, 7 Jun 2014 01:19:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeffreyz@apache.org To: commits@hbase.apache.org Date: Sat, 07 Jun 2014 01:19:55 -0000 Message-Id: <2a589a14e3b84e6db188f2a4ae09c5b9@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: hbase-8763: Combine MVCC and SeqId hbase-8763: Combine MVCC and SeqId Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c682d57e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c682d57e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c682d57e Branch: refs/heads/master Commit: c682d57e92d9f18a02e1fe8dc50c5caa116e5d4a Parents: d6cc2fb Author: Jeffrey Zhong Authored: Fri Jun 6 18:25:46 2014 -0700 Committer: Jeffrey Zhong Committed: Fri Jun 6 18:25:46 2014 -0700 ---------------------------------------------------------------------- .../hbase/regionserver/DefaultMemStore.java | 17 +- .../hadoop/hbase/regionserver/HRegion.java | 413 +++++++++++-------- .../hadoop/hbase/regionserver/HStore.java | 5 +- .../hadoop/hbase/regionserver/MemStore.java | 6 +- .../MultiVersionConsistencyControl.java | 187 ++++++--- .../hbase/regionserver/SequenceNumber.java | 31 ++ .../apache/hadoop/hbase/regionserver/Store.java | 5 +- .../hadoop/hbase/regionserver/StoreFlusher.java | 6 - .../hadoop/hbase/regionserver/wal/FSHLog.java | 67 +-- .../hbase/regionserver/wal/FSWALEntry.java | 31 +- .../hadoop/hbase/regionserver/wal/HLog.java | 11 +- .../hadoop/hbase/regionserver/wal/HLogKey.java | 45 +- .../hbase/regionserver/wal/HLogSplitter.java | 4 +- .../hadoop/hbase/regionserver/wal/HLogUtil.java | 2 +- .../hadoop/hbase/regionserver/wal/WALEdit.java | 2 +- .../hadoop/hbase/client/TestMultiParallel.java | 4 +- .../hbase/regionserver/TestDefaultMemStore.java | 23 +- .../hadoop/hbase/regionserver/TestHRegion.java | 17 +- .../TestMultiVersionConsistencyControl.java | 4 +- .../hadoop/hbase/regionserver/TestStore.java | 12 +- 20 files changed, 554 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 51f22d8..ad084a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CollectionBackedScanner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -210,12 +211,13 @@ public class DefaultMemStore implements MemStore { /** * Write an update * @param cell - * @return approximate size of the passed key and value. + * @return approximate size of the passed KV & newly added KV which maybe different than the + * passed-in KV */ @Override - public long add(Cell cell) { + public Pair add(Cell cell) { KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell)); - return internalAdd(toAdd); + return new Pair(internalAdd(toAdd), toAdd); } @Override @@ -1051,18 +1053,21 @@ public class DefaultMemStore implements MemStore { byte [] empty = new byte[0]; for (int i = 0; i < count; i++) { // Give each its own ts - size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); + Pair ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); + size += ret.getFirst(); } LOG.info("memstore1 estimated size=" + size); for (int i = 0; i < count; i++) { - size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); + Pair ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty)); + size += ret.getFirst(); } LOG.info("memstore1 estimated size (2nd loading of same data)=" + size); // Make a variably sized memstore. DefaultMemStore memstore2 = new DefaultMemStore(); for (int i = 0; i < count; i++) { - size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, + Pair ret = memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i])); + size += ret.getFirst(); } LOG.info("memstore2 estimated size=" + size); final int seconds = 30; http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0c4f7ee..dedaf41 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -824,10 +824,11 @@ public class HRegion implements HeapSize { // , Writable{ } } } - mvcc.initialize(maxMemstoreTS + 1); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny( this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1); + mvcc.initialize(maxSeqId); return maxSeqId; } @@ -1684,7 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{ // wal can be null replaying edits. return wal != null? new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - getNextSequenceId(wal, startTime), "Nothing to flush"): + getNextSequenceId(wal), "Nothing to flush"): new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"); } } finally { @@ -1714,58 +1715,64 @@ public class HRegion implements HeapSize { // , Writable{ getRegionInfo().getEncodedName()); List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; + try { - // Record the mvcc for all transactions in progress. - w = mvcc.beginMemstoreInsert(); - mvcc.advanceMemstore(w); - if (wal != null) { - if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { - // This should never happen. - String msg = "Flush will not be started for [" - + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; - status.setStatus(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + try { + w = mvcc.beginMemstoreInsert(); + if (wal != null) { + if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + // This should never happen. + String msg = "Flush will not be started for [" + + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; + status.setStatus(msg); + return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); + } + // Get a sequence id that we can use to denote the flush. It will be one beyond the last + // edit that made it into the hfile (the below does not add an edit, it just asks the + // WAL system to return next sequence edit). + flushSeqId = getNextSequenceId(wal); + } else { + // use the provided sequence Id as WAL is not being used for this flush. + flushSeqId = myseqid; } - // Get a sequence id that we can use to denote the flush. It will be one beyond the last - // edit that made it into the hfile (the below does not add an edit, it just asks the - // WAL system to return next sequence edit). - flushSeqId = getNextSequenceId(wal, startTime); - } else { - // use the provided sequence Id as WAL is not being used for this flush. - flushSeqId = myseqid; - } - for (Store s : stores.values()) { - totalFlushableSize += s.getFlushableSize(); - storeFlushCtxs.add(s.createFlushContext(flushSeqId)); - } + for (Store s : stores.values()) { + totalFlushableSize += s.getFlushableSize(); + storeFlushCtxs.add(s.createFlushContext(flushSeqId)); + } - // Prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs) { - flush.prepare(); + // Prepare flush (take a snapshot) + for (StoreFlushContext flush : storeFlushCtxs) { + flush.prepare(); + } + } finally { + this.updatesLock.writeLock().unlock(); } + String s = "Finished memstore snapshotting " + this + + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; + status.setStatus(s); + if (LOG.isTraceEnabled()) LOG.trace(s); + // sync unflushed WAL changes when deferred log sync is enabled + // see HBASE-8208 for details + if (wal != null && !shouldSyncLog()) wal.sync(); + + // wait for all in-progress transactions to commit to HLog before + // we can start the flush. This prevents + // uncommitted transactions from being written into HFiles. + // We have to block before we start the flush, otherwise keys that + // were removed via a rollbackMemstore could be written to Hfiles. + mvcc.waitForPreviousTransactionsComplete(w); + // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block + w = null; + s = "Flushing stores of " + this; + status.setStatus(s); + if (LOG.isTraceEnabled()) LOG.trace(s); } finally { - this.updatesLock.writeLock().unlock(); + if (w != null) { + // in case of failure just mark current w as complete + mvcc.advanceMemstore(w); + } } - String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushSize=" + totalFlushableSize; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); - - // sync unflushed WAL changes when deferred log sync is enabled - // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) wal.sync(); - - // wait for all in-progress transactions to commit to HLog before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.waitForRead(w); - - s = "Flushing stores of " + this; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); // Any failure from here on out will be catastrophic requiring server // restart so hlog content can be replayed and put back into the memstore. @@ -1849,13 +1856,9 @@ public class HRegion implements HeapSize { // , Writable{ * @return Next sequence number unassociated with any actual edit. * @throws IOException */ - private long getNextSequenceId(final HLog wal, final long now) throws IOException { - HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable()); - // Call append but with an empty WALEdit. The returned sequence id will not be associated - // with any edit and we can be sure it went in after all outstanding appends. - wal.appendNoSync(getTableDesc(), getRegionInfo(), key, - WALEdit.EMPTY_WALEDIT, this.sequenceId, false); - return key.getLogSeqNum(); + private long getNextSequenceId(final HLog wal) throws IOException { + HLogKey key = this.appendNoSyncNoAppend(wal, null); + return key.getSequenceNumber(); } ////////////////////////////////////////////////////////////////////////////// @@ -2349,11 +2352,14 @@ public class HRegion implements HeapSize { // , Writable{ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; + List memstoreCells = new ArrayList(); // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; int noOfPuts = 0, noOfDeletes = 0; + HLogKey walKey = null; + long mvccNum = 0; try { // ------------------------------------ // STEP 1. Try to acquire as many locks as we can, and ensure @@ -2475,13 +2481,13 @@ public class HRegion implements HeapSize { // , Writable{ lock(this.updatesLock.readLock(), numReadyToWrite); locked = true; - + mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); // // ------------------------------------ // Acquire the latest mvcc number // ---------------------------------- - w = mvcc.beginMemstoreInsert(); - + w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); + // calling the pre CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = @@ -2506,13 +2512,12 @@ public class HRegion implements HeapSize { // , Writable{ continue; } doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], w); + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells); } // ------------------------------------ // STEP 4. Build WAL edit // ---------------------------------- - boolean hasWalAppends = false; Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing @@ -2543,13 +2548,13 @@ public class HRegion implements HeapSize { // , Writable{ throw new IOException("Multiple nonces per batch and not in replay"); } // txid should always increase, so having the one from the last call is ok. - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, - currentNonce); - txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, - walEdit, getSequenceId(), true); - hasWalAppends = true; + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, m.getClusterIds(), + currentNonceGroup, currentNonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, + walEdit, getSequenceId(), true, null); walEdit = new WALEdit(isInReplay); + walKey = null; } currentNonceGroup = nonceGroup; currentNonce = nonce; @@ -2570,12 +2575,15 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); if (walEdit.size() > 0) { - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), - currentNonceGroup, currentNonce); - txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit, - getSequenceId(), true); - hasWalAppends = true; + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, + mutation.getClusterIds(), currentNonceGroup, currentNonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, + getSequenceId(), true, memstoreCells); + } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned + walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); } // ------------------------------- @@ -2590,9 +2598,10 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- // STEP 7. Sync wal. // ------------------------- - if (hasWalAppends) { + if (txid != 0) { syncOrDefer(txid, durability); } + doRollBackMemstore = false; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -2606,7 +2615,7 @@ public class HRegion implements HeapSize { // , Writable{ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); w = null; } @@ -2636,9 +2645,11 @@ public class HRegion implements HeapSize { // , Writable{ // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive); + rollbackMemstore(memstoreCells); + } + if (w != null) { + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); } - if (w != null) mvcc.completeMemstoreInsert(w); if (locked) { this.updatesLock.readLock().unlock(); @@ -2727,7 +2738,7 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactionsComplete(); try { if (this.getCoprocessorHost() != null) { Boolean processed = null; @@ -2903,34 +2914,25 @@ public class HRegion implements HeapSize { // , Writable{ * @param familyMap Map of kvs per family * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. * If null, then this method internally creates a mvcc transaction. + * @param output newly added KVs into memstore * @return the additional memory usage of the memstore caused by the * new entries. */ private long applyFamilyMapToMemstore(Map> familyMap, - MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) { + long mvccNum, List memstoreCells) { long size = 0; - boolean freemvcc = false; - try { - if (localizedWriteEntry == null) { - localizedWriteEntry = mvcc.beginMemstoreInsert(); - freemvcc = true; - } - - for (Map.Entry> e : familyMap.entrySet()) { - byte[] family = e.getKey(); - List cells = e.getValue(); + for (Map.Entry> e : familyMap.entrySet()) { + byte[] family = e.getKey(); + List cells = e.getValue(); - Store store = getStore(family); - for (Cell cell: cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kv.setMvccVersion(localizedWriteEntry.getWriteNumber()); - size += store.add(kv); - } - } - } finally { - if (freemvcc) { - mvcc.completeMemstoreInsert(localizedWriteEntry); + Store store = getStore(family); + for (Cell cell: cells) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + kv.setMvccVersion(mvccNum); + Pair ret = store.add(kv); + size += ret.getFirst(); + memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); } } @@ -2942,35 +2944,16 @@ public class HRegion implements HeapSize { // , Writable{ * called when a Put/Delete has updated memstore but subsequently fails to update * the wal. This method is then invoked to rollback the memstore. */ - private void rollbackMemstore(BatchOperationInProgress batchOp, - Map>[] familyMaps, - int start, int end) { + private void rollbackMemstore(List memstoreCells) { int kvsRolledback = 0; - for (int i = start; i < end; i++) { - // skip over request that never succeeded in the first place. - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.SUCCESS) { - continue; - } - - // Rollback all the kvs for this row. - Map> familyMap = familyMaps[i]; - for (Map.Entry> e : familyMap.entrySet()) { - byte[] family = e.getKey(); - List cells = e.getValue(); - - // Remove those keys from the memstore that matches our - // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is - // that even the memstoreTS has to match for keys that will be rolled-back. - Store store = getStore(family); - for (Cell cell: cells) { - store.rollback(KeyValueUtil.ensureKeyValue(cell)); - kvsRolledback++; - } - } + + for (KeyValue kv : memstoreCells) { + byte[] family = kv.getFamily(); + Store store = getStore(family); + store.rollback(kv); + kvsRolledback++; } - LOG.debug("rollbackMemstore rolled back " + kvsRolledback + - " keyvalues from start:" + start + " to end:" + end); + LOG.debug("rollbackMemstore rolled back " + kvsRolledback); } /** @@ -3378,7 +3361,7 @@ public class HRegion implements HeapSize { // , Writable{ * @return True if we should flush. */ protected boolean restoreEdit(final Store s, final KeyValue kv) { - long kvSize = s.add(kv); + long kvSize = s.add(kv).getFirst(); if (this.rsAccounting != null) { rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize); } @@ -4883,7 +4866,10 @@ public class HRegion implements HeapSize { // , Writable{ List acquiredRowLocks; long addedSize = 0; List mutations = new ArrayList(); + List memstoreCells = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); + long mvccNum = 0; + HLogKey walKey = null; try { // 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList(rowsToLock.size()); @@ -4894,6 +4880,7 @@ public class HRegion implements HeapSize { // , Writable{ // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size()); locked = true; + mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); long now = EnvironmentEdgeManager.currentTimeMillis(); try { @@ -4904,27 +4891,35 @@ public class HRegion implements HeapSize { // , Writable{ if (!mutations.isEmpty()) { // 5. Get a mvcc write number - writeEntry = mvcc.beginMemstoreInsert(); + writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); // 6. Apply to memstore for (KeyValue kv : mutations) { - kv.setMvccVersion(writeEntry.getWriteNumber()); + kv.setMvccVersion(mvccNum); Store store = getStore(kv); if (store == null) { checkFamily(CellUtil.cloneFamily(kv)); // unreachable } - addedSize += store.add(kv); + Pair ret = store.add(kv); + addedSize += ret.getFirst(); + memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); } long txid = 0; // 7. Append no sync if (!walEdit.isEmpty()) { - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup, - nonce); + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, + processor.getClusterIds(), nonceGroup, nonce); txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), - key, walEdit, getSequenceId(), true); + walKey, walEdit, getSequenceId(), true, memstoreCells); } + if(walKey == null){ + // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit + // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId + walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); + } + // 8. Release region lock if (locked) { this.updatesLock.readLock().unlock(); @@ -4951,7 +4946,7 @@ public class HRegion implements HeapSize { // , Writable{ } // 11. Roll mvcc forward if (writeEntry != null) { - mvcc.completeMemstoreInsert(writeEntry); + mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } if (locked) { this.updatesLock.readLock().unlock(); @@ -5055,8 +5050,12 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.APPEND); this.writeRequestsCount.increment(); + long mvccNum = 0; WriteEntry w = null; - RowLock rowLock; + HLogKey walKey = null; + RowLock rowLock = null; + List memstoreCells = new ArrayList(); + boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); try { @@ -5064,7 +5063,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactionsComplete(); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preAppendAfterRowLock(append); if(r!= null) { @@ -5072,7 +5071,8 @@ public class HRegion implements HeapSize { // , Writable{ } } // now start my own transaction - w = mvcc.beginMemstoreInsert(); + mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family for (Map.Entry> family : append.getFamilyCellMap().entrySet()) { @@ -5140,7 +5140,7 @@ public class HRegion implements HeapSize { // , Writable{ // so only need to update the timestamp to 'now' newKV.updateLatestStamp(Bytes.toBytes(now)); } - newKV.setMvccVersion(w.getWriteNumber()); + newKV.setMvccVersion(mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( @@ -5161,34 +5161,43 @@ public class HRegion implements HeapSize { // , Writable{ tempMemstore.put(store, kvs); } - // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), nonceGroup, nonce); - txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits, - this.sequenceId, true); - } else { - recordMutationWithoutWal(append.getFamilyCellMap()); - } - //Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); + memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue())); } else { // otherwise keep older versions around for (Cell cell: entry.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - size += store.add(kv); + Pair ret = store.add(kv); + size += ret.getFirst(); + memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); + doRollBackMemstore = true; } } allKVs.addAll(entry.getValue()); } + + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits, + this.sequenceId, true, memstoreCells); + } else { + recordMutationWithoutWal(append.getFamilyCellMap()); + } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned + walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); + } + size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -5196,14 +5205,23 @@ public class HRegion implements HeapSize { // , Writable{ } } finally { rowLock.release(); + rowLock = null; } - if (writeToWAL) { - // sync the transaction log outside the rowlock + // sync the transaction log outside the rowlock + if(txid != 0){ syncOrDefer(txid, durability); } + doRollBackMemstore = false; } finally { + if (rowLock != null) { + rowLock.release(); + } + // if the wal sync was unsuccessful, remove keys from memstore + if (doRollBackMemstore) { + rollbackMemstore(memstoreCells); + } if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); } closeRegionOperation(Operation.APPEND); } @@ -5250,15 +5268,20 @@ public class HRegion implements HeapSize { // , Writable{ // Lock row startRegionOperation(Operation.INCREMENT); this.writeRequestsCount.increment(); + RowLock rowLock = null; WriteEntry w = null; + HLogKey walKey = null; + long mvccNum = 0; + List memstoreCells = new ArrayList(); + boolean doRollBackMemstore = false; try { - RowLock rowLock = getRowLock(row); + rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); try { // wait for all prior MVCC transactions to finish - while we hold the row lock // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + mvcc.waitForPreviousTransactionsComplete(); if (this.coprocessorHost != null) { Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); if (r != null) { @@ -5266,7 +5289,8 @@ public class HRegion implements HeapSize { // , Writable{ } } // now start my own transaction - w = mvcc.beginMemstoreInsert(); + mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); + w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family for (Map.Entry> family: @@ -5330,7 +5354,7 @@ public class HRegion implements HeapSize { // , Writable{ System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(), newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen); } - newKV.setMvccVersion(w.getWriteNumber()); + newKV.setMvccVersion(mvccNum); // Give coprocessors a chance to update the new cell if (coprocessorHost != null) { newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL( @@ -5357,20 +5381,6 @@ public class HRegion implements HeapSize { // , Writable{ } } - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), nonceGroup, nonce); - txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), - key, walEdits, getSequenceId(), true); - } else { - recordMutationWithoutWal(increment.getFamilyCellMap()); - } - } //Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry> entry : tempMemstore.entrySet()) { @@ -5378,30 +5388,62 @@ public class HRegion implements HeapSize { // , Writable{ if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); + memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue())); } else { // otherwise keep older versions around for (Cell cell : entry.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - size += store.add(kv); + Pair ret = store.add(kv); + size += ret.getFirst(); + memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); + doRollBackMemstore = true; } } } size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } + + // Actually write to WAL now + if (walEdits != null && !walEdits.isEmpty()) { + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, getSequenceId(), true, memstoreCells); + } else { + recordMutationWithoutWal(increment.getFamilyCellMap()); + } + } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); + } } finally { this.updatesLock.readLock().unlock(); } } finally { rowLock.release(); + rowLock = null; } - if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) { - // sync the transaction log outside the rowlock + // sync the transaction log outside the rowlock + if(txid != 0){ syncOrDefer(txid, durability); } + doRollBackMemstore = false; } finally { + if (rowLock != null) { + rowLock.release(); + } + // if the wal sync was unsuccessful, remove keys from memstore + if (doRollBackMemstore) { + rollbackMemstore(memstoreCells); + } if (w != null) { - mvcc.completeMemstoreInsert(w); + mvcc.completeMemstoreInsertWithSeqNum(w, walKey); } closeRegionOperation(Operation.INCREMENT); if (this.metricsRegion != null) { @@ -6130,4 +6172,23 @@ public class HRegion implements HeapSize { // , Writable{ } } } + + /** + * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore + * the WALEdit append later. + * @param wal + * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to + * be updated with right mvcc values(their log sequence nu + * @return + * @throws IOException + */ + private HLogKey appendNoSyncNoAppend(final HLog wal, List cells) throws IOException { + HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), + HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); + // Call append but with an empty WALEdit. The returned seqeunce id will not be associated + // with any edit and we can be sure it went in after all outstanding appends. + wal.appendNoSync(getTableDesc(), getRegionInfo(), key, + WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells); + return key; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 6de94c8..2218244 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -564,10 +565,10 @@ public class HStore implements Store { } @Override - public long add(final KeyValue kv) { + public Pair add(final KeyValue kv) { lock.readLock().lock(); try { - return this.memstore.add(kv); + return this.memstore.add(kv); } finally { lock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index b370b49..ac2155a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.Pair; /** * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s. @@ -61,9 +62,10 @@ public interface MemStore extends HeapSize { /** * Write an update * @param cell - * @return approximate size of the passed key and value. + * @return approximate size of the passed KV and the newly added KV which maybe different from the + * passed in KV. */ - long add(final Cell cell); + Pair add(final Cell cell); /** * @return Oldest timestamp of all the Cells in the MemStore http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index b46d55b..4343313 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -18,7 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -32,20 +34,18 @@ import org.apache.hadoop.hbase.util.ClassSize; */ @InterfaceAudience.Private public class MultiVersionConsistencyControl { - private volatile long memstoreRead = 0; - private volatile long memstoreWrite = 0; - + private static final long NO_WRITE_NUMBER = 0; + private volatile long memstoreRead = 0; private final Object readWaiters = new Object(); // This is the pending queue of writes. private final LinkedList writeQueue = new LinkedList(); - + /** * Default constructor. Initializes the memstoreRead/Write points to 0. */ public MultiVersionConsistencyControl() { - this.memstoreRead = this.memstoreWrite = 0; } /** @@ -54,37 +54,86 @@ public class MultiVersionConsistencyControl { */ public void initialize(long startPoint) { synchronized (writeQueue) { - if (this.memstoreWrite != this.memstoreRead) { - throw new RuntimeException("Already used this mvcc. Too late to initialize"); - } - - this.memstoreRead = this.memstoreWrite = startPoint; + writeQueue.clear(); + memstoreRead = startPoint; } } /** - * Generate and return a {@link WriteEntry} with a new write number. - * To complete the WriteEntry and wait for it to be visible, - * call {@link #completeMemstoreInsert(WriteEntry)}. + * + * @param initVal The value we used initially and expected it'll be reset later + * @return */ - public WriteEntry beginMemstoreInsert() { + WriteEntry beginMemstoreInsert() { + return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); + } + + /** + * Get a mvcc write number before an actual one(its log sequence Id) being assigned + * @param sequenceId + * @return long a faked write number which is bigger enough not to be seen by others before a real + * one is assigned + */ + public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { + // the 1 billion is just an arbitrary big number to guard no scanner will reach it before + // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers + // because each handler could increment sequence num twice and max concurrent in-flight + // transactions is the number of RPC handlers. + // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple + // changes touch same row key + // If for any reason, the bumped value isn't reset due to failure situations, we'll reset + // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all + return sequenceId.incrementAndGet() + 1000000000; + } + + /** + * This function starts a MVCC transaction with current region's log change sequence number. Since + * we set change sequence number when flushing current change to WAL(late binding), the flush + * order may differ from the order to start a MVCC transaction. For example, a change begins a + * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we + * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent + * transactions will reuse the number till current MVCC completes(success or fail). The "faked" + * big number is safe because we only need it to prevent current change being seen and the number + * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order + * for MVCC to align with flush sequence. + * @param curSeqNum + * @return WriteEntry a WriteEntry instance with the passed in curSeqNum + */ + public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { + WriteEntry e = new WriteEntry(curSeqNum); synchronized (writeQueue) { - long nextWriteNumber = ++memstoreWrite; - WriteEntry e = new WriteEntry(nextWriteNumber); writeQueue.add(e); return e; } } /** - * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. - * - * At the end of this call, the global read point is at least as large as the write point - * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. + * Complete a {@link WriteEntry} that was created by + * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read + * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is + * visible to MVCC readers. + * @throws IOException + */ + public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum) + throws IOException { + if(e == null) return; + if (seqNum != null) { + e.setWriteNumber(seqNum.getSequenceNumber()); + } else { + // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside + // function beginMemstoreInsertWithSeqNum in case of failures + e.setWriteNumber(NO_WRITE_NUMBER); + } + waitForPreviousTransactionsComplete(e); + } + + /** + * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the + * end of this call, the global read point is at least as large as the write point of the passed + * in WriteEntry. Thus, the write is visible to MVCC readers. */ public void completeMemstoreInsert(WriteEntry e) { - advanceMemstore(e); - waitForRead(e); + waitForPreviousTransactionsComplete(e); } /** @@ -99,75 +148,94 @@ public class MultiVersionConsistencyControl { * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) */ boolean advanceMemstore(WriteEntry e) { + long nextReadValue = -1; synchronized (writeQueue) { e.markCompleted(); - long nextReadValue = -1; - boolean ranOnce=false; while (!writeQueue.isEmpty()) { - ranOnce=true; WriteEntry queueFirst = writeQueue.getFirst(); - - if (nextReadValue > 0) { - if (nextReadValue+1 != queueFirst.getWriteNumber()) { - throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: " - + nextReadValue + " next: " + queueFirst.getWriteNumber()); - } - } - if (queueFirst.isCompleted()) { - nextReadValue = queueFirst.getWriteNumber(); + // Using Max because Edit complete in WAL sync order not arriving order + nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); writeQueue.removeFirst(); } else { break; } } - if (!ranOnce) { - throw new RuntimeException("never was a first"); + if (nextReadValue > memstoreRead) { + memstoreRead = nextReadValue; } - if (nextReadValue > 0) { - synchronized (readWaiters) { - memstoreRead = nextReadValue; - readWaiters.notifyAll(); - } - } - if (memstoreRead >= e.getWriteNumber()) { - return true; + // notify waiters on writeQueue before return + writeQueue.notifyAll(); + } + + if (nextReadValue > 0) { + synchronized (readWaiters) { + readWaiters.notifyAll(); } - return false; } + + if (memstoreRead >= e.getWriteNumber()) { + return true; + } + return false; } /** - * Wait for the global readPoint to advance upto - * the specified transaction number. + * Wait for all previous MVCC transactions complete */ - public void waitForRead(WriteEntry e) { + public void waitForPreviousTransactionsComplete() { + WriteEntry w = beginMemstoreInsert(); + waitForPreviousTransactionsComplete(w); + } + + public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { boolean interrupted = false; - synchronized (readWaiters) { - while (memstoreRead < e.getWriteNumber()) { - try { - readWaiters.wait(0); - } catch (InterruptedException ie) { - // We were interrupted... finish the loop -- i.e. cleanup --and then - // on our way out, reset the interrupt flag. - interrupted = true; + WriteEntry w = waitedEntry; + + try { + WriteEntry firstEntry = null; + do { + synchronized (writeQueue) { + // writeQueue won't be empty at this point, the following is just a safety check + if (writeQueue.isEmpty()) { + break; + } + firstEntry = writeQueue.getFirst(); + if (firstEntry == w) { + // all previous in-flight transactions are done + break; + } + try { + writeQueue.wait(0); + } catch (InterruptedException ie) { + // We were interrupted... finish the loop -- i.e. cleanup --and then + // on our way out, reset the interrupt flag. + interrupted = true; + break; + } } + } while (firstEntry != null); + } finally { + if (w != null) { + advanceMemstore(w); } } - if (interrupted) Thread.currentThread().interrupt(); + if (interrupted) { + Thread.currentThread().interrupt(); + } } public long memstoreReadPoint() { return memstoreRead; } - public static class WriteEntry { private long writeNumber; private boolean completed = false; + WriteEntry(long writeNumber) { this.writeNumber = writeNumber; } @@ -180,6 +248,9 @@ public class MultiVersionConsistencyControl { long getWriteNumber() { return this.writeNumber; } + void setWriteNumber(long val){ + this.writeNumber = val; + } } public static final long FIXED_SIZE = ClassSize.align( http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java new file mode 100644 index 0000000..90b1029 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java @@ -0,0 +1,31 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Interface which abstracts implementations on log sequence number assignment + */ +@InterfaceAudience.Private +public interface SequenceNumber { + public long getSequenceNumber() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8923769..fd73f2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Pair; /** * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or @@ -122,9 +123,9 @@ public interface Store extends HeapSize, StoreConfigInformation { /** * Adds a value to the memstore * @param kv - * @return memstore size delta + * @return memstore size delta & newly added KV which maybe different than the passed in KV */ - long add(KeyValue kv); + Pair add(KeyValue kv); /** * When was the last edit done in the memstore http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index b876972..7403700 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -121,12 +121,6 @@ abstract class StoreFlusher { // set its memstoreTS to 0. This will help us save space when writing to // disk. KeyValue kv = KeyValueUtil.ensureKeyValue(c); - if (kv.getMvccVersion() <= smallestReadPoint) { - // let us not change the original KV. It could be in the memstore - // changing its memstoreTS could affect other threads/scanners. - kv = kv.shallowCopy(); - kv.setMvccVersion(0); - } sink.append(kv); } kvs.clear(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 268e302..c0c7dbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1064,13 +1064,26 @@ class FSHLog implements HLog, Syncable { } } + /** + * @param now + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tableName + * @param clusterIds that have consumed the change + * @return New log key. + */ + protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, + long now, List clusterIds, long nonceGroup, long nonce) { + return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); + } + @Override @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException { HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now); - append(htd, info, logKey, edits, sequenceId, true, true); + append(htd, info, logKey, edits, sequenceId, true, true, null); } @Override @@ -1079,14 +1092,15 @@ class FSHLog implements HLog, Syncable { boolean inMemstore, long nonceGroup, long nonce) throws IOException { HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce); - return append(htd, info, logKey, edits, sequenceId, false, inMemstore); + return append(htd, info, logKey, edits, sequenceId, false, inMemstore, null); } @Override public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key, - final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore) + final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, + final List memstoreKVs) throws IOException { - return append(htd, info, key, edits, sequenceId, false, inMemstore); + return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs); } /** @@ -1101,19 +1115,22 @@ class FSHLog implements HLog, Syncable { * @param sync shall we sync after we call the append? * @param inMemstore * @param sequenceId The region sequence id reference. + * @param memstoreKVs * @return txid of this transaction or if nothing to do, the last txid * @throws IOException */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key, - WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore) + WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore, + List memstoreKVs) throws IOException { if (!this.enabled) return this.highestUnsyncedSequence; if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. It is closed on other side of the ring buffer by the // single consuming thread. Don't have to worry about it. TraceScope scope = Trace.startSpan("FSHLog.append"); + // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need // all this to make a key and then below to append the edit, we need to carry htd, info, // etc. all over the ring buffer. @@ -1124,19 +1141,10 @@ class FSHLog implements HLog, Syncable { // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append. - entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri); + entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); - // Now wait until the region edit/sequence id is available. The 'entry' has an internal - // latch that is thrown when the region edit/sequence id is set. Calling - // entry.getRegionSequenceId will cause us block until the latch is thrown. The return is - // the region edit/sequence id, not the ring buffer txid. - try { - entry.getRegionSequenceId(); - } catch (InterruptedException e) { - throw convertInterruptedExceptionToIOException(e); - } } // doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after // all edits on a handler have been added. @@ -1894,6 +1902,14 @@ class FSHLog implements HLog, Syncable { // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); + + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a + // region sequence id only, a region edit/sequence id that is not associated with an actual + // edit. It has to go through all the rigmarole to be sure we have the right ordering. + if (entry.getEdit().isEmpty()) { + return; + } + // Coprocessor hook. if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { @@ -1909,19 +1925,16 @@ class FSHLog implements HLog, Syncable { entry.getEdit()); } } - // If empty, there is nothing to append. Maybe empty when we are looking for a region - // sequence id only, a region edit/sequence id that is not associated with an actual edit. - // It has to go through all the rigmarole to be sure we have the right ordering. - if (!entry.getEdit().isEmpty()) { - writer.append(entry); - assert highestUnsyncedSequence < entry.getSequence(); - highestUnsyncedSequence = entry.getSequence(); - Long lRegionSequenceId = Long.valueOf(regionSequenceId); - highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); - if (entry.isInMemstore()) { - oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); - } + + writer.append(entry); + assert highestUnsyncedSequence < entry.getSequence(); + highestUnsyncedSequence = entry.getSequence(); + Long lRegionSequenceId = Long.valueOf(regionSequenceId); + highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); + if (entry.isInMemstore()) { + oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); } + coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. postAppend(entry, EnvironmentEdgeManager.currentTimeMillis() - start); http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 9799269..1e9472a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; /** * A WAL Entry for {@link FSHLog} implementation. Immutable. @@ -41,19 +43,18 @@ class FSWALEntry extends HLog.Entry { private final transient boolean inMemstore; private final transient HTableDescriptor htd; private final transient HRegionInfo hri; - // Latch that is set on creation and then is undone on the other side of the ring buffer by the - // consumer thread just after it sets the region edit/sequence id in here. - private final transient CountDownLatch latch = new CountDownLatch(1); + private final transient List memstoreKVs; FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit, final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, - final HTableDescriptor htd, final HRegionInfo hri) { + final HTableDescriptor htd, final HRegionInfo hri, List memstoreKVs) { super(key, edit); this.regionSequenceIdReference = referenceToRegionSequenceId; this.inMemstore = inMemstore; this.htd = htd; this.hri = hri; this.sequence = sequence; + this.memstoreKVs = memstoreKVs; } public String toString() { @@ -90,15 +91,13 @@ class FSWALEntry extends HLog.Entry { */ long stampRegionSequenceId() { long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); - getKey().setLogSeqNum(regionSequenceId); - // On creation, a latch was set. Count it down when sequence id is set. This will free - // up anyone blocked on {@link #getRegionSequenceId()} - this.latch.countDown(); + if(memstoreKVs != null && !memstoreKVs.isEmpty()) { + for(KeyValue kv : this.memstoreKVs){ + kv.setMvccVersion(regionSequenceId); + } + } + HLogKey key = getKey(); + key.setLogSeqNum(regionSequenceId); return regionSequenceId; } - - long getRegionSequenceId() throws InterruptedException { - this.latch.await(); - return getKey().getLogSeqNum(); - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ca6f444..99a9a5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -34,8 +34,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.io.Writable; @@ -290,8 +292,8 @@ public interface HLog { * @param sequenceId * @throws IOException * @deprecated For tests only and even then, should use - * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)} - * and {@link #sync()} instead. + * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, + * List)} and {@link #sync()} instead. */ @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, @@ -337,7 +339,7 @@ public interface HLog { * able to sync an explicit edit only (the current default implementation syncs up to the time * of the sync call syncing whatever is behind the sync). * @throws IOException - * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)} + * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, List)} * instead because you can get back the region edit/sequenceid; it is set into the passed in * key. */ @@ -361,12 +363,13 @@ public interface HLog { * @param inMemstore Always true except for case where we are writing a compaction completion * record into the WAL; in this case the entry is just so we can finish an unfinished compaction * -- it is not an edit for memstore. + * @param memstoreKVs list of KVs added into memstore * @return Returns a 'transaction id' and key will have the region edit/sequence id * in it. * @throws IOException */ long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits, - AtomicLong sequenceId, boolean inMemstore) + AtomicLong sequenceId, boolean inMemstore, List memstoreKVs) throws IOException; // TODO: Do we need all these versions of sync? http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index f591f4e..ad1c001 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -31,6 +32,10 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; + + +import com.google.protobuf.HBaseZeroCopyByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; +import org.apache.hadoop.hbase.regionserver.SequenceNumber; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.WritableComparable; @@ -49,7 +55,6 @@ import org.apache.hadoop.io.WritableUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; -import com.google.protobuf.HBaseZeroCopyByteString; /** * A Key for an entry in the change log. @@ -64,7 +69,7 @@ import com.google.protobuf.HBaseZeroCopyByteString; // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical // purposes. They need to be merged into HLogEntry. @InterfaceAudience.Private -public class HLogKey implements WritableComparable { +public class HLogKey implements WritableComparable, SequenceNumber { public static final Log LOG = LogFactory.getLog(HLogKey.class); // should be < 0 (@see #readFields(DataInput)) @@ -114,6 +119,7 @@ public class HLogKey implements WritableComparable { private byte [] encodedRegionName; private TableName tablename; private long logSeqNum; + private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1); // Time at which this edit was written. private long writeTime; @@ -184,7 +190,8 @@ public class HLogKey implements WritableComparable { */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, final long now, List clusterIds, long nonceGroup, long nonce) { - init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce); + init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, + nonceGroup, nonce); } /** @@ -195,13 +202,14 @@ public class HLogKey implements WritableComparable { * @param encodedRegionName Encoded name of the region as returned by * HRegionInfo#getEncodedNameAsBytes(). * @param tablename + * @param logSeqNum * @param nonceGroup * @param nonce */ - public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup, - long nonce) { - init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, - EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce); + public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum, + long nonceGroup, long nonce) { + init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(), + EMPTY_UUIDS, nonceGroup, nonce); } protected void init(final byte [] encodedRegionName, final TableName tablename, @@ -238,11 +246,30 @@ public class HLogKey implements WritableComparable { } /** - * Allow that the log sequence id to be set post-construction. + * Allow that the log sequence id to be set post-construction and release all waiters on assigned + * sequence number. * @param sequence */ void setLogSeqNum(final long sequence) { this.logSeqNum = sequence; + this.seqNumAssignedLatch.countDown(); + } + + /** + * Wait for sequence number is assigned & return the assigned value + * @return long the new assigned sequence number + * @throws InterruptedException + */ + public long getSequenceNumber() throws IOException { + try { + this.seqNumAssignedLatch.await(); + } catch (InterruptedException ie) { + LOG.warn("Thread interrupted waiting for next log sequence number"); + InterruptedIOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + return this.logSeqNum; } /** @@ -358,7 +385,7 @@ public class HLogKey implements WritableComparable { if (result == 0) { if (this.logSeqNum < o.logSeqNum) { result = -1; - } else if (this.logSeqNum > o.logSeqNum ) { + } else if (this.logSeqNum > o.logSeqNum) { result = 1; } if (result == 0) { http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 00e9f15..b9f82b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -1972,8 +1972,8 @@ public class HLogSplitter { clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); } key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey - .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(), - clusterIds, walKey.getNonceGroup(), walKey.getNonce()); + .getTableName().toByteArray()), walKey.getLogSequenceNumber(), + walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce()); logEntry.setFirst(key); logEntry.setSecond(val); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index fcb5610..6809aad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -262,7 +262,7 @@ public class HLogUtil { final CompactionDescriptor c, AtomicLong sequenceId) throws IOException { TableName tn = TableName.valueOf(c.getTableName().toByteArray()); HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); - log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false); + log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null); log.sync(); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index b10c4a9..8ecb4b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -249,7 +249,7 @@ public class WALEdit implements Writable, HeapSize { sb.append(">]"); return sb.toString(); } - + /** * Create a compacion WALEdit * @param c http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 98563d6..2e74281 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -68,7 +68,7 @@ public class TestMultiParallel { private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); - private static final int slaves = 2; // also used for testing HTable pool size + private static final int slaves = 3; // also used for testing HTable pool size @BeforeClass public static void beforeClass() throws Exception { ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); @@ -238,7 +238,7 @@ public class TestMultiParallel { * * @throws Exception */ - @Test (timeout=300000) + @Test (timeout=360000) public void testFlushCommitsWithAbort() throws Exception { LOG.info("test=testFlushCommitsWithAbort"); doTestFlushCommits(true); http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index f2db498..ebe95b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import junit.framework.TestCase; @@ -61,6 +62,7 @@ public class TestDefaultMemStore extends TestCase { private static final int QUALIFIER_COUNT = ROW_COUNT; private static final byte [] FAMILY = Bytes.toBytes("column"); private MultiVersionConsistencyControl mvcc; + private AtomicLong startSeqNum = new AtomicLong(0); @Override public void setUp() throws Exception { @@ -236,7 +238,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v = Bytes.toBytes("value"); MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv1 = new KeyValue(row, f, q1, v); kv1.setMvccVersion(w.getWriteNumber()); @@ -250,7 +252,7 @@ public class TestDefaultMemStore extends TestCase { s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0); assertScannerResults(s, new KeyValue[]{kv1}); - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv2 = new KeyValue(row, f, q2, v); kv2.setMvccVersion(w.getWriteNumber()); memstore.add(kv2); @@ -280,7 +282,7 @@ public class TestDefaultMemStore extends TestCase { // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -296,7 +298,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START INSERT 2: Write both columns val2 - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv21 = new KeyValue(row, f, q1, v2); kv21.setMvccVersion(w.getWriteNumber()); memstore.add(kv21); @@ -332,7 +334,7 @@ public class TestDefaultMemStore extends TestCase { final byte[] v1 = Bytes.toBytes("value1"); // INSERT 1: Write both columns val1 MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kv11 = new KeyValue(row, f, q1, v1); kv11.setMvccVersion(w.getWriteNumber()); @@ -348,7 +350,7 @@ public class TestDefaultMemStore extends TestCase { assertScannerResults(s, new KeyValue[]{kv11, kv12}); // START DELETE: Insert delete for one of the columns - w = mvcc.beginMemstoreInsert(); + w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(), KeyValue.Type.DeleteColumn); kvDel.setMvccVersion(w.getWriteNumber()); @@ -377,6 +379,7 @@ public class TestDefaultMemStore extends TestCase { final MultiVersionConsistencyControl mvcc; final MemStore memstore; + final AtomicLong startSeqNum; AtomicReference caughtException; @@ -384,12 +387,14 @@ public class TestDefaultMemStore extends TestCase { public ReadOwnWritesTester(int id, MemStore memstore, MultiVersionConsistencyControl mvcc, - AtomicReference caughtException) + AtomicReference caughtException, + AtomicLong startSeqNum) { this.mvcc = mvcc; this.memstore = memstore; this.caughtException = caughtException; row = Bytes.toBytes(id); + this.startSeqNum = startSeqNum; } public void run() { @@ -403,7 +408,7 @@ public class TestDefaultMemStore extends TestCase { private void internalRun() throws IOException { for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) { MultiVersionConsistencyControl.WriteEntry w = - mvcc.beginMemstoreInsert(); + mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet()); // Insert the sequence value (i) byte[] v = Bytes.toBytes(i); @@ -433,7 +438,7 @@ public class TestDefaultMemStore extends TestCase { AtomicReference caught = new AtomicReference(); for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught); + threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum); threads[i].start(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index b2b4845..0f55b62 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -4152,15 +4152,16 @@ public class TestHRegion { durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true); // expect skip wal cases - durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false); - durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false); + durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false); + durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false); } + @SuppressWarnings("unchecked") private void durabilityTest(String method, Durability tableDurability, Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception { @@ -4183,7 +4184,7 @@ public class TestHRegion { //verify append called or not verify(log, expectAppend ? times(1) : never()) .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), - (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean()); + (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List)any()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { @@ -4202,7 +4203,7 @@ public class TestHRegion { } }); } else { - verify(log, never()).sync(anyLong()); + //verify(log, never()).sync(anyLong()); verify(log, never()).sync(); }