hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [4/5] hbase git commit: HBASE-14465 Backport 'Allow rowlock to be reader/write' to branch-1
Date Wed, 30 Sep 2015 18:49:00 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/4812d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index eab754a..b526172 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -45,7 +45,6 @@ import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -60,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.RandomStringUtils;
@@ -76,7 +76,6 @@ import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -143,7 +142,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -201,13 +199,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private static final Log LOG = LogFactory.getLog(HRegion.class);
 
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
-      "hbase.hregion.scan.loadColumnFamiliesOnDemand";
+    "hbase.hregion.scan.loadColumnFamiliesOnDemand";
 
   /**
    * Longest time we'll wait on a sequenceid.
    * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
-   * it without cleanup previous usage properly; generally, a WAL roll is needed.
-   * Key to use changing the default of 30000ms.
+   * it without cleaning up previous usage properly; generally, a WAL roll is needed. The timeout
+   * is for a latch in WALKey. There is intentionally no global accounting of outstanding WALKeys
+   * (to avoid contention), but that means that on an abort or other problem we could be stuck
+   * waiting on the WALKey latch. Revisit.
    */
   private final int maxWaitForSeqId;
   private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
@@ -220,6 +220,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
 
   final AtomicBoolean closed = new AtomicBoolean(false);
+
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
    * master as a region to close if the carrying regionserver is overloaded.
@@ -239,19 +240,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
    */
   private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
-  /**
-   * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
-   * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
-   * Its default value is -1L. This default is used as a marker to indicate
-   * that the region hasn't opened yet. Once it is opened, it is set to the derived
-   * #openSeqNum, the largest sequence id of all hfiles opened under this Region.
-   *
-   * <p>Control of this sequence is handed off to the WAL implementation.  It is responsible
-   * for tagging edits with the correct sequence id since it is responsible for getting the
-   * edits into the WAL files. It controls updating the sequence id value.  DO NOT UPDATE IT
-   * OUTSIDE OF THE WAL.  The value you get will not be what you think it is.
-   */
-  private final AtomicLong sequenceId = new AtomicLong(-1L);
 
   /**
    * The sequence id of the last replayed open region event from the primary region. This is used
@@ -279,7 +267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
 
-  public final AtomicLong memstoreSize = new AtomicLong(0);
+  private final AtomicLong memstoreSize = new AtomicLong(0);
 
   // Debug possible data loss due to WAL off
   final Counter numMutationsWithoutWAL = new Counter();
@@ -370,7 +358,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   /**
    * @return The smallest mvcc readPoint across all the scanners in this
-   * region. Writes older than this readPoint, are included  in every
+   * region. Writes older than this readPoint, are included in every
    * read operation.
    */
   public long getSmallestReadPoint() {
@@ -379,7 +367,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // no new RegionScanners can grab a readPoint that we are unaware of.
     // We achieve this by synchronizing on the scannerReadPoints object.
     synchronized(scannerReadPoints) {
-      minimumReadPoint = mvcc.memstoreReadPoint();
+      minimumReadPoint = mvcc.getReadPoint();
 
       for (Long readPoint: this.scannerReadPoints.values()) {
         if (readPoint < minimumReadPoint) {
@@ -593,8 +581,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private boolean splitRequest;
   private byte[] explicitSplitPoint = null;
 
-  private final MultiVersionConcurrencyControl mvcc =
-      new MultiVersionConcurrencyControl();
+  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
 
   // Coprocessor host
   private RegionCoprocessorHost coprocessorHost;
@@ -630,6 +617,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @deprecated Use other constructors.
    */
   @Deprecated
+  @VisibleForTesting
   public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
       final Configuration confParam, final HRegionInfo regionInfo,
       final HTableDescriptor htd, final RegionServerServices rsServices) {
@@ -821,7 +809,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
-    long maxSeqId = initializeRegionStores(reporter, status, false);
+    long maxSeqId = initializeStores(reporter, status);
+    this.mvcc.advanceTo(maxSeqId);
+    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
+      // Recover any edits if available.
+      maxSeqId = Math.max(maxSeqId,
+        replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+      // Make sure mvcc is up to max.
+      this.mvcc.advanceTo(maxSeqId);
+    }
     this.lastReplayedOpenRegionSeqId = maxSeqId;
 
     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
@@ -884,10 +880,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return nextSeqid;
   }
 
-  private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
-      boolean warmupOnly)
-      throws IOException {
-
+  /**
+   * Open all Stores.
+   * @param reporter
+   * @param status
+   * @return Highest sequenceId found in a Store.
+   * @throws IOException
+   */
+  private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
+  throws IOException {
     // Load in all the HStores.
 
     long maxSeqId = -1;
@@ -949,14 +950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
     }
-    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
-      // Recover any edits if available.
-      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
-          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
-    }
-    maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
-    mvcc.initialize(maxSeqId);
-    return maxSeqId;
+    return Math.max(maxSeqId, maxMemstoreTS + 1);
   }
 
   private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
@@ -964,7 +958,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     // Initialize all the HStores
     status.setStatus("Warming up all the Stores");
-    initializeRegionStores(reporter, status, true);
+    initializeStores(reporter, status);
   }
 
   private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
@@ -980,8 +974,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
       getRegionServerServices().getServerName(), storeFiles);
-    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
-      getSequenceId());
+    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
   }
 
   private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -995,17 +988,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
-      RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
+      RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
-    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
-      getSequenceId());
+    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
 
     // Store SeqId in HDFS when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
     // table is still online
     if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
       WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
-        getSequenceId().get(), 0);
+        mvcc.getReadPoint(), 0);
     }
   }
 
@@ -1277,7 +1269,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
        // This scan can read even uncommitted transactions
        return Long.MAX_VALUE;
      }
-     return mvcc.memstoreReadPoint();
+     return mvcc.getReadPoint();
    }
 
    @Override
@@ -1456,11 +1448,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         for (final Store store : stores.values()) {
           long flushableSize = store.getFlushableSize();
           if (!(abort || flushableSize == 0 || writestate.readOnly)) {
-            getRegionServerServices().abort("Assertion failed while closing store "
+            if (getRegionServerServices() != null) {
+              getRegionServerServices().abort("Assertion failed while closing store "
                 + getRegionInfo().getRegionNameAsString() + " " + store
                 + ". flushableSize expected=0, actual= " + flushableSize
                 + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                 + "operation failed and left the memstore in a partially updated state.", null);
+            }
           }
           completionService
               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
@@ -1957,11 +1951,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   boolean shouldFlushStore(Store store) {
     long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
       store.getFamily().getName()) - 1;
-    if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) {
+    if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
           getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
-          " is > " + this.flushPerChanges + " from current=" + sequenceId.get());
+          " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint());
       }
       return true;
     }
