hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [3/5] hbase git commit: HBASE-12751 Allow RowLock to be reader writer
Date Wed, 23 Sep 2015 00:23:21 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index 2d65387..d101f7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,239 +18,198 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
- * Manages the read/write consistency within memstore. This provides
- * an interface for readers to determine what entries to ignore, and
- * a mechanism for writers to obtain new write numbers, then "commit"
+ * Manages the read/write consistency. This provides an interface for readers to determine what
+ * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
  * the new writes for readers to read (thus forming atomic transactions).
  */
 @InterfaceAudience.Private
 public class MultiVersionConcurrencyControl {
-  private static final long NO_WRITE_NUMBER = 0;
-  private volatile long memstoreRead = 0;
+  final AtomicLong readPoint = new AtomicLong(0);
+  final AtomicLong writePoint = new AtomicLong(0);
   private final Object readWaiters = new Object();
+  /**
+   * Represents no value, or not set.
+   */
+  private static final long NONE = -1;
 
   // This is the pending queue of writes.
-  private final LinkedList<WriteEntry> writeQueue =
-      new LinkedList<WriteEntry>();
+  //
+  // TODO(eclark): Should this be an array of fixed size to
+  // reduce the number of allocations on the write path?
+  // This could be equal to the number of handlers + a small number.
+  // TODO: St.Ack 20150903 Sounds good to me.
+  private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
 
-  /**
-   * Default constructor. Initializes the memstoreRead/Write points to 0.
-   */
   public MultiVersionConcurrencyControl() {
+    super();
   }
 
   /**
-   * Initializes the memstoreRead/Write points appropriately.
-   * @param startPoint
+   * Construct and set both the read point and the write point to <code>startPoint</code>.
    */
-  public void initialize(long startPoint) {
-    synchronized (writeQueue) {
-      writeQueue.clear();
-      memstoreRead = startPoint;
-    }
+  public MultiVersionConcurrencyControl(long startPoint) {
+    tryAdvanceTo(startPoint, NONE);
   }
 
   /**
-   *
-   * @param initVal The value we used initially and expected it'll be reset later
-   * @return WriteEntry instance.
+   * Step the MVCC forward on to a new read/write basis.
+   * @param newStartPoint Point to move read and write points to.
    */
-  WriteEntry beginMemstoreInsert() {
-    return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+  public void advanceTo(long newStartPoint) {
+    while (true) {
+      long seqId = this.getWritePoint();
+      if (seqId >= newStartPoint) break;
+      if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
+    }
   }
 
   /**
-   * Get a mvcc write number before an actual one(its log sequence Id) being assigned
-   * @param sequenceId
-   * @return long a faked write number which is bigger enough not to be seen by others before a real
-   *         one is assigned
+   * Step the MVCC forward on to a new read/write basis.
+   * @param newStartPoint Point to move read and write points to.
+   * @param expected If not -1 (#NONE), the current read point must equal this value.
+   * @return Returns false if <code>expected</code> is not equal to the
+   * current <code>readPoint</code> or if <code>newStartPoint</code> is less than current
+   * <code>readPoint</code>
    */
-  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
-    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
-    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
-    // because each handler could increment sequence num twice and max concurrent in-flight
-    // transactions is the number of RPC handlers.
-    // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
-    // changes touch same row key.
-    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
-    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
-    // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
-    return sequenceId.incrementAndGet() + 1000000000;
+  boolean tryAdvanceTo(long newStartPoint, long expected) {
+    synchronized (writeQueue) {
+      long currentRead = this.readPoint.get();
+      long currentWrite = this.writePoint.get();
+      if (currentRead != currentWrite) {
+        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
+          ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
+      }
+      if (expected != NONE && expected != currentRead) {
+        return false;
+      }
+
+      if (newStartPoint < currentRead) {
+        return false;
+      }
+
+      readPoint.set(newStartPoint);
+      writePoint.set(newStartPoint);
+    }
+    return true;
   }
 
   /**
-   * This function starts a MVCC transaction with current region's log change sequence number. Since
-   * we set change sequence number when flushing current change to WAL(late binding), the flush
-   * order may differ from the order to start a MVCC transaction. For example, a change begins a
-   * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
-   * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
-   * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
-   * big number is safe because we only need it to prevent current change being seen and the number
-   * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
-   * for MVCC to align with flush sequence.
-   * @param curSeqNum
-   * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
+   * to our queue of ongoing writes. Return this WriteEntry instance.
+   * To complete the write transaction and wait for it to be visible, call
+   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
+   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
+   * transaction.
+   * @see #complete(WriteEntry)
+   * @see #completeAndWait(WriteEntry)
    */
-  public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
-    WriteEntry e = new WriteEntry(curSeqNum);
+  public WriteEntry begin() {
     synchronized (writeQueue) {
+      long nextWriteNumber = writePoint.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
       writeQueue.add(e);
       return e;
     }
   }
 
   /**
-   * Complete a {@link WriteEntry} that was created by
-   * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
-   * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
-   * visible to MVCC readers.
-   * @throws IOException
-   */
-  public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
-      throws IOException {
-    if(e == null) return;
-    if (seqId != null) {
-      e.setWriteNumber(seqId.getSequenceId());
-    } else {
-      // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
-      // function beginMemstoreInsertWithSeqNum in case of failures
-      e.setWriteNumber(NO_WRITE_NUMBER);
-    }
-    waitForPreviousTransactionsComplete(e);
-  }
-
-  /**
-   * Cancel a write insert that failed.
-   * Removes the write entry without advancing read point or without interfering with write
-   * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
-   * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
-   * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
-   * it as for special handling).
-   * @param writeEntry Failed attempt at write. Does cleanup.
+   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
+   * to complete.
    */
-  public void cancelMemstoreInsert(WriteEntry writeEntry) {
-    // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
-    // readpoint and gets my little writeEntry completed and removed from queue of outstanding
-    // events which seems right.  St.Ack 20150901.
-    writeEntry.setWriteNumber(NO_WRITE_NUMBER);
-    advanceMemstore(writeEntry);
+  public void await() {
+    // Add a write and then wait on reads to catch up to it.
+    completeAndWait(begin());
   }
 
   /**
-   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
-   * end of this call, the global read point is at least as large as the write point of the passed
-   * in WriteEntry. Thus, the write is visible to MVCC readers.
+   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
+   * read point catches up to our write.
+   *
+   * At the end of this call, the global read point is at least as large as the write point
+   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
    */
-  public void completeMemstoreInsert(WriteEntry e) {
-    waitForPreviousTransactionsComplete(e);
+  public void completeAndWait(WriteEntry e) {
+    complete(e);
+    waitForRead(e);
   }
 
   /**
-   * Mark the {@link WriteEntry} as complete and advance the read point as
-   * much as possible.
+   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
+   * Call this even if the write has FAILED (AFTER backing out the write transaction
+   * changes completely) so we can clean up the outstanding transaction.
    *
    * How much is the read point advanced?
-   * Let S be the set of all write numbers that are completed and where all previous write numbers
-   * are also completed.  Then, the read point is advanced to the supremum of S.
+   * 
+   * Let S be the set of all completed write numbers whose earlier writes are also all completed.
+   * Set the read point to the highest numbered write of S.
+   *
+   * @param writeEntry
    *
-   * @param e
    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
    */
-  boolean advanceMemstore(WriteEntry e) {
-    long nextReadValue = -1;
+  public boolean complete(WriteEntry writeEntry) {
     synchronized (writeQueue) {
-      e.markCompleted();
+      writeEntry.markCompleted();
 
+      long nextReadValue = NONE;
+      boolean ranOnce = false;
       while (!writeQueue.isEmpty()) {
+        ranOnce = true;
         WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("Invariant in complete violated, nextReadValue="
+                + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
+          }
+        }
+
         if (queueFirst.isCompleted()) {
-          // Using Max because Edit complete in WAL sync order not arriving order
-          nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+          nextReadValue = queueFirst.getWriteNumber();
           writeQueue.removeFirst();
         } else {
           break;
         }
       }
 
-      if (nextReadValue > memstoreRead) {
-        memstoreRead = nextReadValue;
+      if (!ranOnce) {
+        throw new RuntimeException("There is no first!");
       }
 
-      // notify waiters on writeQueue before return
-      writeQueue.notifyAll();
-    }
-
-    if (nextReadValue > 0) {
-      synchronized (readWaiters) {
-        readWaiters.notifyAll();
-      }
-    }
-
-    if (memstoreRead >= e.getWriteNumber()) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Advances the current read point to be given seqNum if it is smaller than
-   * that.
-   */
-  void advanceMemstoreReadPointIfNeeded(long seqNum) {
-    synchronized (writeQueue) {
-      if (this.memstoreRead < seqNum) {
-        memstoreRead = seqNum;
+      if (nextReadValue > 0) {
+        synchronized (readWaiters) {
+          readPoint.set(nextReadValue);
+          readWaiters.notifyAll();
+        }
       }
+      return readPoint.get() >= writeEntry.getWriteNumber();
     }
   }
 
   /**
-   * Wait for all previous MVCC transactions complete
+   * Wait for the global readPoint to advance up to the passed in write entry number.
    */
-  public void waitForPreviousTransactionsComplete() {
-    WriteEntry w = beginMemstoreInsert();
-    waitForPreviousTransactionsComplete(w);
-  }
-
-  public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+  void waitForRead(WriteEntry e) {
     boolean interrupted = false;
-    WriteEntry w = waitedEntry;
-
-    try {
-      WriteEntry firstEntry = null;
-      do {
-        synchronized (writeQueue) {
-          // writeQueue won't be empty at this point, the following is just a safety check
-          if (writeQueue.isEmpty()) {
-            break;
-          }
-          firstEntry = writeQueue.getFirst();
-          if (firstEntry == w) {
-            // all previous in-flight transactions are done
-            break;
-          }
-          try {
-            writeQueue.wait(0);
-          } catch (InterruptedException ie) {
-            // We were interrupted... finish the loop -- i.e. cleanup --and then
-            // on our way out, reset the interrupt flag.
-            interrupted = true;
-            break;
-          }
+    synchronized (readWaiters) {
+      while (readPoint.get() < e.getWriteNumber()) {
+        try {
+          readWaiters.wait(0);
+        } catch (InterruptedException ie) {
+          // We were interrupted... finish the loop -- i.e. cleanup --and then
+          // on our way out, reset the interrupt flag.
+          interrupted = true;
         }
-      } while (firstEntry != null);
-    } finally {
-      if (w != null) {
-        advanceMemstore(w);
       }
     }
     if (interrupted) {
@@ -258,34 +217,43 @@ public class MultiVersionConcurrencyControl {
     }
   }
 
-  public long memstoreReadPoint() {
-    return memstoreRead;
+  public long getReadPoint() {
+    return readPoint.get();
   }
 
+  @VisibleForTesting
+  public long getWritePoint() {
+    return writePoint.get();
+  }
+
+  /**
+   * Holds a write number and its completed flag; handed out at the start of a write transaction.
+   * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
+   */
+  @InterfaceAudience.Private
   public static class WriteEntry {
-    private long writeNumber;
-    private volatile boolean completed = false;
+    private final long writeNumber;
+    private boolean completed = false;
 
     WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;
     }
+
     void markCompleted() {
       this.completed = true;
     }
+
     boolean isCompleted() {
       return this.completed;
     }
-    long getWriteNumber() {
+
+    public long getWriteNumber() {
       return this.writeNumber;
     }
-    void setWriteNumber(long val){
-      this.writeNumber = val;
-    }
   }
 
   public static final long FIXED_SIZE = ClassSize.align(
       ClassSize.OBJECT +
       2 * Bytes.SIZEOF_LONG +
       2 * ClassSize.REFERENCE);
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 1c29827..0e4a585 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -63,7 +62,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -111,7 +109,7 @@ import com.lmax.disruptor.dsl.ProducerType;
  *
  * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
  * org.apache.hadoop.fs.Path)}.
- * 
+ *
  * <h2>Failure Semantic</h2>
  * If an exception on append or sync, roll the WAL because the current WAL is now a lame duck;
  * any more appends or syncs will fail also with the same original exception. If we have made
@@ -141,7 +139,7 @@ public class FSHLog implements WAL {
   // Calls to append now also wait until the append has been done on the consumer side of the
   // disruptor.  We used to not wait but it makes the implemenation easier to grok if we have
   // the region edit/sequence id after the append returns.
-  // 
+  //
   // TODO: Handlers need to coordinate appending AND syncing.  Can we have the threads contend
   // once only?  Probably hard given syncs take way longer than an append.
   //
@@ -232,7 +230,7 @@ public class FSHLog implements WAL {
   private final String logFilePrefix;
 
   /**
-   * Suffix included on generated wal file names 
+   * Suffix included on generated wal file names
    */
   private final String logFileSuffix;
 
@@ -249,13 +247,14 @@ public class FSHLog implements WAL {
   protected final Configuration conf;
 
   /** Listeners that are called on WAL events. */
-  private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
+  private final List<WALActionsListener> listeners =
+    new CopyOnWriteArrayList<WALActionsListener>();
 
   @Override
   public void registerWALActionsListener(final WALActionsListener listener) {
     this.listeners.add(listener);
   }
-  
+
   @Override
   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
     return this.listeners.remove(listener);
@@ -618,7 +617,7 @@ public class FSHLog implements WAL {
 
   /**
    * Tell listeners about pre log roll.
-   * @throws IOException 
+   * @throws IOException
    */
   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
   throws IOException {
@@ -631,7 +630,7 @@ public class FSHLog implements WAL {
 
   /**
    * Tell listeners about post log roll.
-   * @throws IOException 
+   * @throws IOException
    */
   private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
   throws IOException {
@@ -1059,27 +1058,11 @@ public class FSHLog implements WAL {
     }
   }
 
-  /**
-   * @param now
-   * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
-   * @param tableName
-   * @param clusterIds that have consumed the change
-   * @return New log key.
-   */
-  @SuppressWarnings("deprecation")
-  protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
-      long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
-  }
-  
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Will never be null")
   @Override
   public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
-      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
-      final List<Cell> memstoreCells) throws IOException {
+      final WALEdit edits, final boolean inMemstore) throws IOException {
     if (this.closed) throw new IOException("Cannot append; log is closed");
     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
     // single consuming thread.  Don't have to worry about it.
@@ -1093,9 +1076,9 @@ public class FSHLog implements WAL {
     try {
       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
       // Construction of FSWALEntry sets a latch.  The latch is thrown just after we stamp the
-      // edit with its edit/sequence id.  The below entry.getRegionSequenceId will wait on the
-      // latch to be thrown.  TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
+      // edit with its edit/sequence id.
+      // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
+      entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
       truck.loadPayload(entry, scope.detach());
     } finally {
       this.disruptor.getRingBuffer().publish(sequence);
@@ -1122,9 +1105,9 @@ public class FSHLog implements WAL {
     private volatile long sequence;
     // Keep around last exception thrown. Clear on successful sync.
     private final BlockingQueue<SyncFuture> syncFutures;
- 
+
     /**
-     * UPDATE! 
+     * UPDATE!
      * @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
      * we will put the result of the actual hdfs sync call as the result.
      * @param sequence The sequence number on the ring buffer when this thread was set running.
@@ -1172,7 +1155,7 @@ public class FSHLog implements WAL {
       // This function releases one sync future only.
       return 1;
     }
- 
+
     /**
      * Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
      * @param currentSequence
@@ -1604,7 +1587,7 @@ public class FSHLog implements WAL {
    * 'safe point' while the orchestrating thread does some work that requires the first thread
    * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
    * thread.
-   * 
+   *
    * <p>Thread A signals Thread B to hold when it gets to a 'safe point'.  Thread A wait until
    * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
    * Thread B then holds at the 'safe point'.  Thread A on notification that Thread B is paused,
@@ -1612,7 +1595,7 @@ public class FSHLog implements WAL {
    * it flags B and then Thread A and Thread B continue along on their merry way.  Pause and
    * signalling 'zigzags' between the two participating threads.  We use two latches -- one the
    * inverse of the other -- pausing and signaling when states are achieved.
-   * 
+   *
    * <p>To start up the drama, Thread A creates an instance of this class each time it would do
    * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot
    * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it
@@ -1634,7 +1617,7 @@ public class FSHLog implements WAL {
      * Latch to wait on.  Will be released when we can proceed.
      */
     private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
- 
+
     /**
      * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
      * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
@@ -1643,7 +1626,7 @@ public class FSHLog implements WAL {
      * @throws InterruptedException
      * @throws ExecutionException
      * @return The passed <code>syncFuture</code>
-     * @throws FailedSyncBeforeLogCloseException 
+     * @throws FailedSyncBeforeLogCloseException
      */
     SyncFuture waitSafePoint(final SyncFuture syncFuture)
     throws InterruptedException, FailedSyncBeforeLogCloseException {
@@ -1655,7 +1638,7 @@ public class FSHLog implements WAL {
       }
       return syncFuture;
     }
- 
+
     /**
      * Called by Thread B when it attains the 'safe point'.  In this method, Thread B signals
      * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
@@ -1893,9 +1876,8 @@ public class FSHLog implements WAL {
         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
         // so region sequenceids will also be in order.
         regionSequenceId = entry.stampRegionSequenceId();
-
-        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a 
-        // region sequence id only, a region edit/sequence id that is not associated with an actual 
+        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
+        // region sequence id only, a region edit/sequence id that is not associated with an actual
         // edit. It has to go through all the rigmarole to be sure we have the right ordering.
         if (entry.getEdit().isEmpty()) {
           return;
@@ -2053,4 +2035,4 @@ public class FSHLog implements WAL {
     }
     return new DatanodeInfo[0];
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index a768660..7f3eb61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 
@@ -51,23 +50,18 @@ class FSWALEntry extends Entry {
   // The below data members are denoted 'transient' just to highlight these are not persisted;
   // they are only in memory and held here while passing over the ring buffer.
   private final transient long sequence;
-  private final transient AtomicLong regionSequenceIdReference;
   private final transient boolean inMemstore;
   private final transient HTableDescriptor htd;
   private final transient HRegionInfo hri;
-  private final transient List<Cell> memstoreCells;
   private final Set<byte[]> familyNames;
 
   FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
-      final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
-      final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
+      final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
     super(key, edit);
-    this.regionSequenceIdReference = referenceToRegionSequenceId;
     this.inMemstore = inMemstore;
     this.htd = htd;
     this.hri = hri;
     this.sequence = sequence;
-    this.memstoreCells = memstoreCells;
     if (inMemstore) {
       // construct familyNames here to reduce the work of log sinker.
       ArrayList<Cell> cells = this.getEdit().getCells();
@@ -111,24 +105,30 @@ class FSWALEntry extends Entry {
   }
 
   /**
-   * Stamp this edit with a region edit/sequence id.
-   * Call when safe to do so: i.e. the context is such that the increment on the passed in
-   * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the
-   * WAL.  This method works with {@link #getRegionSequenceId()}.  It will block waiting on this
-   * method to be called.
-   * @return The region edit/sequence id we set for this edit.
+   * Here is where a WAL edit gets its sequenceid.
+   * @return The sequenceid we stamped on this edit.
    * @throws IOException
-   * @see #getRegionSequenceId()
    */
   long stampRegionSequenceId() throws IOException {
-    long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
-      for (Cell cell : this.memstoreCells) {
-        CellUtil.setSequenceId(cell, regionSequenceId);
+    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
+    MultiVersionConcurrencyControl mvcc = getKey().getMvcc();
+    MultiVersionConcurrencyControl.WriteEntry we = null;
+
+    if (mvcc != null) {
+      we = mvcc.begin();
+      regionSequenceId = we.getWriteNumber();
+    }
+
+    if (!this.getEdit().isReplay() && inMemstore) {
+      for (Cell c:getEdit().getCells()) {
+        CellUtil.setSequenceId(c, regionSequenceId);
       }
     }
+
+    // This has to stay in this order
     WALKey key = getKey();
     key.setLogSeqNum(regionSequenceId);
+    key.setWriteEntry(we);
     return regionSequenceId;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 5218981..1302d8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
@@ -73,6 +74,13 @@ public class HLogKey extends WALKey implements Writable {
     super(encodedRegionName, tablename, now);
   }
 
+  public HLogKey(final byte[] encodedRegionName,
+                 final TableName tablename,
+                 final long now,
+                 final MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, now, mvcc);
+  }
+
   /**
    * Create the log key for writing to somewhere.
    * We maintain the tablename mainly for debugging purposes.
@@ -86,9 +94,16 @@ public class HLogKey extends WALKey implements Writable {
    * @param now Time at which this edit was written.
    * @param clusterIds the clusters that have consumed the change(used in Replication)
    */
-  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+  public HLogKey(
+      final byte[] encodedRegionName,
+      final TableName tablename,
+      long logSeqNum,
+      final long now,
+      List<UUID> clusterIds,
+      long nonceGroup,
+      long nonce,
+      MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -104,9 +119,14 @@ public class HLogKey extends WALKey implements Writable {
    * @param nonceGroup
    * @param nonce
    */
-  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
-      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+  public HLogKey(final byte[] encodedRegionName,
+                 final TableName tablename,
+                 final long now,
+                 List<UUID> clusterIds,
+                 long nonceGroup,
+                 long nonce,
+                 final MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -122,8 +142,8 @@ public class HLogKey extends WALKey implements Writable {
    * @param nonce
    */
   public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
-      long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce);
+      long nonceGroup, long nonce, MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, logSeqNum, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -141,7 +161,8 @@ public class HLogKey extends WALKey implements Writable {
       Compressor.writeCompressed(this.encodedRegionName, 0,
           this.encodedRegionName.length, out,
           compressionContext.regionDict);
-      Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
+      Compressor.writeCompressed(this.tablename.getName(), 0,
+          this.tablename.getName().length, out,
           compressionContext.tableDict);
     }
     out.writeLong(this.logSeqNum);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
index cb89346..f7ae208 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReplayHLogKey.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 
 /**
  * An HLogKey specific to WalEdits coming from replay.
@@ -32,13 +33,15 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class ReplayHLogKey extends HLogKey {
 
   public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
-      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
+      final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+      MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+      MultiVersionConcurrencyControl mvcc) {
+    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index be39873..e41e1c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -37,7 +37,8 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.Text;
 
-@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
+  HBaseInterfaceAudience.CONFIG})
 public class SequenceFileLogReader extends ReaderBase {
   private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
 
@@ -273,8 +274,10 @@ public class SequenceFileLogReader extends ReaderBase {
       end = fEnd.getLong(this.reader);
     } catch(NoSuchFieldException nfe) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(nfe);
     } catch(IllegalAccessException iae) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(iae);
     } catch(Exception e) {
        /* All other cases. Should we handle it more aggressively? */
        LOG.warn("Unexpected exception when accessing the end field", e);
@@ -293,8 +296,10 @@ public class SequenceFileLogReader extends ReaderBase {
         .initCause(ioe);
     } catch(NoSuchMethodException nfe) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(nfe);
     } catch(IllegalAccessException iae) {
        /* reflection failure, keep going */
+      if (LOG.isTraceEnabled()) LOG.trace(iae);
     } catch(Exception e) {
        /* All other cases. Should we handle it more aggressively? */
        LOG.warn("Unexpected exception when accessing the end field", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index a752ff1..3b774ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -73,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
  * where, the WALEdit is serialized as:
  *   &lt;-1, # of edits, &lt;KeyValue&gt;, &lt;KeyValue&gt;, ... &gt;
  * For example:
- *   &lt;-1, 3, &lt;Keyvalue-for-edit-c1&gt;, &lt;KeyValue-for-edit-c2&gt;, &lt;KeyValue-for-edit-c3&gt;&gt;
+ *   &lt;-1, 3, &lt;KV-for-edit-c1&gt;, &lt;KV-for-edit-c2&gt;, &lt;KV-for-edit-c3&gt;&gt;
  *
  * The -1 marker is just a special way of being backward compatible with
  * an old WAL which would have contained a single &lt;KeyValue&gt;.
@@ -104,6 +104,9 @@ public class WALEdit implements Writable, HeapSize {
   public static final WALEdit EMPTY_WALEDIT = new WALEdit();
 
   // Only here for legacy writable deserialization
+  /**
+   * @deprecated Legacy
+   */
   @Deprecated
   private NavigableMap<byte[], Integer> scopes;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 399623f..2718295 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,12 +20,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -34,6 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 
@@ -48,19 +46,27 @@ import com.google.protobuf.TextFormat;
 public class WALUtil {
   private static final Log LOG = LogFactory.getLog(WALUtil.class);
 
+  private WALUtil() {
+    // Shut down construction of this class.
+  }
+
   /**
    * Write the marker that a compaction has succeeded and is about to be committed.
    * This provides info to the HMaster to allow it to recover the compaction if
    * this regionserver dies in the middle (This part is not yet implemented). It also prevents
    * the compaction from finishing if this regionserver has already lost its lease on the log.
-   * @param sequenceId Used by WAL to get sequence Id for the waledit.
+   * @param mvcc Used by WAL to get sequence Id for the waledit.
    */
-  public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
+  public static void writeCompactionMarker(WAL log,
+                                           HTableDescriptor htd,
+                                           HRegionInfo info,
+                                           final CompactionDescriptor c,
+                                           MultiVersionConcurrencyControl mvcc) throws IOException {
     TableName tn = TableName.valueOf(c.getTableName().toByteArray());
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
+    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
+    log.append(htd, info, key, WALEdit.createCompaction(info, c), false);
+    mvcc.complete(key.getWriteEntry());
     log.sync();
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
@@ -70,13 +76,17 @@ public class WALUtil {
   /**
    * Write a flush marker indicating a start / abort or a complete of a region flush
    */
-  public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
+  public static long writeFlushMarker(WAL log,
+                                      HTableDescriptor htd,
+                                      HRegionInfo info,
+                                      final FlushDescriptor f,
+                                      boolean sync,
+                                      MultiVersionConcurrencyControl mvcc) throws IOException {
     TableName tn = TableName.valueOf(f.getTableName().toByteArray());
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
-        null);
+    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn, System.currentTimeMillis(), mvcc);
+    long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), false);
+    mvcc.complete(key.getWriteEntry());
     if (sync) log.sync(trx);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
@@ -88,12 +98,11 @@ public class WALUtil {
    * Write a region open marker indicating that the region is opened
    */
   public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
-      final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
+      final RegionEventDescriptor r) throws IOException {
     TableName tn = TableName.valueOf(r.getTableName().toByteArray());
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
-      sequenceId, false, null);
+    long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r), false);
     log.sync(trx);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
@@ -108,27 +117,22 @@ public class WALUtil {
    * @param htd        A description of the table that we are bulk loading into.
    * @param info       A description of the region in the table that we are bulk loading into.
    * @param descriptor A protocol buffers based description of the client's bulk loading request
-   * @param sequenceId The current sequenceId in the log at the time when we were to write the
-   *                   bulk load marker.
    * @return txid of this transaction or if nothing to do, the last txid
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
   public static long writeBulkLoadMarkerAndSync(final WAL wal,
                                                 final HTableDescriptor htd,
                                                 final HRegionInfo info,
-                                                final WALProtos.BulkLoadDescriptor descriptor,
-                                                final AtomicLong sequenceId) throws IOException {
+                                                final WALProtos.BulkLoadDescriptor descriptor)
+      throws IOException {
     TableName tn = info.getTable();
     WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
 
     // Add it to the log but the false specifies that we don't need to add it to the memstore
     long trx = wal.append(htd,
-            info,
-            key,
-            WALEdit.createBulkLoadEvent(info, descriptor),
-            sequenceId,
-            false,
-            new ArrayList<Cell>());
+        info,
+        key,
+        WALEdit.createBulkLoadEvent(info, descriptor), false);
     wal.sync(trx);
 
     if (LOG.isTraceEnabled()) {
@@ -136,5 +140,4 @@ public class WALUtil {
     }
     return trx;
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
index f628cee..84d6128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java
@@ -55,7 +55,7 @@ public class HashedBytes {
     if (obj == null || getClass() != obj.getClass())
       return false;
     HashedBytes other = (HashedBytes) obj;
-    return Arrays.equals(bytes, other.bytes);
+    return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 04045ec..191d546 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -156,7 +156,7 @@ class DisabledWALProvider implements WALProvider {
 
     @Override
     public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-        AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
+                       boolean inMemstore) {
       if (!this.listeners.isEmpty()) {
         final long start = System.nanoTime();
         long len = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 4844487..ce34c98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -114,19 +114,16 @@ public interface WAL {
    * @param key Modified by this call; we add to it this edits region edit/sequence id.
    * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
    * sequence id that is after all currently appended edits.
-   * @param htd used to give scope for replication TODO refactor out in favor of table name and info
-   * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
-   * source of its incrementing edits sequence id.  Inside in this call we will increment it and
-   * attach the sequence to the edit we apply the WAL.
+   * @param htd used to give scope for replication TODO refactor out in favor of table name and
+   * info
    * @param inMemstore Always true except for case where we are writing a compaction completion
    * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
    * -- it is not an edit for memstore.
-   * @param memstoreKVs list of KVs added into memstore
    * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
    * in it.
    */
   long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-      AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
+    boolean inMemstore)
   throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 74284e0..48ede4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,13 +69,47 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
  *
  * Note that protected members marked @InterfaceAudience.Private are only protected
  * to support the legacy HLogKey class, which is in a different package.
+ * 
+ * <p>
  */
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 //       purposes. They need to be merged into WALEntry.
+// TODO: Cleanup. We have logSeqNum and then WriteEntry, both are sequence id'ing. Fix.
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALKey implements SequenceId, Comparable<WALKey> {
   private static final Log LOG = LogFactory.getLog(WALKey.class);
 
+  @InterfaceAudience.Private // For internal use only.
+  public MultiVersionConcurrencyControl getMvcc() {
+    return mvcc;
+  }
+
+  /**
+   * Will block until a write entry has been assigned by the WAL subsystem.
+   * @return A WriteEntry gotten from local WAL subsystem. Must be completed by calling
+   * mvcc#complete or mvcc#completeAndWait.
+   * @throws InterruptedIOException if interrupted while waiting for the write entry
+   * @see
+   * #setWriteEntry(org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry)
+   */
+  @InterfaceAudience.Private // For internal use only.
+  public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
+    try {
+      this.seqNumAssignedLatch.await();
+    } catch (InterruptedException ie) {
+      InterruptedIOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
+    }
+    return writeEntry;
+  }
+
+  @InterfaceAudience.Private // For internal use only.
+  public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
+    this.writeEntry = writeEntry;
+    this.seqNumAssignedLatch.countDown();
+  }
+
   // should be < 0 (@see HLogKey#readFields(DataInput))
   // version 2 supports WAL compression
   // public members here are only public because of HLogKey
@@ -151,7 +186,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
 
   private long nonceGroup = HConstants.NO_NONCE;
   private long nonce = HConstants.NO_NONCE;
-  static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
+  private MultiVersionConcurrencyControl mvcc;
+  private MultiVersionConcurrencyControl.WriteEntry writeEntry;
+  public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
 
   // visible for deprecated HLogKey
   @InterfaceAudience.Private
@@ -159,16 +196,17 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
 
   public WALKey() {
     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
-        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
+        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
   }
 
   @VisibleForTesting
-  public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+  public WALKey(final byte[] encodedRegionName, final TableName tablename,
+                long logSeqNum,
       final long now, UUID clusterId) {
     List<UUID> clusterIds = new ArrayList<UUID>();
     clusterIds.add(clusterId);
     init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
-        HConstants.NO_NONCE, HConstants.NO_NONCE);
+        HConstants.NO_NONCE, HConstants.NO_NONCE, null);
   }
 
   public WALKey(final byte[] encodedRegionName, final TableName tablename) {
@@ -176,8 +214,28 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }
 
   public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
-    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
-        EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    init(encodedRegionName,
+        tablename,
+        NO_SEQUENCE_ID,
+        now,
+        EMPTY_UUIDS,
+        HConstants.NO_NONCE,
+        HConstants.NO_NONCE,
+        null);
+  }
+
+  public WALKey(final byte[] encodedRegionName,
+                final TableName tablename,
+                final long now,
+                MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName,
+        tablename,
+        NO_SEQUENCE_ID,
+        now,
+        EMPTY_UUIDS,
+        HConstants.NO_NONCE,
+        HConstants.NO_NONCE,
+        mvcc);
   }
 
   /**
@@ -187,15 +245,21 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * <p>Used by log splitting and snapshots.
    *
    * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
-   * @param tablename   - name of table
-   * @param logSeqNum   - log sequence number
-   * @param now Time at which this edit was written.
-   * @param clusterIds the clusters that have consumed the change(used in Replication)
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   * @param tablename         - name of table
+   * @param logSeqNum         - log sequence number
+   * @param now               Time at which this edit was written.
+   * @param clusterIds        the clusters that have consumed the change(used in Replication)
    */
-  public WALKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
+  public WALKey(final byte[] encodedRegionName,
+                final TableName tablename,
+                long logSeqNum,
+                final long now,
+                List<UUID> clusterIds,
+                long nonceGroup,
+                long nonce,
+                MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -204,17 +268,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * A regionName is always a sub-table object.
    *
    * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
    * @param tablename
-   * @param now Time at which this edit was written.
-   * @param clusterIds the clusters that have consumed the change(used in Replication)
+   * @param now               Time at which this edit was written.
+   * @param clusterIds        the clusters that have consumed the change(used in Replication)
    * @param nonceGroup
    * @param nonce
+   * @param mvcc mvcc control used to generate sequence numbers and control read/write points
    */
-  public WALKey(final byte [] encodedRegionName, final TableName tablename,
-      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
-      nonceGroup, nonce);
+  public WALKey(final byte[] encodedRegionName, final TableName tablename,
+                final long now, List<UUID> clusterIds, long nonceGroup,
+                final long nonce, final MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
   }
 
   /**
@@ -223,21 +288,37 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * A regionName is always a sub-table object.
    *
    * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
    * @param tablename
    * @param logSeqNum
    * @param nonceGroup
    * @param nonce
    */
-  public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
-      long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
-      EMPTY_UUIDS, nonceGroup, nonce);
+  public WALKey(final byte[] encodedRegionName,
+                final TableName tablename,
+                long logSeqNum,
+                long nonceGroup,
+                long nonce,
+                final MultiVersionConcurrencyControl mvcc) {
+    init(encodedRegionName,
+        tablename,
+        logSeqNum,
+        EnvironmentEdgeManager.currentTime(),
+        EMPTY_UUIDS,
+        nonceGroup,
+        nonce,
+        mvcc);
   }
 
   @InterfaceAudience.Private
-  protected void init(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+  protected void init(final byte[] encodedRegionName,
+                      final TableName tablename,
+                      long logSeqNum,
+                      final long now,
+                      List<UUID> clusterIds,
+                      long nonceGroup,
+                      long nonce,
+                      MultiVersionConcurrencyControl mvcc) {
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterIds = clusterIds;
@@ -245,6 +326,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.tablename = tablename;
     this.nonceGroup = nonceGroup;
     this.nonce = nonce;
+    this.mvcc = mvcc;
   }
 
   /**
@@ -270,15 +352,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }
 
   /**
-   * Allow that the log sequence id to be set post-construction and release all waiters on assigned
-   * sequence number.
+   * Allow the log sequence id to be set post-construction.
    * Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
    * @param sequence
    */
   @InterfaceAudience.Private
   public void setLogSeqNum(final long sequence) {
     this.logSeqNum = sequence;
-    this.seqNumAssignedLatch.countDown();
+
   }
 
   /**
@@ -492,21 +573,22 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.encodedRegionName = encodedRegionName;
   }
 
-  public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
-  throws IOException {
-    org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
+  public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
+      WALCellCodec.ByteStringCompressor compressor) throws IOException {
+    org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
+        org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
     if (compressionContext == null) {
       builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
       builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
     } else {
       builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
-        compressionContext.regionDict));
+          compressionContext.regionDict));
       builder.setTableName(compressor.compress(this.tablename.getName(),
-        compressionContext.tableDict));
+          compressionContext.tableDict));
     }
     builder.setLogSequenceNumber(this.logSeqNum);
     builder.setWriteTime(writeTime);
-    if(this.origLogSeqNum > 0) {
+    if (this.origLogSeqNum > 0) {
       builder.setOrigSequenceNumber(this.origLogSeqNum);
     }
     if (this.nonce != HConstants.NO_NONCE) {
@@ -532,8 +614,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     return builder;
   }
 
-  public void readFieldsFromPb(
-      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+  public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
+                               WALCellCodec.ByteStringUncompressor uncompressor)
+      throws IOException {
     if (this.compressionContext != null) {
       this.encodedRegionName = uncompressor.uncompress(
           walKey.getEncodedRegionName(), compressionContext.regionDict);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 51043e4..3741cdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2301,7 +2301,7 @@ public class WALSplitter {
       // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
       key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
               walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
-              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
+              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
       logEntry.setFirst(key);
       logEntry.setSecond(val);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
index 7fd8902..1665e66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 6223b15..bb216b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,12 +45,12 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -225,8 +224,7 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompaction() throws Exception {
-    doTest(BlockCompactionsInPrepRegion.class, false);
-    doTest(BlockCompactionsInPrepRegion.class, true);
+    doTest(BlockCompactionsInPrepRegion.class);
   }
 
   /**
@@ -237,13 +235,11 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompactionAfterWALSync() throws Exception {
-    doTest(BlockCompactionsInCompletionRegion.class, false);
-    doTest(BlockCompactionsInCompletionRegion.class, true);
+    doTest(BlockCompactionsInCompletionRegion.class);
   }
 
-  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
+  public void doTest(Class<?> regionClass) throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
     // Insert our custom region
     c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
     c.setBoolean("dfs.support.append", true);
@@ -285,7 +281,7 @@ public class TestIOFencing {
         FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
         new Path("store_dir"));
       WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
-        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+        oldHri, compactionDescriptor, compactingRegion.getMVCC());
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
@@ -356,4 +352,4 @@ public class TestIOFencing {
       TEST_UTIL.shutdownMiniCluster();
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 37e98e8..a064bcc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -376,7 +376,6 @@ public class TestReplicasClient {
     }
   }
 
-
   @Test
   public void testFlushTable() throws Exception {
     openRegion(hriSecondary);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
index 78deed9..0a4ca16 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
@@ -141,6 +141,4 @@ public class TestRegionObserverStacking extends TestCase {
     assertTrue(idB < idC);
     HBaseTestingUtility.closeRegionAndWAL(region);
   }
-
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 8993255..7772664 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,7 +29,6 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -188,7 +188,6 @@ public class TestWALObserver {
     Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
     deleteDir(basedir);
     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
-    final AtomicLong sequenceId = new AtomicLong(0);
 
     // TEST_FAMILY[0] shall be removed from WALEdit.
     // TEST_FAMILY[1] value shall be changed.
@@ -237,7 +236,7 @@ public class TestWALObserver {
     long now = EnvironmentEdgeManager.currentTime();
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
-        edit, sequenceId, true, null);
+        edit, true);
     log.sync(txid);
 
     // the edit shall have been change now by the coprocessor.
@@ -273,7 +272,7 @@ public class TestWALObserver {
     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
         .toString(TEST_TABLE));
     final HRegionInfo hri = new HRegionInfo(tableName, null, null);
-    final AtomicLong sequenceId = new AtomicLong(0);
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
 
     fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
 
@@ -300,7 +299,7 @@ public class TestWALObserver {
     final int countPerFamily = 5;
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+          EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
     }
 
     LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -324,7 +323,7 @@ public class TestWALObserver {
     final WALEdit edit = new WALEdit();
     final byte[] nonce = Bytes.toBytes("1772");
     edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
-    final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
+    final long txid = wal.append(htd, hri, legacyKey, edit, true);
     wal.sync(txid);
 
     LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -349,7 +348,7 @@ public class TestWALObserver {
   public void testEmptyWALEditAreNotSeen() throws Exception {
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
-    final AtomicLong sequenceId = new AtomicLong(0);
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
 
     WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
     try {
@@ -361,8 +360,9 @@ public class TestWALObserver {
       assertFalse(cp.isPostWALWriteCalled());
 
       final long now = EnvironmentEdgeManager.currentTime();
-      long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
-          new WALEdit(), sequenceId, true, null);
+      long txid = log.append(htd, hri,
+          new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+          new WALEdit(), true);
       log.sync(txid);
 
       assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
@@ -381,7 +381,7 @@ public class TestWALObserver {
     // ultimately called by HRegion::initialize()
     TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
     final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
-    final AtomicLong sequenceId = new AtomicLong(0);
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     // final HRegionInfo hri =
     // createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
     // final HRegionInfo hri1 =
@@ -405,10 +405,9 @@ public class TestWALObserver {
     // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+          EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
     }
-    wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
-        true, null);
+    wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
     // sync to fs.
     wal.sync();
 
@@ -528,7 +527,7 @@ public class TestWALObserver {
 
   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
-      final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
+      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
     String familyStr = Bytes.toString(family);
     long txid = -1;
     for (int j = 0; j < count; j++) {
@@ -539,7 +538,7 @@ public class TestWALObserver {
       // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
       // about legacy coprocessors
       txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-          ee.currentTime()), edit, sequenceId, true, null);
+          ee.currentTime(), mvcc), edit, true);
     }
     if (-1 != txid) {
       wal.sync(txid);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index d82f36b..5fa588b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -32,8 +32,8 @@ import org.junit.experimental.categories.Category;
 public class TestHLogRecordReader extends TestWALRecordReader {
 
   @Override
-  protected WALKey getWalKey(final long sequenceid) {
-    return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+  protected WALKey getWalKey(final long time) {
+    return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c83d8a2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 64ef8fd..a4381c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -75,6 +76,7 @@ public class TestWALRecordReader {
   private static final byte [] value = Bytes.toBytes("value");
   private static HTableDescriptor htd;
   private static Path logDir;
+  protected MultiVersionConcurrencyControl mvcc;
 
   private static String getName() {
     return "TestWALRecordReader";
@@ -82,6 +84,7 @@ public class TestWALRecordReader {
 
   @Before
   public void setUp() throws Exception {
+    mvcc = new MultiVersionConcurrencyControl();
     FileStatus[] entries = fs.listStatus(hbaseDir);
     for (FileStatus dir : entries) {
       fs.delete(dir.getPath(), true);
@@ -124,13 +127,11 @@ public class TestWALRecordReader {
     // being millisecond based.
     long ts = System.currentTimeMillis();
     WALEdit edit = new WALEdit();
-    final AtomicLong sequenceId = new AtomicLong(0);
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
-    log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+    log.append(htd, info, getWalKey(ts), edit, true);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
-    log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
-        true, null);
+    log.append(htd, info, getWalKey(ts+1), edit, true);
     log.sync();
     LOG.info("Before 1st WAL roll " + log.toString());
     log.rollWriter();
@@ -141,12 +142,10 @@ public class TestWALRecordReader {
 
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
-    log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
-        true, null);
+    log.append(htd, info, getWalKey(ts1+1), edit, true);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
-    log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
-        true, null);
+    log.append(htd, info, getWalKey(ts1+2), edit, true);
     log.sync();
     log.shutdown();
     walfactory.shutdown();
@@ -188,8 +187,7 @@ public class TestWALRecordReader {
     WALEdit edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
         System.currentTimeMillis(), value));
-    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
-        null);
+    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
     log.sync(txid);
 
     Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -199,8 +197,7 @@ public class TestWALRecordReader {
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
         System.currentTimeMillis(), value));
-    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
-        null);
+    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
     log.sync(txid);
     log.shutdown();
     walfactory.shutdown();
@@ -239,8 +236,8 @@ public class TestWALRecordReader {
     testSplit(splits.get(1));
   }
 
-  protected WALKey getWalKey(final long sequenceid) {
-    return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+  protected WALKey getWalKey(final long time) {
+    return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
   }
 
   protected WALRecordReader getReader() {


Mime
View raw message