hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [3/5] hbase git commit: HBASE-14465 Backport 'Allow rowlock to be reader/write' to branch-1
Date Wed, 30 Sep 2015 19:43:26 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 454b9cc..0857fdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -631,7 +631,7 @@ public class HStore implements Store {
     // readers might pick it up. This assumes that the store is not getting any writes (otherwise
     // in-flight transactions might be made visible)
     if (!toBeAddedFiles.isEmpty()) {
-      region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
+      region.getMVCC().advanceTo(this.getMaxSequenceId());
     }
 
     // notify scanners, close file readers, and recompute store size
@@ -1288,7 +1288,7 @@ public class HStore implements Store {
     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
     WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
-        this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
+        this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 2d65387..00f349e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,239 +18,204 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.mortbay.log.Log;
 
 /**
- * Manages the read/write consistency within memstore. This provides
- * an interface for readers to determine what entries to ignore, and
- * a mechanism for writers to obtain new write numbers, then "commit"
+ * Manages the read/write consistency. This provides an interface for readers to determine what
+ * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
  * the new writes for readers to read (thus forming atomic transactions).
  */
 @InterfaceAudience.Private
 public class MultiVersionConcurrencyControl {
-  private static final long NO_WRITE_NUMBER = 0;
-  private volatile long memstoreRead = 0;
+  final AtomicLong readPoint = new AtomicLong(0);
+  final AtomicLong writePoint = new AtomicLong(0);
   private final Object readWaiters = new Object();
+  /**
+   * Represents no value, or not set.
+   */
+  public static final long NONE = -1;
 
   // This is the pending queue of writes.
-  private final LinkedList<WriteEntry> writeQueue =
-      new LinkedList<WriteEntry>();
+  //
+  // TODO(eclark): Should this be an array of fixed size to
+  // reduce the number of allocations on the write path?
+  // This could be equal to the number of handlers + a small number.
+  // TODO: St.Ack 20150903 Sounds good to me.
+  private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
 
-  /**
-   * Default constructor. Initializes the memstoreRead/Write points to 0.
-   */
   public MultiVersionConcurrencyControl() {
+    super();
   }
 
   /**
-   * Initializes the memstoreRead/Write points appropriately.
-   * @param startPoint
+   * Construct and set both the read point and the write point to <code>startPoint</code>.
    */
-  public void initialize(long startPoint) {
-    synchronized (writeQueue) {
-      writeQueue.clear();
-      memstoreRead = startPoint;
-    }
+  public MultiVersionConcurrencyControl(long startPoint) {
+    tryAdvanceTo(startPoint, NONE);
   }
 
   /**
-   *
-   * @param initVal The value we used initially and expected it'll be reset later
-   * @return WriteEntry instance.
+   * Step the MVCC forward on to a new read/write basis.
+   * @param newStartPoint Point to move read and write points to.
    */
-  WriteEntry beginMemstoreInsert() {
-    return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+  public void advanceTo(long newStartPoint) {
+    while (true) {
+      long seqId = this.getWritePoint();
+      if (seqId >= newStartPoint) break;
+      if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
+    }
   }
 
   /**
-   * Get a mvcc write number before an actual one(its log sequence Id) being assigned
-   * @param sequenceId
-   * @return long a faked write number which is bigger enough not to be seen by others before a real
-   *         one is assigned
+   * Step the MVCC forward on to a new read/write basis.
+   * @param newStartPoint Point to move read and write points to.
+   * @param expected If not {@link #NONE} (-1), the read point must currently equal this value.
+   * @return Returns false if <code>expected</code> is not equal to the
+   * current <code>readPoint</code> or if <code>newStartPoint</code> is less than current
+   * <code>readPoint</code>
    */
-  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
-    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
-    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
-    // because each handler could increment sequence num twice and max concurrent in-flight
-    // transactions is the number of RPC handlers.
-    // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
-    // changes touch same row key.
-    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
-    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
-    // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
-    return sequenceId.incrementAndGet() + 1000000000;
+  boolean tryAdvanceTo(long newStartPoint, long expected) {
+    synchronized (writeQueue) {
+      long currentRead = this.readPoint.get();
+      long currentWrite = this.writePoint.get();
+      if (currentRead != currentWrite) {
+        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
+          ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
+      }
+      if (expected != NONE && expected != currentRead) {
+        return false;
+      }
+
+      if (newStartPoint < currentRead) {
+        return false;
+      }
+
+      readPoint.set(newStartPoint);
+      writePoint.set(newStartPoint);
+    }
+    return true;
   }
 
   /**
-   * This function starts a MVCC transaction with current region's log change sequence number. Since
-   * we set change sequence number when flushing current change to WAL(late binding), the flush
-   * order may differ from the order to start a MVCC transaction. For example, a change begins a
-   * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
-   * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
-   * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
-   * big number is safe because we only need it to prevent current change being seen and the number
-   * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
-   * for MVCC to align with flush sequence.
-   * @param curSeqNum
-   * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
+   * to our queue of ongoing writes. Return this WriteEntry instance.
+   * To complete the write transaction and wait for it to be visible, call
+   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
+   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
+   * transaction.
+   * @see #complete(WriteEntry)
+   * @see #completeAndWait(WriteEntry)
    */
-  public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
-    WriteEntry e = new WriteEntry(curSeqNum);
+  public WriteEntry begin() {
     synchronized (writeQueue) {
+      long nextWriteNumber = writePoint.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
       writeQueue.add(e);
       return e;
     }
   }
 
   /**
-   * Complete a {@link WriteEntry} that was created by
-   * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
-   * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
-   * visible to MVCC readers.
-   * @throws IOException
-   */
-  public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
-      throws IOException {
-    if(e == null) return;
-    if (seqId != null) {
-      e.setWriteNumber(seqId.getSequenceId());
-    } else {
-      // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
-      // function beginMemstoreInsertWithSeqNum in case of failures
-      e.setWriteNumber(NO_WRITE_NUMBER);
-    }
-    waitForPreviousTransactionsComplete(e);
-  }
-
-  /**
-   * Cancel a write insert that failed.
-   * Removes the write entry without advancing read point or without interfering with write
-   * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
-   * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
-   * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
-   * it as for special handling).
-   * @param writeEntry Failed attempt at write. Does cleanup.
+   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
+   * to complete.
    */
-  public void cancelMemstoreInsert(WriteEntry writeEntry) {
-    // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
-    // readpoint and gets my little writeEntry completed and removed from queue of outstanding
-    // events which seems right.  St.Ack 20150901.
-    writeEntry.setWriteNumber(NO_WRITE_NUMBER);
-    advanceMemstore(writeEntry);
+  public void await() {
+    // Add a write and then wait on reads to catch up to it.
+    completeAndWait(begin());
   }
 
   /**
-   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
-   * end of this call, the global read point is at least as large as the write point of the passed
-   * in WriteEntry. Thus, the write is visible to MVCC readers.
+   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
+   * read point catches up to our write.
+   *
+   * At the end of this call, the global read point is at least as large as the write point
+   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
    */
-  public void completeMemstoreInsert(WriteEntry e) {
-    waitForPreviousTransactionsComplete(e);
+  public void completeAndWait(WriteEntry e) {
+    complete(e);
+    waitForRead(e);
   }
 
   /**
-   * Mark the {@link WriteEntry} as complete and advance the read point as
-   * much as possible.
+   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
+   * Call this even if the write has FAILED (AFTER backing out the write transaction
+   * changes completely) so we can clean up the outstanding transaction.
    *
    * How much is the read point advanced?
-   * Let S be the set of all write numbers that are completed and where all previous write numbers
-   * are also completed.  Then, the read point is advanced to the supremum of S.
+   * 
+   * Let S be the set of all write numbers that are completed and where all previous write
+   * numbers are also completed. Set the read point to the highest numbered write of S.
+   *
+   * @param writeEntry
    *
-   * @param e
    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
    */
-  boolean advanceMemstore(WriteEntry e) {
-    long nextReadValue = -1;
+  public boolean complete(WriteEntry writeEntry) {
     synchronized (writeQueue) {
-      e.markCompleted();
+      writeEntry.markCompleted();
 
+      long nextReadValue = NONE;
+      boolean ranOnce = false;
       while (!writeQueue.isEmpty()) {
+        ranOnce = true;
         WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("Invariant in complete violated, nextReadValue="
+                + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
+          }
+        }
+
         if (queueFirst.isCompleted()) {
-          // Using Max because Edit complete in WAL sync order not arriving order
-          nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+          nextReadValue = queueFirst.getWriteNumber();
           writeQueue.removeFirst();
         } else {
           break;
         }
       }
 
-      if (nextReadValue > memstoreRead) {
-        memstoreRead = nextReadValue;
+      if (!ranOnce) {
+        throw new RuntimeException("There is no first!");
       }
 
-      // notify waiters on writeQueue before return
-      writeQueue.notifyAll();
-    }
-
-    if (nextReadValue > 0) {
-      synchronized (readWaiters) {
-        readWaiters.notifyAll();
-      }
-    }
-
-    if (memstoreRead >= e.getWriteNumber()) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Advances the current read point to be given seqNum if it is smaller than
-   * that.
-   */
-  void advanceMemstoreReadPointIfNeeded(long seqNum) {
-    synchronized (writeQueue) {
-      if (this.memstoreRead < seqNum) {
-        memstoreRead = seqNum;
+      if (nextReadValue > 0) {
+        synchronized (readWaiters) {
+          readPoint.set(nextReadValue);
+          readWaiters.notifyAll();
+        }
       }
+      return readPoint.get() >= writeEntry.getWriteNumber();
     }
   }
 
   /**
-   * Wait for all previous MVCC transactions complete
+   * Wait for the global readPoint to advance up to the passed in write entry number.
    */
-  public void waitForPreviousTransactionsComplete() {
-    WriteEntry w = beginMemstoreInsert();
-    waitForPreviousTransactionsComplete(w);
-  }
-
-  public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+  void waitForRead(WriteEntry e) {
     boolean interrupted = false;
-    WriteEntry w = waitedEntry;
-
-    try {
-      WriteEntry firstEntry = null;
-      do {
-        synchronized (writeQueue) {
-          // writeQueue won't be empty at this point, the following is just a safety check
-          if (writeQueue.isEmpty()) {
-            break;
-          }
-          firstEntry = writeQueue.getFirst();
-          if (firstEntry == w) {
-            // all previous in-flight transactions are done
-            break;
-          }
-          try {
-            writeQueue.wait(0);
-          } catch (InterruptedException ie) {
-            // We were interrupted... finish the loop -- i.e. cleanup --and then
-            // on our way out, reset the interrupt flag.
-            interrupted = true;
-            break;
-          }
+    int count = 0;
+    synchronized (readWaiters) {
+      while (readPoint.get() < e.getWriteNumber()) {
+        if (count % 100 == 0 && count > 0) {
+          Log.warn("STUCK: " + this);
+        }
+        count++;
+        try {
+          readWaiters.wait(10);
+        } catch (InterruptedException ie) {
+          // We were interrupted... finish the loop -- i.e. cleanup --and then
+          // on our way out, reset the interrupt flag.
+          interrupted = true;
         }
-      } while (firstEntry != null);
-    } finally {
-      if (w != null) {
-        advanceMemstore(w);
       }
     }
     if (interrupted) {
@@ -258,28 +223,60 @@ public class MultiVersionConcurrencyControl {
     }
   }
 
-  public long memstoreReadPoint() {
-    return memstoreRead;
+  @VisibleForTesting
+  public String toString() {
+    StringBuffer sb = new StringBuffer(256);
+    sb.append("readPoint=");
+    sb.append(this.readPoint.get());
+    sb.append(", writePoint=");
+    sb.append(this.writePoint);
+    synchronized (this.writeQueue) {
+      for (WriteEntry we: this.writeQueue) {
+        sb.append(", [");
+        sb.append(we);
+        sb.append("]");
+      }
+    }
+    return sb.toString();
+  }
+
+  public long getReadPoint() {
+    return readPoint.get();
+  }
+
+  @VisibleForTesting
+  public long getWritePoint() {
+    return writePoint.get();
   }
 
+  /**
+   * Holds the write number given out at the start of a write transaction and whether it completed.
+   * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
+   */
+  @InterfaceAudience.Private
   public static class WriteEntry {
-    private long writeNumber;
-    private volatile boolean completed = false;
+    private final long writeNumber;
+    private boolean completed = false;
 
     WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;
     }
+
     void markCompleted() {
       this.completed = true;
     }
+
     boolean isCompleted() {
       return this.completed;
     }
-    long getWriteNumber() {
+
+    public long getWriteNumber() {
       return this.writeNumber;
     }
-    void setWriteNumber(long val){
-      this.writeNumber = val;
+
+    @Override
+    public String toString() {
+      return this.writeNumber + ", " + this.completed;
     }
   }
 
@@ -287,5 +284,4 @@ public class MultiVersionConcurrencyControl {
       ClassSize.OBJECT +
       2 * Bytes.SIZEOF_LONG +
       2 * ClassSize.REFERENCE);
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 39a1935..928f4b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -62,7 +61,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -112,7 +110,7 @@ import com.lmax.disruptor.dsl.ProducerType;
  *
  * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
  * org.apache.hadoop.fs.Path)}.
- * 
+ *
  * <h2>Failure Semantic</h2>
  * If an exception on append or sync, roll the WAL because the current WAL is now a lame duck;
  * any more appends or syncs will fail also with the same original exception. If we have made
@@ -142,7 +140,7 @@ public class FSHLog implements WAL {
   // Calls to append now also wait until the append has been done on the consumer side of the
   // disruptor.  We used to not wait but it makes the implemenation easier to grok if we have
   // the region edit/sequence id after the append returns.
-  // 
+  //
   // TODO: Handlers need to coordinate appending AND syncing.  Can we have the threads contend
   // once only?  Probably hard given syncs take way longer than an append.
   //
@@ -233,7 +231,7 @@ public class FSHLog implements WAL {
   private final String logFilePrefix;
 
   /**
-   * Suffix included on generated wal file names 
+   * Suffix included on generated wal file names
    */
   private final String logFileSuffix;
 
@@ -250,13 +248,14 @@ public class FSHLog implements WAL {
   protected final Configuration conf;
 
   /** Listeners that are called on WAL events. */
-  private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
+  private final List<WALActionsListener> listeners =
+    new CopyOnWriteArrayList<WALActionsListener>();
 
   @Override
   public void registerWALActionsListener(final WALActionsListener listener) {
     this.listeners.add(listener);
   }
-  
+
   @Override
   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
     return this.listeners.remove(listener);
@@ -612,7 +611,7 @@ public class FSHLog implements WAL {
 
   /**
    * Tell listeners about pre log roll.
-   * @throws IOException 
+   * @throws IOException
    */
   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
   throws IOException {
@@ -625,7 +624,7 @@ public class FSHLog implements WAL {
 
   /**
    * Tell listeners about post log roll.
-   * @throws IOException 
+   * @throws IOException
    */
   private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
   throws IOException {
@@ -1053,27 +1052,11 @@ public class FSHLog implements WAL {
     }
   }
 
-  /**
-   * @param now
-   * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
-   * @param tableName
-   * @param clusterIds that have consumed the change
-   * @return New log key.
-   */
-  @SuppressWarnings("deprecation")
-  protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
-      long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
-  }
-  
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Will never be null")
   @Override
   public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
-      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
-      final List<Cell> memstoreCells) throws IOException {
+      final WALEdit edits, final boolean inMemstore) throws IOException {
     if (this.closed) throw new IOException("Cannot append; log is closed");
     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
     // single consuming thread.  Don't have to worry about it.
@@ -1087,9 +1070,9 @@ public class FSHLog implements WAL {
     try {
       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
       // Construction of FSWALEntry sets a latch.  The latch is thrown just after we stamp the
-      // edit with its edit/sequence id.  The below entry.getRegionSequenceId will wait on the
-      // latch to be thrown.  TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
+      // edit with its edit/sequence id.
+      // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
+      entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
       truck.loadPayload(entry, scope.detach());
     } finally {
       this.disruptor.getRingBuffer().publish(sequence);
@@ -1116,9 +1099,9 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
- 
+
     /**
-     * UPDATE! 
+     * UPDATE!
      * @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
      * we will put the result of the actual hdfs sync call as the result.
      * @param sequence The sequence number on the ring buffer when this thread was set running.
@@ -1166,7 +1149,7 @@ public class FSHLog implements WAL {
       // This function releases one sync future only.
       return 1;
     }
- 
+
     /**
      * Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
      * @param currentSequence
@@ -1570,7 +1553,7 @@ public class FSHLog implements WAL {
    * 'safe point' while the orchestrating thread does some work that requires the first thread
    * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
    * thread.
-   * 
+   *
    * <p>Thread A signals Thread B to hold when it gets to a 'safe point'.  Thread A wait until
    * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
    * Thread B then holds at the 'safe point'.  Thread A on notification that Thread B is paused,
@@ -1578,7 +1561,7 @@ public class FSHLog implements WAL {
    * it flags B and then Thread A and Thread B continue along on their merry way.  Pause and
    * signalling 'zigzags' between the two participating threads.  We use two latches -- one the
    * inverse of the other -- pausing and signaling when states are achieved.
-   * 
+   *
    * <p>To start up the drama, Thread A creates an instance of this class each time it would do
    * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
    * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
@@ -1600,7 +1583,7 @@ public class FSHLog implements WAL {
      * Latch to wait on.  Will be released when we can proceed.
      */
     private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
- 
+
     /**
      * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
      * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@@ -1609,7 +1592,7 @@ public class FSHLog implements WAL {
      * @throws InterruptedException
      * @throws ExecutionException
      * @return The passed <code>syncFuture</code>
-     * @throws FailedSyncBeforeLogCloseException 
+     * @throws FailedSyncBeforeLogCloseException
      */
     SyncFuture waitSafePoint(final SyncFuture syncFuture)
     throws InterruptedException, FailedSyncBeforeLogCloseException {
@@ -1621,7 +1604,7 @@ public class FSHLog implements WAL {
       }
       return syncFuture;
     }
- 
+
     /**
      * Called by Thread B when it attains the 'safe point'.  In this method, Thread B signals
      * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1859,9 +1842,8 @@ public class FSHLog implements WAL {
         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
         // so region sequenceids will also be in order.
         regionSequenceId = entry.stampRegionSequenceId();
-
-        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a 
-        // region sequence id only, a region edit/sequence id that is not associated with an actual 
+        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
+        // region sequence id only, a region edit/sequence id that is not associated with an actual
         // edit. It has to go through all the rigmarole to be sure we have the right ordering.
         if (entry.getEdit().isEmpty()) {
           return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index a768660..7f3eb61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 
@@ -51,23 +50,18 @@ class FSWALEntry extends Entry {
   // The below data members are denoted 'transient' just to highlight these are not persisted;
   // they are only in memory and held here while passing over the ring buffer.
   private final transient long sequence;
-  private final transient AtomicLong regionSequenceIdReference;
   private final transient boolean inMemstore;
   private final transient HTableDescriptor htd;
   private final transient HRegionInfo hri;
-  private final transient List<Cell> memstoreCells;
   private final Set<byte[]> familyNames;
 
   FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
-      final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
-      final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
+      final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
     super(key, edit);
-    this.regionSequenceIdReference = referenceToRegionSequenceId;
     this.inMemstore = inMemstore;
     this.htd = htd;
     this.hri = hri;
     this.sequence = sequence;
-    this.memstoreCells = memstoreCells;
     if (inMemstore) {
       // construct familyNames here to reduce the work of log sinker.
       ArrayList<Cell> cells = this.getEdit().getCells();
@@ -111,24 +105,30 @@ class FSWALEntry extends Entry {
   }
 
   /**
-   * Stamp this edit with a region edit/sequence id.
-   * Call when safe to do so: i.e. the context is such that the increment on the passed in
-   * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the
-   * WAL.  This method works with {@link #getRegionSequenceId()}.  It will block waiting on this
-   * method to be called.
-   * @return The region edit/sequence id we set for this edit.
+   * Here is where a WAL edit gets its sequenceid.
+   * @return The sequenceid we stamped on this edit.
    * @throws IOException
-   * @see #getRegionSequenceId()
    */
   long stampRegionSequenceId() throws IOException {
-    long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
-      for (Cell cell : this.memstoreCells) {
-        CellUtil.setSequenceId(cell, regionSequenceId);
+    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
+    MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
+    MultiVersionConcurrencyControl.WriteEntry we = null;
+
+    if (mvcc != null) {
+      we = mvcc.begin();
+      regionSequenceId = we.getWriteNumber();
+    }
+
+    if (!this.getEdit().isReplay() && inMemstore) {
+      for (Cell c:getEdit().getCells()) {
+        CellUtil.setSequenceId(c, regionSequenceId);
       }
     }
+
+    // This has to stay in this order
     WALKey key = getKey();
     key.setLogSeqNum(regionSequenceId);
+    key.setWriteEntry(we);
     return regionSequenceId;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 5218981..28141a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
@@ -69,10 +70,18 @@ public class HLogKey extends WALKey implements Writable {
     super(encodedRegionName, tablename);
   }
 
+  @VisibleForTesting
   public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
     super(encodedRegionName, tablename, now);
   }
 
+  public HLogKey(final byte[] encodedRegionName,
+                 final TableName tablename,
+                 final long now,
+                 final MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, now, mvcc);
+  }
+
   /**
    * Create the log key for writing to somewhere.
    * We maintain the tablename mainly for debugging purposes.
@@ -86,9 +95,16 @@ public class HLogKey extends WALKey implements Writable {
    * @param now Time at which this edit was written.
    * @param clusterIds the clusters that have consumed the change(used in Replication)
    */
-  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+  public HLogKey(
+      final byte[] encodedRegionName,
+      final TableName tablename,
+      long logSeqNum,
+      final long now,
+      List<UUID> clusterIds,
+      long nonceGroup,
+      long nonce,
+      MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -104,9 +120,14 @@ public class HLogKey extends WALKey implements Writable {
    * @param nonceGroup
    * @param nonce
    */
-  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
-      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+  public HLogKey(final byte[] encodedRegionName,
+                 final TableName tablename,
+                 final long now,
+                 List<UUID> clusterIds,
+                 long nonceGroup,
+                 long nonce,
+                 final MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -122,8 +143,8 @@ public class HLogKey extends WALKey implements Writable {
    * @param nonce
    */
   public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
-      long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce);
+      long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -141,7 +162,8 @@ public class HLogKey extends WALKey implements Writable {
       Compressor.writeCompressed(this.encodedRegionName, 0,
           this.encodedRegionName.length, out,
           compressionContext.regionDict);
-      Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
+      Compressor.writeCompressed(this.tablename.getName(), 0,
+          this.tablename.getName().length, out,
           compressionContext.tableDict);
     }
     out.writeLong(this.logSeqNum);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
index cb89346..f7ae208 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 
 /**
  * An HLogKey specific to WalEdits coming from replay.
@@ -32,13 +33,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class ReplayHLogKey extends HLogKey {
 
   public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
-      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+      final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+      MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+      MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index be39873..e41e1c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -37,7 +37,8 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.Text;
 
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
+  HBaseInterfaceAudience.CONFIG})
 public class SequenceFileLogReader extends ReaderBase {
   private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
 
@@ -273,8 +274,10 @@ public class SequenceFileLogReader extends ReaderBase {
       end = fEnd.getLong(this.reader);
     } catch(NoSuchFieldException nfe) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(nfe);
     } catch(IllegalAccessException iae) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(iae);
     } catch(Exception e) {
        /* All other cases. Should we handle it more aggressively? */
        LOG.warn("Unexpected exception when accessing the end field", e);
@@ -293,8 +296,10 @@ public class SequenceFileLogReader extends ReaderBase {
         .initCause(ioe);
     } catch(NoSuchMethodException nfe) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(nfe);
     } catch(IllegalAccessException iae) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(iae);
     } catch(Exception e) {
        /* All other cases. Should we handle it more aggressively? */
        LOG.warn("Unexpected exception when accessing the end field", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d2119d7..5e53e41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -73,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
  * where, the WALEdit is serialized as:
  *   &lt;-1, # of edits, &lt;KeyValue&gt;, &lt;KeyValue&gt;, ... &gt;
  * For example:
- *   &lt;-1, 3, &lt;Keyvalue-for-edit-c1&gt;, &lt;KeyValue-for-edit-c2&gt;, &lt;KeyValue-for-edit-c3&gt;&gt;
+ *   &lt;-1, 3, &lt;KV-for-edit-c1&gt;, &lt;KV-for-edit-c2&gt;, &lt;KV-for-edit-c3&gt;&gt;
  *
  * The -1 marker is just a special way of being backward compatible with
  * an old WAL which would have contained a single &lt;KeyValue&gt;.
@@ -104,6 +104,9 @@ public class WALEdit implements Writable, HeapSize {
   public static final WALEdit EMPTY_WALEDIT = new WALEdit();
 
   // Only here for legacy writable deserialization
+  /**
+   * @deprecated Legacy
+   */
   @Deprecated
   private NavigableMap<byte[], Integer> scopes;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 399623f..c89a466 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,20 +20,17 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 
@@ -48,36 +45,34 @@ import com.google.protobuf.TextFormat;
 public class WALUtil {
   private static final Log LOG = LogFactory.getLog(WALUtil.class);
 
+  private WALUtil() {
+    // Shut down construction of this class.
+  }
+
   /**
    * Write the marker that a compaction has succeeded and is about to be committed.
    * This provides info to the HMaster to allow it to recover the compaction if
    * this regionserver dies in the middle (This part is not yet implemented). It also prevents
    * the compaction from finishing if this regionserver has already lost its lease on the log.
-   * @param sequenceId Used by WAL to get sequence Id for the waledit.
+   * @param mvcc Used by WAL to get sequence Id for the waledit.
    */
-  public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
-    TableName tn = TableName.valueOf(c.getTableName().toByteArray());
-    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
-    log.sync();
+  public static long writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+      final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc)
+  throws IOException {
+    long trx = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc, true);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
+    return trx;
   }
 
   /**
    * Write a flush marker indicating a start / abort or a complete of a region flush
    */
-  public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
-    TableName tn = TableName.valueOf(f.getTableName().toByteArray());
-    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
-        null);
-    if (sync) log.sync(trx);
+  public static long writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+      final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
+  throws IOException {
+    long trx = writeMarker(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
@@ -87,14 +82,10 @@ public class WALUtil {
   /**
    * Write a region open marker indicating that the region is opened
    */
-  public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
-    TableName tn = TableName.valueOf(r.getTableName().toByteArray());
-    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
-      sequenceId, false, null);
-    log.sync(trx);
+  public static long writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+      final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
+  throws IOException {
+    long trx = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, true);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -106,35 +97,40 @@ public class WALUtil {
    *
    * @param wal        The log to write into.
    * @param htd        A description of the table that we are bulk loading into.
-   * @param info       A description of the region in the table that we are bulk loading into.
-   * @param descriptor A protocol buffers based description of the client's bulk loading request
-   * @param sequenceId The current sequenceId in the log at the time when we were to write the
-   *                   bulk load marker.
+   * @param hri       A description of the region in the table that we are bulk loading into.
+   * @param desc A protocol buffers based description of the client's bulk loading request
    * @return txid of this transaction or if nothing to do, the last txid
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
-  public static long writeBulkLoadMarkerAndSync(final WAL wal,
-                                                final HTableDescriptor htd,
-                                                final HRegionInfo info,
-                                                final WALProtos.BulkLoadDescriptor descriptor,
-                                                final AtomicLong sequenceId) throws IOException {
-    TableName tn = info.getTable();
-    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+  public static long writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
+      final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
+      final MultiVersionConcurrencyControl mvcc)
+  throws IOException {
+    long trx = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, true);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
+    }
+    return trx;
+  }
 
+  private static long writeMarker(final WAL wal, final HTableDescriptor htd, final HRegionInfo hri,
+      final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
+  throws IOException {
+    // TODO: Pass in current time to use?
+    WALKey key =
+      new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
     // Add it to the log but the false specifies that we don't need to add it to the memstore
-    long trx = wal.append(htd,
-            info,
-            key,
-            WALEdit.createBulkLoadEvent(info, descriptor),
-            sequenceId,
-            false,
-            new ArrayList<Cell>());
-    wal.sync(trx);
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(descriptor));
+    long trx = MultiVersionConcurrencyControl.NONE;
+    try {
+      trx = wal.append(htd, hri, key, edit, false);
+      if (sync) wal.sync(trx);
+    } finally {
+      // If you get hung here, is it a real WAL or a mocked WAL? If the latter, you need to
+      // trip the latch that is inside getWriteEntry up in your mock. See down in the append
+      // called from onEvent in FSHLog.
+      MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry();
+      if (mvcc != null && we != null) mvcc.complete(we);
     }
     return trx;
   }
-  
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
index f628cee..84d6128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
@@ -55,7 +55,7 @@ public class HashedBytes {
     if (obj == null || getClass() != obj.getClass())
       return false;
     HashedBytes other = (HashedBytes) obj;
-    return Arrays.equals(bytes, other.bytes);
+    return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 56d17a2..33785a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -155,7 +155,7 @@ class DisabledWALProvider implements WALProvider {
 
     @Override
     public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-        AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
+                       boolean inMemstore) {
       if (!this.listeners.isEmpty()) {
         final long start = System.nanoTime();
         long len = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 4844487..d2b336e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,17 +21,13 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 // imports we use from yet-to-be-moved regionsever.wal
 import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@@ -114,19 +110,16 @@ public interface WAL {
    * @param key Modified by this call; we add to it this edits region edit/sequence id.
    * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
    * sequence id that is after all currently appended edits.
-   * @param htd used to give scope for replication TODO refactor out in favor of table name and info
-   * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
-   * source of its incrementing edits sequence id.  Inside in this call we will increment it and
-   * attach the sequence to the edit we apply the WAL.
+   * @param htd used to give scope for replication TODO refactor out in favor of table name and
+   * info
    * @param inMemstore Always true except for case where we are writing a compaction completion
    * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
    * -- it is not an edit for memstore.
-   * @param memstoreKVs list of KVs added into memstore
    * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
    * in it.
    */
   long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-      AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
+    boolean inMemstore)
   throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 74284e0..05acd72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,13 +69,55 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
  *
  * Note that protected members marked @InterfaceAudience.Private are only protected
  * to support the legacy HLogKey class, which is in a different package.
+ * 
+ * <p>
  */
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 //       purposes. They need to be merged into WALEntry.
+// TODO: Cleanup. We have logSeqNum and then WriteEntry, both are sequence id'ing. Fix.
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALKey implements SequenceId, Comparable<WALKey> {
   private static final Log LOG = LogFactory.getLog(WALKey.class);
 
+  @InterfaceAudience.Private // For internal use only.
+  public MultiVersionConcurrencyControl getMvcc() {
+    return mvcc;
+  }
+
+  /**
+   * Will block until a write entry has been assigned by the WAL subsystem.
+   * @return A WriteEntry gotten from local WAL subsystem. Must be completed by calling
+   * mvcc#complete or mvcc#completeAndWait.
+   * @throws InterruptedIOException
+   * @see
+   * #setWriteEntry(org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry)
+   */
+  @InterfaceAudience.Private // For internal use only.
+  public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
+    try {
+      this.seqNumAssignedLatch.await();
+    } catch (InterruptedException ie) {
+      // If interrupted... clear out our entry else we can block up mvcc.
+      MultiVersionConcurrencyControl mvcc = getMvcc();
+      LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
+      if (mvcc != null) {
+        if (this.writeEntry != null) {
+          mvcc.complete(this.writeEntry);
+        }
+      }
+      InterruptedIOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
+    }
+    return this.writeEntry;
+  }
+
+  @InterfaceAudience.Private // For internal use only.
+  public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
+    this.writeEntry = writeEntry;
+    this.seqNumAssignedLatch.countDown();
+  }
+
   // should be < 0 (@see HLogKey#readFields(DataInput))
   // version 2 supports WAL compression
   // public members here are only public because of HLogKey
@@ -151,7 +194,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
 
   private long nonceGroup = HConstants.NO_NONCE;
   private long nonce = HConstants.NO_NONCE;
-  static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
+  private MultiVersionConcurrencyControl mvcc;
+  private MultiVersionConcurrencyControl.WriteEntry writeEntry;
+  public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
 
   // visible for deprecated HLogKey
   @InterfaceAudience.Private
@@ -159,16 +204,17 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
 
   public WALKey() {
     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
-        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
   }
 
   @VisibleForTesting
-  public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+  public WALKey(final byte[] encodedRegionName, final TableName tablename,
+                long logSeqNum,
       final long now, UUID clusterId) {
     List<UUID> clusterIds = new ArrayList<UUID>();
     clusterIds.add(clusterId);
     init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
-        HConstants.NO_NONCE, HConstants.NO_NONCE);
+        HConstants.NO_NONCE, HConstants.NO_NONCE, null);
   }
 
   public WALKey(final byte[] encodedRegionName, final TableName tablename) {
@@ -176,8 +222,28 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }
 
   public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
-    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
-        EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    init(encodedRegionName,
+        tablename,
+        NO_SEQUENCE_ID,
+        now,
+        EMPTY_UUIDS,
+        HConstants.NO_NONCE,
+        HConstants.NO_NONCE,
+        null);
+  }
+
+  public WALKey(final byte[] encodedRegionName,
+                final TableName tablename,
+                final long now,
+                MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName,
+        tablename,
+        NO_SEQUENCE_ID,
+        now,
+        EMPTY_UUIDS,
+        HConstants.NO_NONCE,
+        HConstants.NO_NONCE,
+        mvcc);
   }
 
   /**
@@ -187,15 +253,21 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * <p>Used by log splitting and snapshots.
    *
    * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
-   * @param tablename   - name of table
-   * @param logSeqNum   - log sequence number
-   * @param now Time at which this edit was written.
-   * @param clusterIds the clusters that have consumed the change(used in Replication)
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   * @param tablename         - name of table
+   * @param logSeqNum         - log sequence number
+   * @param now               Time at which this edit was written.
+   * @param clusterIds        the clusters that have consumed the change(used in Replication)
    */
-  public WALKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+  public WALKey(final byte[] encodedRegionName,
+                final TableName tablename,
+                long logSeqNum,
+                final long now,
+                List<UUID> clusterIds,
+                long nonceGroup,
+                long nonce,
+                MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -204,17 +276,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * A regionName is always a sub-table object.
    *
    * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
    * @param tablename
-   * @param now Time at which this edit was written.
-   * @param clusterIds the clusters that have consumed the change(used in Replication)
+   * @param now               Time at which this edit was written.
+   * @param clusterIds        the clusters that have consumed the change (used in Replication)
    * @param nonceGroup
    * @param nonce
+   * @param mvcc mvcc control used to generate sequence numbers and control read/write points
    */
-  public WALKey(final byte [] encodedRegionName, final TableName tablename,
-      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
-      nonceGroup, nonce);
+  public WALKey(final byte[] encodedRegionName, final TableName tablename,
+                final long now, List<UUID> clusterIds, long nonceGroup,
+                final long nonce, final MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -223,21 +296,37 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * A regionName is always a sub-table object.
    *
    * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
    * @param tablename
    * @param logSeqNum
    * @param nonceGroup
    * @param nonce
    */
-  public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
-      long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
-      EMPTY_UUIDS, nonceGroup, nonce);
+  public WALKey(final byte[] encodedRegionName,
+                final TableName tablename,
+                long logSeqNum,
+                long nonceGroup,
+                long nonce,
+                final MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName,
+        tablename,
+        logSeqNum,
+        EnvironmentEdgeManager.currentTime(),
+        EMPTY_UUIDS,
+        nonceGroup,
+        nonce,
+        mvcc);
   }
 
   @InterfaceAudience.Private
-  protected void init(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+  protected void init(final byte[] encodedRegionName,
+                      final TableName tablename,
+                      long logSeqNum,
+                      final long now,
+                      List<UUID> clusterIds,
+                      long nonceGroup,
+                      long nonce,
+                      MultiVersionConcurrencyControl mvcc) {
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterIds = clusterIds;
@@ -245,6 +334,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.tablename = tablename;
     this.nonceGroup = nonceGroup;
     this.nonce = nonce;
+    this.mvcc = mvcc;
   }
 
   /**
@@ -270,15 +360,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }
 
   /**
-   * Allow that the log sequence id to be set post-construction and release all waiters on assigned
-   * sequence number.
+   * Allow the log sequence id to be set post-construction.
    * Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
    * @param sequence
    */
   @InterfaceAudience.Private
   public void setLogSeqNum(final long sequence) {
     this.logSeqNum = sequence;
-    this.seqNumAssignedLatch.countDown();
+
   }
 
   /**
@@ -492,21 +581,22 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.encodedRegionName = encodedRegionName;
   }
 
-  public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
-  throws IOException {
-    org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
+  public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
+      WALCellCodec.ByteStringCompressor compressor) throws IOException {
+    org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
+        org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
     if (compressionContext == null) {
       builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
       builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
     } else {
       builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
-        compressionContext.regionDict));
+          compressionContext.regionDict));
       builder.setTableName(compressor.compress(this.tablename.getName(),
-        compressionContext.tableDict));
+          compressionContext.tableDict));
     }
     builder.setLogSequenceNumber(this.logSeqNum);
     builder.setWriteTime(writeTime);
-    if(this.origLogSeqNum > 0) {
+    if (this.origLogSeqNum > 0) {
       builder.setOrigSequenceNumber(this.origLogSeqNum);
     }
     if (this.nonce != HConstants.NO_NONCE) {
@@ -532,8 +622,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     return builder;
   }
 
-  public void readFieldsFromPb(
-      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+  public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
+                               WALCellCodec.ByteStringUncompressor uncompressor)
+      throws IOException {
     if (this.compressionContext != null) {
       this.encodedRegionName = uncompressor.uncompress(
           walKey.getEncodedRegionName(), compressionContext.regionDict);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 7fed610..98882ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2286,7 +2286,7 @@ public class WALSplitter {
       // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
       key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
               walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
-              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
+              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
       logEntry.setFirst(key);
       logEntry.setSecond(val);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
index d7a68e3..ea833dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
@@ -90,7 +90,7 @@ public class TestFullLogReconstruction {
    */
   @Test (timeout=300000)
   public void testReconstruction() throws Exception {
-    HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
+    Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
 
     // Load up the table with simple rows and count them
     int initialCount = TEST_UTIL.loadTable(table, FAMILY);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 41e7ec5..22531c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,10 +46,10 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -224,8 +223,7 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompaction() throws Exception {
-    doTest(BlockCompactionsInPrepRegion.class, false);
-    doTest(BlockCompactionsInPrepRegion.class, true);
+    doTest(BlockCompactionsInPrepRegion.class);
   }
 
   /**
@@ -236,13 +234,11 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompactionAfterWALSync() throws Exception {
-    doTest(BlockCompactionsInCompletionRegion.class, false);
-    doTest(BlockCompactionsInCompletionRegion.class, true);
+    doTest(BlockCompactionsInCompletionRegion.class);
   }
 
-  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
+  public void doTest(Class<?> regionClass) throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
     // Insert our custom region
     c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
     c.setBoolean("dfs.support.append", true);
@@ -283,7 +279,7 @@ public class TestIOFencing {
         FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
         new Path("store_dir"));
       WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
-        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+        oldHri, compactionDescriptor, compactingRegion.getMVCC());
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
@@ -354,4 +350,4 @@ public class TestIOFencing {
       TEST_UTIL.shutdownMiniCluster();
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 24f7190..f5e4026 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -383,7 +383,6 @@ public class TestReplicasClient {
     }
   }
 
-
   @Test
   public void testFlushTable() throws Exception {
     openRegion(hriSecondary);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 53c234e..fb0d843 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -139,6 +139,4 @@ public class TestRegionObserverStacking extends TestCase {
     assertTrue(idA < idB);
     assertTrue(idB < idC);
   }
-
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index aee1b1f..256c0eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,7 +29,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -186,7 +186,6 @@ public class TestWALObserver {
     Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
     deleteDir(basedir);
     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
-    final AtomicLong sequenceId = new AtomicLong(0);
 
     // TEST_FAMILY[0] shall be removed from WALEdit.
     // TEST_FAMILY[1] value shall be changed.
@@ -235,7 +234,7 @@ public class TestWALObserver {
     long now = EnvironmentEdgeManager.currentTime();
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
-        edit, sequenceId, true, null);
+        edit, true);
     log.sync(txid);
 
     // the edit shall have been change now by the coprocessor.
@@ -271,7 +270,7 @@ public class TestWALObserver {
     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
         .toString(TEST_TABLE));
     final HRegionInfo hri = new HRegionInfo(tableName, null, null);
-    final AtomicLong sequenceId = new AtomicLong(0);
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
 
     fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
 
@@ -298,7 +297,7 @@ public class TestWALObserver {
     final int countPerFamily = 5;
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+          EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
     }
 
     LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -322,7 +321,7 @@ public class TestWALObserver {
     final WALEdit edit = new WALEdit();
     final byte[] nonce = Bytes.toBytes("1772");
     edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
-    final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
+    final long txid = wal.append(htd, hri, legacyKey, edit, true);
     wal.sync(txid);
 
     LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -347,7 +346,7 @@ public class TestWALObserver {
   public void testEmptyWALEditAreNotSeen() throws Exception {
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
-    final AtomicLong sequenceId = new AtomicLong(0);
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
 
     WAL log = wals.getWAL(UNSPECIFIED_REGION);
     try {
@@ -359,8 +358,9 @@ public class TestWALObserver {
       assertFalse(cp.isPostWALWriteCalled());
 
       final long now = EnvironmentEdgeManager.currentTime();
-      long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
-          new WALEdit(), sequenceId, true, null);
+      long txid = log.append(htd, hri,
+          new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+          new WALEdit(), true);
       log.sync(txid);
 
       assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
@@ -379,7 +379,7 @@ public class TestWALObserver {
     // ultimately called by HRegion::initialize()
     TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
     final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
-    final AtomicLong sequenceId = new AtomicLong(0);
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     // final HRegionInfo hri =
     // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
     // final HRegionInfo hri1 =
@@ -403,10 +403,9 @@ public class TestWALObserver {
     // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+          EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
     }
-    wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
-        true, null);
+    wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
     // sync to fs.
     wal.sync();
 
@@ -526,7 +525,7 @@ public class TestWALObserver {
 
   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
-      final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
+      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
     String familyStr = Bytes.toString(family);
     long txid = -1;
     for (int j = 0; j < count; j++) {
@@ -537,7 +536,7 @@ public class TestWALObserver {
       // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
       // about legacy coprocessors
       txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-          ee.currentTime()), edit, sequenceId, true, null);
+          ee.currentTime(), mvcc), edit, true);
     }
     if (-1 != txid) {
       wal.sync(txid);

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index dd47325..0ceae46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -31,8 +31,8 @@ import org.junit.experimental.categories.Category;
 public class TestHLogRecordReader extends TestWALRecordReader {
 
   @Override
-  protected WALKey getWalKey(final long sequenceid) {
-    return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+  protected WALKey getWalKey(final long time) {
+    return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/bae2a970/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 66cee55..201ffe6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -74,6 +75,7 @@ public class TestWALRecordReader {
   private static final byte [] value = Bytes.toBytes("value");
   private static HTableDescriptor htd;
   private static Path logDir;
+  protected MultiVersionConcurrencyControl mvcc;
 
   private static String getName() {
     return "TestWALRecordReader";
@@ -81,6 +83,7 @@ public class TestWALRecordReader {
 
   @Before
   public void setUp() throws Exception {
+    mvcc = new MultiVersionConcurrencyControl();
     FileStatus[] entries = fs.listStatus(hbaseDir);
     for (FileStatus dir : entries) {
       fs.delete(dir.getPath(), true);
@@ -123,13 +126,11 @@ public class TestWALRecordReader {
     // being millisecond based.
     long ts = System.currentTimeMillis();
     WALEdit edit = new WALEdit();
-    final AtomicLong sequenceId = new AtomicLong(0);
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
-    log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+    log.append(htd, info, getWalKey(ts), edit, true);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
-    log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
-        true, null);
+    log.append(htd, info, getWalKey(ts+1), edit, true);
     log.sync();
     LOG.info("Before 1st WAL roll " + log.toString());
     log.rollWriter();
@@ -140,12 +141,10 @@ public class TestWALRecordReader {
 
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
-    log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
-        true, null);
+    log.append(htd, info, getWalKey(ts1+1), edit, true);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
-    log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
-        true, null);
+    log.append(htd, info, getWalKey(ts1+2), edit, true);
     log.sync();
     log.shutdown();
     walfactory.shutdown();
@@ -187,8 +186,7 @@ public class TestWALRecordReader {
     WALEdit edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
         System.currentTimeMillis(), value));
-    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
-        null);
+    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
     log.sync(txid);
 
     Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -198,8 +196,7 @@ public class TestWALRecordReader {
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
         System.currentTimeMillis(), value));
-    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
-        null);
+    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
     log.sync(txid);
     log.shutdown();
     walfactory.shutdown();
@@ -238,8 +235,8 @@ public class TestWALRecordReader {
     testSplit(splits.get(1));
   }
 
-  protected WALKey getWalKey(final long sequenceid) {
-    return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+  protected WALKey getWalKey(final long time) {
+    return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
   }
 
   protected WALRecordReader getReader() {


Mime
View raw message