@@ -1987,7 +1981,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     whyFlush.setLength(0);
     // This is a rough measure.
     if (this.maxFlushedSeqId > 0
-          && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
+          && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
       whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
       return true;
     }
@@ -2077,11 +2071,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  protected PrepareFlushResult internalPrepareFlushCache(
-      final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
-      MonitoredTask status, boolean writeFlushWalMarker)
-          throws IOException {
-
+  protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
+      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+  throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -2091,7 +2083,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (this.memstoreSize.get() <= 0) {
       // Take an update lock because am about to change the sequence id and we want the sequence id
       // to be at the border of the empty memstore.
-      MultiVersionConcurrencyControl.WriteEntry w = null;
+      MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
       this.updatesLock.writeLock().lock();
       try {
         if (this.memstoreSize.get() <= 0) {
@@ -2099,29 +2091,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // this region out in the WAL subsystem so no need to do any trickery clearing out
           // edits in the WAL system. Up the sequence number so the resulting flush id is for
           // sure just beyond the last appended region edit (useful as a marker when bulk loading,
-          // etc.)
-          // wal can be null replaying edits.
+          // etc.). NOTE: The writeEntry write number is NOT in the WAL; there is no WAL writing
+          // here.
           if (wal != null) {
-            w = mvcc.beginMemstoreInsert();
-            long flushOpSeqId = getNextSequenceId(wal);
+            writeEntry = mvcc.begin();
+            long flushOpSeqId = writeEntry.getWriteNumber();
             FlushResult flushResult = new FlushResultImpl(
-              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
-              writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
-            w.setWriteNumber(flushOpSeqId);
-            mvcc.waitForPreviousTransactionsComplete(w);
-            w = null;
+                FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+                flushOpSeqId,
+                "Nothing to flush",
+                writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
+            // TODO: Let's see if we hang here, if there is a scenario where an outstanding reader
+            // with a read point is in advance of this write point.
+            mvcc.completeAndWait(writeEntry);
+            writeEntry = null;
             return new PrepareFlushResult(flushResult, myseqid);
           } else {
             return new PrepareFlushResult(
-              new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-                "Nothing to flush", false),
+              new FlushResultImpl(
+                  FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+                  "Nothing to flush",
+                  false),
               myseqid);
           }
         }
       } finally {
         this.updatesLock.writeLock().unlock();
-        if (w != null) {
-          mvcc.advanceMemstore(w);
+        if (writeEntry != null) {
+          mvcc.complete(writeEntry);
         }
       }
     }
@@ -2132,10 +2129,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (!isAllFamilies(storesToFlush)) {
         perCfExtras = new StringBuilder();
         for (Store store: storesToFlush) {
-          perCfExtras.append("; ");
-          perCfExtras.append(store.getColumnFamilyName());
-          perCfExtras.append("=");
-          perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize()));
+          perCfExtras.append("; ").append(store.getColumnFamilyName());
+          perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
         }
       }
       LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
@@ -2147,7 +2142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
     // during flush
-    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+
     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
     // and memstore (makes it difficult to do atomic rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
@@ -2178,9 +2173,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
 
     long trxId = 0;
+    MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
     try {
       try {
-        writeEntry = mvcc.beginMemstoreInsert();
         if (wal != null) {
           Long earliestUnflushedSequenceIdForTheRegion =
             wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
@@ -2215,7 +2210,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             getRegionInfo(), flushOpSeqId, committedFiles);
           // no sync. Sync is below where we do not hold the updates lock
           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-            desc, sequenceId, false);
+            desc, false, mvcc);
         }
 
         // Prepare flush (take a snapshot)
@@ -2229,7 +2224,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
                 getRegionInfo(), flushOpSeqId, committedFiles);
               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-                desc, sequenceId, false);
+                desc, false, mvcc);
             } catch (Throwable t) {
               LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
                   StringUtils.stringifyException(t));
@@ -2263,18 +2258,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // uncommitted transactions from being written into HFiles.
       // We have to block before we start the flush, otherwise keys that
       // were removed via a rollbackMemstore could be written to Hfiles.
-      writeEntry.setWriteNumber(flushOpSeqId);
-      mvcc.waitForPreviousTransactionsComplete(writeEntry);
-      // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
+      mvcc.completeAndWait(writeEntry);
+      // set writeEntry to null to prevent mvcc.complete from being called again inside finally
+      // block
       writeEntry = null;
     } finally {
       if (writeEntry != null) {
-        // in case of failure just mark current writeEntry as complete
-        mvcc.advanceMemstore(writeEntry);
+        // In case of failure just mark current writeEntry as complete.
+        mvcc.complete(writeEntry);
       }
     }
-    return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
-      flushedSeqId, totalFlushableSizeOfFlushableStores);
+    return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
+        flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
   }
 
   /**
@@ -2294,10 +2289,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
     if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
-        getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
+        getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
       try {
         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-          desc, sequenceId, true);
+          desc, true, mvcc);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2366,7 +2361,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-          desc, sequenceId, true);
+          desc, true, mvcc);
       }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
@@ -2380,11 +2375,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-            desc, sequenceId, false);
+            desc, false, mvcc);
         } catch (Throwable ex) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
-              + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
-              + StringUtils.stringifyException(ex));
+              + "failed writing ABORT_FLUSH marker to WAL", ex);
           // ignore this since we will be aborting the RS with DSE.
         }
         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
@@ -2458,7 +2452,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
     // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
     // so if an abort or stop, there is no way to call them in.
-    WALKey key = this.appendEmptyEdit(wal, null);
+    WALKey key = this.appendEmptyEdit(wal);
+    mvcc.complete(key.getWriteEntry());
     return key.getSequenceId(this.maxWaitForSeqId);
   }
 
@@ -2914,7 +2909,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
-    List<Cell> memstoreCells = new ArrayList<Cell>();
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
@@ -2979,17 +2973,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
         // If we haven't got any rows in our batch, we should block to
         // get the next one.
-        boolean shouldBlock = numReadyToWrite == 0;
         RowLock rowLock = null;
         try {
-          rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
+          rowLock = getRowLock(mutation.getRow(), true);
         } catch (IOException ioe) {
           LOG.warn("Failed getting lock in batch put, row="
             + Bytes.toStringBinary(mutation.getRow()), ioe);
         }
         if (rowLock == null) {
           // We failed to grab another lock
-          assert !shouldBlock : "Should never fail to get lock when blocking";
+          assert false: "Should never fail to get lock when blocking";
           break; // stop acquiring more rows for this batch
         } else {
           acquiredRowLocks.add(rowLock);
@@ -3049,16 +3042,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       lock(this.updatesLock.readLock(), numReadyToWrite);
       locked = true;
-      if(isInReplay) {
-        mvccNum = batchOp.getReplaySequenceId();
-      } else {
-        mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
-      }
-      //
-      // ------------------------------------
-      // Acquire the latest mvcc number
-      // ----------------------------------
-      writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
 
       // calling the pre CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
@@ -3069,35 +3052,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       // ------------------------------------
-      // STEP 3. Write back to memstore
-      // Write to memstore. It is ok to write to memstore
-      // first without updating the WAL because we do not roll
-      // forward the memstore MVCC. The MVCC will be moved up when
-      // the complete operation is done. These changes are not yet
-      // visible to scanners till we update the MVCC. The MVCC is
-      // moved only when the sync is complete.
-      // ----------------------------------
-      long addedSize = 0;
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
-          continue;
-        }
-        doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
-      }
-
-      // ------------------------------------
-      // STEP 4. Build WAL edit
+      // STEP 3. Build WAL edit
       // ----------------------------------
       Durability durability = Durability.USE_DEFAULT;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
-        if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
+        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
           continue;
         }
-        batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
         Mutation m = batchOp.getMutation(i);
         Durability tmpDur = getEffectiveDurability(m.getDurability());
@@ -3123,9 +3085,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
             walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
               this.htableDescriptor.getTableName(), now, m.getClusterIds(),
-              currentNonceGroup, currentNonce);
+              currentNonceGroup, currentNonce, mvcc);
             txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
-              walEdit, getSequenceId(), true, null);
+              walEdit, true);
             walEdit = new WALEdit(isInReplay);
             walKey = null;
           }
@@ -3144,38 +3106,57 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       // -------------------------
-      // STEP 5. Append the final edit to WAL. Do not sync wal.
+      // STEP 4. Append the final edit to WAL. Do not sync wal.
       // -------------------------
       Mutation mutation = batchOp.getMutation(firstIndex);
       if (isInReplay) {
         // use wal key from the original
         walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
           this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-          mutation.getClusterIds(), currentNonceGroup, currentNonce);
+          mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
         long replaySeqId = batchOp.getReplaySequenceId();
         walKey.setOrigLogSeqNum(replaySeqId);
-
-        // ensure that the sequence id of the region is at least as big as orig log seq id
-        while (true) {
-          long seqId = getSequenceId().get();
-          if (seqId >= replaySeqId) break;
-          if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
-        }
       }
       if (walEdit.size() > 0) {
         if (!isInReplay) {
         // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
         walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-            mutation.getClusterIds(), currentNonceGroup, currentNonce);
+            mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
         }
-
-        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
-          getSequenceId(), true, memstoreCells);
+        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
       }
-      if (walKey == null){
-        // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
-        walKey = this.appendEmptyEdit(this.wal, memstoreCells);
+      // ------------------------------------
+      // Acquire the latest mvcc number
+      // ----------------------------------
+      if (walKey == null) {
+        // No WALKey yet (e.g. a SKIP_WAL operation): append an empty edit to obtain one.
+        walKey = this.appendEmptyEdit(this.wal);
+      }
+      if (!isInReplay) {
+        writeEntry = walKey.getWriteEntry();
+        mvccNum = writeEntry.getWriteNumber();
+      } else {
+        mvccNum = batchOp.getReplaySequenceId();
+      }
+
+      // ------------------------------------
+      // STEP 5. Write back to memstore
+      // Write to memstore. It is ok to write to memstore
+      // first without syncing the WAL because we do not roll
+      // forward the memstore MVCC. The MVCC will be moved up when
+      // the complete operation is done. These changes are not yet
+      // visible to scanners till we update the MVCC. The MVCC is
+      // moved only when the sync is complete.
+      // ----------------------------------
+      long addedSize = 0;
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        if (batchOp.retCodeDetails[i].getOperationStatusCode()
+            != OperationStatusCode.NOT_RUN) {
+          continue;
+        }
+        doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
       }
 
       // -------------------------------
@@ -3203,13 +3184,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
 
-
       // ------------------------------------------------------------------
       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
       // ------------------------------------------------------------------
       if (writeEntry != null) {
-        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+        mvcc.completeAndWait(writeEntry);
         writeEntry = null;
+      } else if (isInReplay) {
+        // ensure that the sequence id of the region is at least as big as orig log seq id
+        mvcc.advanceTo(mvccNum);
+      }
+
+      for (int i = firstIndex; i < lastIndexExclusive; i ++) {
+        if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
+          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+        }
       }
 
       // ------------------------------------
@@ -3237,10 +3226,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
-        rollbackMemstore(memstoreCells);
-        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
+        for (int j = 0; j < familyMaps.length; j++) {
+          for(List<Cell> cells:familyMaps[j].values()) {
+            rollbackMemstore(cells);
+          }
+        }
+        if (writeEntry != null) mvcc.complete(writeEntry);
       } else if (writeEntry != null) {
-        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+        mvcc.completeAndWait(writeEntry);
       }
 
       if (locked) {
@@ -3327,7 +3320,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // Lock row - note that doBatchMutate will relock this row if called
       RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
-      mvcc.waitForPreviousTransactionsComplete();
+      mvcc.await();
       try {
         if (this.getCoprocessorHost() != null) {
           Boolean processed = null;
@@ -3437,7 +3430,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // Lock row - note that doBatchMutate will relock this row if called
       RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
-      mvcc.waitForPreviousTransactionsComplete();
+      mvcc.await();
       try {
         List<Cell> result = get(get, false);
 
@@ -3515,7 +3508,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private void doBatchMutate(Mutation mutation) throws IOException {
     // Currently this is only called for puts and deletes, so no nonces.
-    OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
@@ -3691,7 +3684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
+    long mvccNum, boolean isInReplay) throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3702,10 +3695,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       int listSize = cells.size();
       for (int i=0; i < listSize; i++) {
         Cell cell = cells.get(i);
-        CellUtil.setSequenceId(cell, mvccNum);
+        if (cell.getSequenceId() == 0) {
+          CellUtil.setSequenceId(cell, mvccNum);
+        }
         Pair<Long, Cell> ret = store.add(cell);
         size += ret.getFirst();
-        memstoreCells.add(ret.getSecond());
         if(isInReplay) {
           // set memstore newly added cells with replay mvcc number
           CellUtil.setSequenceId(ret.getSecond(), mvccNum);
@@ -4462,12 +4456,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         this.maxFlushedSeqId = flush.getFlushSequenceNumber();
 
         // advance the mvcc read point so that the new flushed file is visible.
-        // there may be some in-flight transactions, but they won't be made visible since they are
-        // either greater than flush seq number or they were already dropped via flush.
-        // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
-        // stores while they are still in flight because the flush commit marker will not contain
-        // flushes from ALL stores.
-        getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+        mvcc.advanceTo(flush.getFlushSequenceNumber());
 
       } catch (FileNotFoundException ex) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -4534,15 +4523,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Drops the memstore contents after replaying a flush descriptor or region open event replay
    * if the memstore edits have seqNums smaller than the given seq id
-   * @param flush the flush descriptor
    * @throws IOException
    */
   private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
     long totalFreedSize = 0;
     this.updatesLock.writeLock().lock();
     try {
-      mvcc.waitForPreviousTransactionsComplete();
-      long currentSeqId = getSequenceId().get();
+
+      long currentSeqId = mvcc.getReadPoint();
       if (seqId >= currentSeqId) {
         // then we can drop the memstore contents since everything is below this seqId
         LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4705,9 +4693,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         dropPrepareFlushIfPossible();
 
         // advance the mvcc read point so that the new flushed file is visible.
-        // there may be some in-flight transactions, but they won't be made visible since they are
-        // either greater than flush seq number or they were already dropped via flush.
-        getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+        mvcc.await();
 
         // If we were waiting for observing a flush or region opening event for not showing partial
         // data after a secondary region crash, we can allow reads now.
@@ -4798,7 +4784,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
       if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
-        getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
+        mvcc.advanceTo(bulkLoadEvent.getBulkloadSeqNum());
       }
     } finally {
       closeBulkRegionOperation();
@@ -4897,11 +4883,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         dropPrepareFlushIfPossible();
 
         // advance the mvcc read point so that the new flushed files are visible.
-        // there may be some in-flight transactions, but they won't be made visible since they are
-        // either greater than flush seq number or they were already picked up via flush.
-        for (Store s : getStores()) {
-          getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
-        }
+          // either greater than flush seq number or they were already picked up via flush.
+          for (Store s : getStores()) {
+            mvcc.advanceTo(s.getMaxMemstoreTS());
+          }
+
 
         // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
         // skip all edits that are to be replayed in the future with that has a smaller seqId
@@ -5050,75 +5036,91 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  @Override
-  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
-    startRegionOperation();
-    try {
-      return getRowLockInternal(row, waitForLock);
-    } finally {
-      closeRegionOperation();
-    }
+
+  /**
+   * Get an exclusive ( write lock ) lock on a given row.
+   * @param row Which row to lock.
+   * @return A locked RowLock. The lock is exclusive and already acquired.
+   * @throws IOException
+   */
+  public RowLock getRowLock(byte[] row) throws IOException {
+    return getRowLock(row, false);
   }
 
   /**
-   * A version of getRowLock(byte[], boolean) to use when a region operation has already been
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
    * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row The row actions will be performed against
+   * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+   *                 lock is requested
    */
-  protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
+  public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
+    // Make sure the row is inside of this region before getting the lock for it.
+    checkRow(row, "row lock");
+    // create an object to use as a key in the row lock map
     HashedBytes rowKey = new HashedBytes(row);
-    RowLockContext rowLockContext = new RowLockContext(rowKey);
 
-    // loop until we acquire the row lock (unless !waitForLock)
-    while (true) {
-      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
-      if (existingContext == null) {
-        // Row is not already locked by any thread, use newly created context.
-        break;
-      } else if (existingContext.ownedByCurrentThread()) {
-        // Row is already locked by current thread, reuse existing context instead.
-        rowLockContext = existingContext;
-        break;
-      } else {
-        if (!waitForLock) {
-          return null;
+    RowLockContext rowLockContext = null;
+    RowLockImpl result = null;
+    TraceScope traceScope = null;
+
+    // If we're tracing start a span to show how long this took.
+    if (Trace.isTracing()) {
+      traceScope = Trace.startSpan("HRegion.getRowLock");
+      traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
+    }
+
+    try {
+      // Keep trying until we have a lock or error out.
+      // TODO: do we need to add a time component here?
+      while (result == null) {
+
+        // Try adding a RowLockContext to the lockedRows.
+        // If we can add it then there are no other transactions currently running.
+        rowLockContext = new RowLockContext(rowKey);
+        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+
+        // if there was a running transaction then there's already a context.
+        if (existingContext != null) {
+          rowLockContext = existingContext;
         }
-        TraceScope traceScope = null;
-        try {
-          if (Trace.isTracing()) {
-            traceScope = Trace.startSpan("HRegion.getRowLockInternal");
-          }
-          // Row is already locked by some other thread, give up or wait for it
-          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
-            if(traceScope != null) {
-              traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
-            }
-            throw new IOException("Timed out waiting for lock for row: " + rowKey);
-          }
-          if (traceScope != null) traceScope.close();
-          traceScope = null;
-        } catch (InterruptedException ie) {
-          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
-          InterruptedIOException iie = new InterruptedIOException();
-          iie.initCause(ie);
-          throw iie;
-        } finally {
-          if (traceScope != null) traceScope.close();
+
+        // Now try and get the lock.
+        //
+        // This can fail (return null) if the context was cleaned up concurrently; then we retry.
+        if (readLock) {
+          result = rowLockContext.newReadLock();
+        } else {
+          result = rowLockContext.newWriteLock();
+        }
+      }
+      if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
+        if (traceScope != null) {
+          traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
         }
+        result = null;
+        // Clean up the counts just in case this was the thing keeping the context alive.
+        rowLockContext.cleanUp();
+        throw new IOException("Timed out waiting for lock for row: " + rowKey);
+      }
+      return result;
+    } catch (InterruptedException ie) {
+      LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+      InterruptedIOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      if (traceScope != null) {
+        traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
+      }
+      Thread.currentThread().interrupt();
+      throw iie;
+    } finally {
+      if (traceScope != null) {
+        traceScope.close();
       }
     }
-
-    // allocate new lock for this thread
-    return rowLockContext.newLock();
-  }
-
-  /**
-   * Acquires a lock on the given row.
-   * The same thread may acquire multiple locks on the same row.
-   * @return the acquired row lock
-   * @throws IOException if the lock could not be acquired after waiting
-   */
-  public RowLock getRowLock(byte[] row) throws IOException {
-    return getRowLock(row, true);
   }
 
   @Override
@@ -5131,6 +5133,97 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @VisibleForTesting
+  class RowLockContext {
+    private final HashedBytes row;
+    final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+    final AtomicBoolean usable = new AtomicBoolean(true);
+    final AtomicInteger count = new AtomicInteger(0);
+    final Object lock = new Object();
+
+    RowLockContext(HashedBytes row) {
+      this.row = row;
+    }
+
+    RowLockImpl newWriteLock() {
+      Lock l = readWriteLock.writeLock();
+      return getRowLock(l);
+    }
+    RowLockImpl newReadLock() {
+      Lock l = readWriteLock.readLock();
+      return getRowLock(l);
+    }
+
+    private RowLockImpl getRowLock(Lock l) {
+      count.incrementAndGet();
+      synchronized (lock) {
+        if (usable.get()) {
+          return new RowLockImpl(this, l);
+        } else {
+          return null;
+        }
+      }
+    }
+
+    void cleanUp() {
+      long c = count.decrementAndGet();
+      if (c <= 0) {
+        synchronized (lock) {
+          if (count.get() <= 0 ){
+            usable.set(false);
+            RowLockContext removed = lockedRows.remove(row);
+            assert removed == this: "we should never remove a different context";
+          }
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "RowLockContext{" +
+          "row=" + row +
+          ", readWriteLock=" + readWriteLock +
+          ", count=" + count +
+          '}';
+    }
+  }
+
+  /**
+   * Class used to represent a lock on a row.
+   */
+  public static class RowLockImpl implements RowLock {
+    private final RowLockContext context;
+    private final Lock lock;
+
+    public RowLockImpl(RowLockContext context, Lock lock) {
+      this.context = context;
+      this.lock = lock;
+    }
+
+    public Lock getLock() {
+      return lock;
+    }
+
+    @VisibleForTesting
+    public RowLockContext getContext() {
+      return context;
+    }
+
+    @Override
+    public void release() {
+      lock.unlock();
+      context.cleanUp();
+    }
+
+    @Override
+    public String toString() {
+      return "RowLockImpl{" +
+          "context=" + context +
+          ", lock=" + lock +
+          '}';
+    }
+  }
+
   /**
    * Determines whether multiple column families are present
    * Precondition: familyPaths is not null
@@ -5276,7 +5369,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               this.getRegionInfo().getTable(),
               ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
           WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
-              loadDescriptor, sequenceId);
+              loadDescriptor, mvcc);
         } catch (IOException ioe) {
           if (this.rsServices != null) {
             // Have to abort region server because some hfiles has been loaded but we can't write
@@ -5438,7 +5531,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     @Override
-    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
+    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
+    throws IOException {
       if (this.filterClosed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
             "after we renewed it. Could be caused by a very slow scanner " +
@@ -5854,7 +5948,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
         short length) throws IOException {
-      assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
+      assert this.joinedContinuationRow == null:
+        "Trying to go to next row during joinedHeap read.";
       Cell next;
       while ((next = this.storeHeap.peek()) != null &&
              CellUtil.matchingRow(next, currentRow, offset, length)) {
@@ -6103,11 +6198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     HRegion region = HRegion.newHRegion(tableDir,
         effectiveWAL, fs, conf, info, hTableDescriptor, null);
-    if (initialize) {
-      // If initializing, set the sequenceId. It is also required by WALPerformanceEvaluation when
-      // verifying the WALEdits.
-      region.setSequenceId(region.initialize(null));
-    }
+    if (initialize) region.initialize(null);
     return region;
   }
 
@@ -6320,7 +6411,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Refuse to open the region if a required class cannot be loaded
     checkClassLoading();
     this.openSeqNum = initialize(reporter);
-    this.setSequenceId(openSeqNum);
+    this.mvcc.advanceTo(openSeqNum);
     if (wal != null && getRegionServerServices() != null && !writestate.readOnly
         && !recovering) {
       // Only write the region open event marker to WAL if (1) we are not read-only
@@ -6746,7 +6837,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     List<RowLock> acquiredRowLocks;
     long addedSize = 0;
     List<Mutation> mutations = new ArrayList<Mutation>();
-    List<Cell> memstoreCells = new ArrayList<Cell>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     long mvccNum = 0;
     WALKey walKey = null;
@@ -6755,13 +6845,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
         // Attempt to lock all involved rows, throw if any lock times out
+        // use a writer lock for mixed reads and writes
         acquiredRowLocks.add(getRowLock(row));
       }
       // 3. Region lock
       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
       locked = true;
-      // Get a mvcc write number
-      mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
 
       long now = EnvironmentEdgeManager.currentTime();
       try {
@@ -6771,11 +6860,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             processor, now, this, mutations, walEdit, timeout);
 
         if (!mutations.isEmpty()) {
-          // 5. Start mvcc transaction
-          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
-          // 6. Call the preBatchMutate hook
+
+          // 5. Call the preBatchMutate hook
           processor.preBatchMutate(this, walEdit);
-          // 7. Apply to memstore
+
+          long txid = 0;
+          // 6. Append no sync
+          if (!walEdit.isEmpty()) {
+            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+              processor.getClusterIds(), nonceGroup, nonce, mvcc);
+            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+                walKey, walEdit, false);
+          }
+          if(walKey == null){
+            // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+            // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
+            walKey = this.appendEmptyEdit(this.wal);
+          }
+
+          // 7. Start mvcc transaction
+          writeEntry = walKey.getWriteEntry();
+          mvccNum = walKey.getSequenceId();
+
+
+
+          // 8. Apply to memstore
           for (Mutation m : mutations) {
             // Handle any tag based cell features
             rewriteCellTags(m.getFamilyCellMap(), m);
@@ -6790,25 +6901,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               }
               Pair<Long, Cell> ret = store.add(cell);
               addedSize += ret.getFirst();
-              memstoreCells.add(ret.getSecond());
             }
           }
 
-          long txid = 0;
-          // 8. Append no sync
-          if (!walEdit.isEmpty()) {
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-              processor.getClusterIds(), nonceGroup, nonce);
-            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
-              walKey, walEdit, getSequenceId(), true, memstoreCells);
-          }
-          if(walKey == null){
-            // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
-            // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
-            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
-          }
           // 9. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
@@ -6841,13 +6936,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
           }
           if (writeEntry != null) {
-            mvcc.cancelMemstoreInsert(writeEntry);
+            mvcc.complete(writeEntry);
             writeEntry = null;
           }
         }
         // 13. Roll mvcc forward
         if (writeEntry != null) {
-          mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+          mvcc.completeAndWait(writeEntry);
         }
         if (locked) {
           this.updatesLock.readLock().unlock();
@@ -6918,6 +7013,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * @param cell
+   * @param tags
+   * @return The passed-in List<Tag> but with the tags from <code>cell</code> added.
+   */
+  private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
+    if (cell.getTagsLength() <= 0) return tags;
+    List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags; 
+    Iterator<Tag> i =
+        CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+    while (i.hasNext()) newTags.add(i.next());
+    return newTags;
+  }
+
+  /**
+   * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.
+   * @param store
+   * @param row
+   * @param family
+   * @param tr
+   * @return Get result.
+   * @throws IOException
+   */
+  private List<Cell> doGet(final Store store, final byte [] row,
+      final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
+  throws IOException {
+    // Sort the cells so that they match the order that they
+    // appear in the Get results. Otherwise, we won't be able to
+    // find the existing values if the cells are not specified
+    // in order by the client since cells are in an array list.
+    Collections.sort(family.getValue(), store.getComparator());
+    // Get previous values for all columns in this family
+    Get get = new Get(row);
+    for (Cell cell : family.getValue()) {
+      get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
+    }
+    if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
+    return get(get, false);
+  }
+
   public Result append(Append append) throws IOException {
     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
@@ -6927,64 +7062,49 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // transactions, so all stores only go through one code path for puts.
 
   @Override
-  public Result append(Append append, long nonceGroup, long nonce) throws IOException {
-    byte[] row = append.getRow();
-    checkRow(row, "append");
+  public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
+    Operation op = Operation.APPEND;
+    byte[] row = mutate.getRow();
+    checkRow(row, op.toString());
     boolean flush = false;
-    Durability durability = getEffectiveDurability(append.getDurability());
+    Durability durability = getEffectiveDurability(mutate.getDurability());
     boolean writeToWAL = durability != Durability.SKIP_WAL;
     WALEdit walEdits = null;
-    List<Cell> allKVs = new ArrayList<Cell>(append.size());
+    List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
     long size = 0;
     long txid = 0;
-
     checkReadOnly();
     checkResources();
     // Lock row
-    startRegionOperation(Operation.APPEND);
+    startRegionOperation(op);
     this.writeRequestsCount.increment();
-    long mvccNum = 0;
-    WriteEntry writeEntry = null;
-    WALKey walKey = null;
     RowLock rowLock = null;
-    List<Cell> memstoreCells = new ArrayList<Cell>();
+    WALKey walKey = null;
+    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     boolean doRollBackMemstore = false;
     try {
       rowLock = getRowLock(row);
+      assert rowLock != null;
       try {
         lock(this.updatesLock.readLock());
         try {
-          // wait for all prior MVCC transactions to finish - while we hold the row lock
-          // (so that we are guaranteed to see the latest state)
-          mvcc.waitForPreviousTransactionsComplete();
+          // Wait for all prior MVCC transactions to finish - while we hold the row lock
+          // (so that we are guaranteed to see the latest state when we do our Get)
+          mvcc.await();
           if (this.coprocessorHost != null) {
-            Result r = this.coprocessorHost.preAppendAfterRowLock(append);
-            if(r!= null) {
+            Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);
+            if (r!= null) {
               return r;
             }
           }
-          // now start my own transaction
-          mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
-          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           long now = EnvironmentEdgeManager.currentTime();
           // Process each family
-          for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
-
+          for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {
             Store store = stores.get(family.getKey());
             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
 
-            // Sort the cells so that they match the order that they
-            // appear in the Get results. Otherwise, we won't be able to
-            // find the existing values if the cells are not specified
-            // in order by the client since cells are in an array list.
-            Collections.sort(family.getValue(), store.getComparator());
-            // Get previous values for all columns in this family
-            Get get = new Get(row);
-            for (Cell cell : family.getValue()) {
-              get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
-            }
-            List<Cell> results = get(get, false);
+            List<Cell> results = doGet(store, row, family, null);
 
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the append value
@@ -7002,30 +7122,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 long ts = Math.max(now, oldCell.getTimestamp());
 
                 // Process cell tags
-                List<Tag> newTags = new ArrayList<Tag>();
-
                 // Make a union of the set of tags in the old and new KVs
-
-                if (oldCell.getTagsLength() > 0) {
-                  Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
-                    oldCell.getTagsOffset(), oldCell.getTagsLength());
-                  while (i.hasNext()) {
-                    newTags.add(i.next());
-                  }
-                }
-                if (cell.getTagsLength() > 0) {
-                  Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(),
-                    cell.getTagsOffset(), cell.getTagsLength());
-                  while (i.hasNext()) {
-                    newTags.add(i.next());
-                  }
-                }
+                List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>());
+                newTags = carryForwardTags(cell, newTags);
 
                 // Cell TTL handling
 
-                if (append.getTTL() != Long.MAX_VALUE) {
+                if (mutate.getTTL() != Long.MAX_VALUE) {
                   // Add the new TTL tag
-                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
                 }
 
                 // Rebuild tags
@@ -7063,9 +7168,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
                 // Cell TTL handling
 
-                if (append.getTTL() != Long.MAX_VALUE) {
+                if (mutate.getTTL() != Long.MAX_VALUE) {
                   List<Tag> newTags = new ArrayList<Tag>(1);
-                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
                   // Add the new TTL tag
                   newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
                       cell.getRowLength(),
@@ -7081,11 +7186,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 }
               }
 
-              CellUtil.setSequenceId(newCell, mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
-                    append, oldCell, newCell);
+                    mutate, oldCell, newCell);
               }
               kvs.add(newCell);
 
@@ -7102,47 +7206,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             tempMemstore.put(store, kvs);
           }
 
-          //Actually write to Memstore now
-          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
-            Store store = entry.getKey();
-            if (store.getFamily().getMaxVersions() == 1) {
-              // upsert if VERSIONS for this CF == 1
-              size += store.upsert(entry.getValue(), getSmallestReadPoint());
-              memstoreCells.addAll(entry.getValue());
+          // Actually write to WAL now
+          if (walEdits != null && !walEdits.isEmpty()) {
+            if (writeToWAL) {
+              // Using default cluster id, as this can only happen in the originating
+              // cluster. A slave cluster receives the final value (not the delta)
+              // as a Put.
+              // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+              walKey = new HLogKey(
+                  getRegionInfo().getEncodedNameAsBytes(),
+                  this.htableDescriptor.getTableName(),
+                  WALKey.NO_SEQUENCE_ID,
+                  nonceGroup,
+                  nonce,
+                  mvcc);
+              txid =
+                this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
             } else {
-              // otherwise keep older versions around
-              for (Cell cell: entry.getValue()) {
-                Pair<Long, Cell> ret = store.add(cell);
-                size += ret.getFirst();
-                memstoreCells.add(ret.getSecond());
-                doRollBackMemstore = true;
-              }
+              recordMutationWithoutWal(mutate.getFamilyCellMap());
             }
-            allKVs.addAll(entry.getValue());
-          }
-
-          // Actually write to WAL now
-          if (writeToWAL) {
-            // Using default cluster id, as this can only happen in the originating
-            // cluster. A slave cluster receives the final value (not the delta)
-            // as a Put.
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
-            txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
-              this.sequenceId, true, memstoreCells);
-          } else {
-            recordMutationWithoutWal(append.getFamilyCellMap());
           }
           if (walKey == null) {
             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
-            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
+            walKey = this.appendEmptyEdit(this.wal);
+          }
+
+          // now start my own transaction
+          writeEntry = walKey.getWriteEntry();
+
+
+          // Actually write to Memstore now
+          if (!tempMemstore.isEmpty()) {
+            for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+              Store store = entry.getKey();
+              if (store.getFamily().getMaxVersions() == 1) {
+                // upsert if VERSIONS for this CF == 1
+                // Is this right? It immediately becomes visible? St.Ack 20150907
+                size += store.upsert(entry.getValue(), getSmallestReadPoint());
+              } else {
+                // otherwise keep older versions around
+                for (Cell cell: entry.getValue()) {
+                  CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
+                  Pair<Long, Cell> ret = store.add(cell);
+                  size += ret.getFirst();
+                  doRollBackMemstore = true;
+                }
+              }
+              // We add to all KVs here whereas when doing increment, we do it
+              // earlier... why?
+              allKVs.addAll(entry.getValue());
+            }
+
+            size = this.addAndGetGlobalMemstoreSize(size);
+            flush = isFlushSize(size);
           }
-          size = this.addAndGetGlobalMemstoreSize(size);
-          flush = isFlushSize(size);
         } finally {
           this.updatesLock.readLock().unlock();
         }
+
       } finally {
         rowLock.release();
         rowLock = null;
@@ -7158,13 +7279,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
-        rollbackMemstore(memstoreCells);
-        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
+        rollbackMemstore(allKVs);
+        if (writeEntry != null) mvcc.complete(writeEntry);
       } else if (writeEntry != null) {
-        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+        mvcc.completeAndWait(writeEntry);
       }
 
-      closeRegionOperation(Operation.APPEND);
+      closeRegionOperation(op);
     }
 
     if (this.metricsRegion != null) {
@@ -7176,8 +7297,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       requestFlush();
     }
 
-
-    return append.isReturnResults() ? Result.create(allKVs) : null;
+    return mutate.isReturnResults() ? Result.create(allKVs) : null;
   }
 
   public Result increment(Increment increment) throws IOException {
@@ -7188,89 +7308,73 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // We should refactor append and increment as local get-mutate-put
   // transactions, so all stores only go through one code path for puts.
 
+  // They are subtly different in quite a few ways. This came out only
+  // after study. I am not sure that many of the differences are intentional.
+  // TODO: St.Ack 20150907 
+
   @Override
-  public Result increment(Increment increment, long nonceGroup, long nonce)
+  public Result increment(Increment mutation, long nonceGroup, long nonce)
   throws IOException {
-    byte [] row = increment.getRow();
-    checkRow(row, "increment");
-    TimeRange tr = increment.getTimeRange();
+    Operation op = Operation.INCREMENT;
+    byte [] row = mutation.getRow();
+    checkRow(row, op.toString());
     boolean flush = false;
-    Durability durability = getEffectiveDurability(increment.getDurability());
+    Durability durability = getEffectiveDurability(mutation.getDurability());
     boolean writeToWAL = durability != Durability.SKIP_WAL;
     WALEdit walEdits = null;
-    List<Cell> allKVs = new ArrayList<Cell>(increment.size());
+    List<Cell> allKVs = new ArrayList<Cell>(mutation.size());
+    
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
     long size = 0;
     long txid = 0;
-
     checkReadOnly();
     checkResources();
     // Lock row
-    startRegionOperation(Operation.INCREMENT);
+    startRegionOperation(op);
     this.writeRequestsCount.increment();
     RowLock rowLock = null;
-    WriteEntry writeEntry = null;
     WALKey walKey = null;
-    long mvccNum = 0;
-    List<Cell> memstoreCells = new ArrayList<Cell>();
+    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     boolean doRollBackMemstore = false;
+    TimeRange tr = mutation.getTimeRange();
     try {
       rowLock = getRowLock(row);
+      assert rowLock != null;
       try {
         lock(this.updatesLock.readLock());
         try {
           // wait for all prior MVCC transactions to finish - while we hold the row lock
           // (so that we are guaranteed to see the latest state)
-          mvcc.waitForPreviousTransactionsComplete();
+          mvcc.await();
           if (this.coprocessorHost != null) {
-            Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
+            Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation);
             if (r != null) {
               return r;
             }
           }
-          // now start my own transaction
-          mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
-          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           long now = EnvironmentEdgeManager.currentTime();
           // Process each family
-          for (Map.Entry<byte [], List<Cell>> family:
-              increment.getFamilyCellMap().entrySet()) {
-
+          for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
             Store store = stores.get(family.getKey());
             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
 
-            // Sort the cells so that they match the order that they
-            // appear in the Get results. Otherwise, we won't be able to
-            // find the existing values if the cells are not specified
-            // in order by the client since cells are in an array list.
-            Collections.sort(family.getValue(), store.getComparator());
-            // Get previous values for all columns in this family
-            Get get = new Get(row);
-            for (Cell cell: family.getValue()) {
-              get.addColumn(family.getKey(),  CellUtil.cloneQualifier(cell));
-            }
-            get.setTimeRange(tr.getMin(), tr.getMax());
-            List<Cell> results = get(get, false);
+            List<Cell> results = doGet(store, row, family, tr);
 
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the increment amount
+
+            // Avoid as much copying as possible. We may need to rewrite and
+            // consolidate tags. Bytes are only copied once.
+            // Would be nice if KeyValue had scatter/gather logic
             int idx = 0;
+            // HERE WE DIVERGE FROM APPEND
             List<Cell> edits = family.getValue();
             for (int i = 0; i < edits.size(); i++) {
               Cell cell = edits.get(i);
               long amount = Bytes.toLong(CellUtil.cloneValue(cell));
               boolean noWriteBack = (amount == 0);
-              List<Tag> newTags = new ArrayList<Tag>();
-
-              // Carry forward any tags that might have been added by a coprocessor
-              if (cell.getTagsLength() > 0) {
-                Iterator<Tag> itr = CellUtil.tagsIterator(cell.getTagsArray(),
-                  cell.getTagsOffset(), cell.getTagsLength());
-                while (itr.hasNext()) {
-                  newTags.add(itr.next());
-                }
-              }
+
+              List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());
 
               Cell c = null;
               long ts = now;
@@ -7285,15 +7389,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                       "Attempted to increment field that isn't 64 bits wide");
                 }
                 // Carry tags forward from previous version
-                if (c.getTagsLength() > 0) {
-                  Iterator<Tag> itr = CellUtil.tagsIterator(c.getTagsArray(),
-                    c.getTagsOffset(), c.getTagsLength());
-                  while (itr.hasNext()) {
-                    newTags.add(itr.next());
-                  }
-                }
-                if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
+                newTags = carryForwardTags(c, newTags);
+                if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {
                   idx++;
+                }
               }
 
               // Append new incremented KeyValue to list
@@ -7301,8 +7400,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               byte[] val = Bytes.toBytes(amount);
 
               // Add the TTL tag if the mutation carried one
-              if (increment.getTTL() != Long.MAX_VALUE) {
-                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
+              if (mutation.getTTL() != Long.MAX_VALUE) {
+                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));
               }
 
               Cell newKV = new KeyValue(row, 0, row.length,
@@ -7313,12 +7412,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 val, 0, val.length,
                 newTags);
 
-              CellUtil.setSequenceId(newKV, mvccNum);
-
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = coprocessorHost.postMutationBeforeWAL(
-                    RegionObserver.MutationType.INCREMENT, increment, c, newKV);
+                    RegionObserver.MutationType.INCREMENT, mutation, c, newKV);
               }
               allKVs.add(newKV);
 
@@ -7341,20 +7438,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
           }
 
-          //Actually write to Memstore now
+          // Actually write to WAL now
+          if (walEdits != null && !walEdits.isEmpty()) {
+            if (writeToWAL) {
+              // Using default cluster id, as this can only happen in the originating
+              // cluster. A slave cluster receives the final value (not the delta)
+              // as a Put.
+              // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+              walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+                  this.htableDescriptor.getTableName(),
+                  WALKey.NO_SEQUENCE_ID,
+                  nonceGroup,
+                  nonce,
+                  mvcc);
+              txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
+                  walKey, walEdits, true);
+            } else {
+              recordMutationWithoutWal(mutation.getFamilyCellMap());
+            }
+          }
+          if (walKey == null) {
+            // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+            walKey = this.appendEmptyEdit(this.wal);
+          }
+
+          // now start my own transaction
+          writeEntry = walKey.getWriteEntry();
+
+          // Actually write to Memstore now
           if (!tempMemstore.isEmpty()) {
             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
               Store store = entry.getKey();
               if (store.getFamily().getMaxVersions() == 1) {
                 // upsert if VERSIONS for this CF == 1
+                // Is this right? It immediately becomes visible? St.Ack 20150907
                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
-                memstoreCells.addAll(entry.getValue());
               } else {
                 // otherwise keep older versions around
                 for (Cell cell : entry.getValue()) {
+                  CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
                   Pair<Long, Cell> ret = store.add(cell);
                   size += ret.getFirst();
-                  memstoreCells.add(ret.getSecond());
                   doRollBackMemstore = true;
                 }
               }
@@ -7362,26 +7486,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             size = this.addAndGetGlobalMemstoreSize(size);
             flush = isFlushSize(size);
           }
-
-          // Actually write to WAL now
-          if (walEdits != null && !walEdits.isEmpty()) {
-            if (writeToWAL) {
-              // Using default cluster id, as this can only happen in the originating
-              // cluster. A slave cluster receives the final value (not the delta)
-              // as a Put.
-              // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-              walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
-              txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
-                walKey, walEdits, getSequenceId(), true, memstoreCells);
-            } else {
-              recordMutationWithoutWal(increment.getFamilyCellMap());
-            }
-          }
-          if(walKey == null){
-            // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
-            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
-          }
         } finally {
           this.updatesLock.readLock().unlock();
         }
@@ -7400,10 +7504,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
-        rollbackMemstore(memstoreCells);
-        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
+        for(List<Cell> cells: tempMemstore.values()) {
+          rollbackMemstore(cells);
+        }
+        if (writeEntry != null) mvcc.complete(writeEntry);
       } else if (writeEntry != null) {
-        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
+        mvcc.completeAndWait(writeEntry);
       }
       closeRegionOperation(Operation.INCREMENT);
       if (this.metricsRegion != null) {
@@ -7415,7 +7521,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
     }
-    return increment.isReturnResults() ? Result.create(allKVs) : null;
+    return mutation.isReturnResults() ? Result.create(allKVs) : null;
   }
 
   //
@@ -7434,7 +7540,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
@@ -7579,7 +7685,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new IOException("Not a known catalog table: " + p.toString());
     }
     try {
-      region.initialize(null);
+      region.mvcc.advanceTo(region.initialize(null));
       if (majorCompact) {
         region.compact(true);
       } else {
@@ -7798,7 +7904,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Update counters for numer of puts without wal and the size of possible data loss.
+   * Update counters for number of puts without wal and the size of possible data loss.
    * These information are exposed by the region server metrics.
    */
   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
@@ -7997,103 +8103,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Do not change this sequence id. See {@link #sequenceId} comment.
+   * Do not change this sequence id.
    * @return sequenceId
    */
   @VisibleForTesting
-  public AtomicLong getSequenceId() {
-    return this.sequenceId;
-  }
-
-  /**
-   * sets this region's sequenceId.
-   * @param value new value
-   */
-  private void setSequenceId(long value) {
-    this.sequenceId.set(value);
-  }
-
-  @VisibleForTesting class RowLockContext {
-    private final HashedBytes row;
-    private final CountDownLatch latch = new CountDownLatch(1);
-    private final Thread thread;
-    private int lockCount = 0;
-
-    RowLockContext(HashedBytes row) {
-      this.row = row;
-      this.thread = Thread.currentThread();
-    }
-
-    boolean ownedByCurrentThread() {
-      return thread == Thread.currentThread();
-    }
-
-    RowLock newLock() {
-      lockCount++;
-      RowLockImpl rl = new RowLockImpl();
-      rl.setContext(this);
-      return rl;
-    }
-
-    void releaseLock() {
-      if (!ownedByCurrentThread()) {
-        throw new IllegalArgumentException("Lock held by thread: " + thread
-          + " cannot be released by different thread: " + Thread.currentThread());
-      }
-      lockCount--;
-      if (lockCount == 0) {
-        // no remaining locks by the thread, unlock and allow other threads to access
-        RowLockContext existingContext = lockedRows.remove(row);
-        if (existingContext != this) {
-          throw new RuntimeException(
-              "Internal row lock state inconsistent, should not happen, row: " + row);
-        }
-        latch.countDown();
-      }
-    }
+  public long getSequenceId() {
+    return this.mvcc.getReadPoint();
   }
 
-  public static class RowLockImpl implements RowLock {
-    private RowLockContext context;
-    private boolean released = false;
-
-    @VisibleForTesting
-    public RowLockContext getContext() {
-      return context;
-    }
-
-    @VisibleForTesting
-    public void setContext(RowLockContext context) {
-      this.context = context;
-    }
-
-    @Override
-    public void release() {
-      if (!released) {
-        context.releaseLock();
-      }
-      released = true;
-    }
-  }
 
   /**
    * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
    * the WALEdit append later.
    * @param wal
-   * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
-   *        be updated with right mvcc values(their wal sequence number)
    * @return Return the key used appending with no sync and no append.
    * @throws IOException
    */
-  private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
+  private WALKey appendEmptyEdit(final WAL wal) throws IOException {
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     @SuppressWarnings("deprecation")
-    WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
-      WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
-    // Call append but with an empty WALEdit.  The returned seqeunce id will not be associated
+    WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
+      getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
+      HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
+    
+    // Call append but with an empty WALEdit.  The returned sequence id will not be associated
     // with any edit and we can be sure it went in after all outstanding appends.
-    wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false,
-      cells);
+    try {
+      wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
+    } catch (Throwable t) {
+      // If exception, our mvcc won't get cleaned up by client, so do it here.
+      getMVCC().complete(key.getWriteEntry());
+    }
     return key;
   }
 


Mime
View raw message