hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-14273 Rename MVCC to MVCC: From MultiVersionConsistencyControl to MultiVersionConcurrencyControl (Lars Francke)
Date Mon, 24 Aug 2015 15:47:28 GMT
Repository: hbase
Updated Branches:
  refs/heads/master eb52529c0 -> 9334a47d4


HBASE-14273 Rename MVCC to MVCC: From MultiVersionConsistencyControl to MultiVersionConcurrencyControl
(Lars Francke)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9334a47d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9334a47d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9334a47d

Branch: refs/heads/master
Commit: 9334a47d4570f8adfc003f0fb2c5969a88c3bba0
Parents: eb52529
Author: stack <stack@apache.org>
Authored: Mon Aug 24 08:47:26 2015 -0700
Committer: stack <stack@apache.org>
Committed: Mon Aug 24 08:47:26 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  26 +-
 .../MultiVersionConcurrencyControl.java         | 273 +++++++++++++++++++
 .../MultiVersionConsistencyControl.java         | 273 -------------------
 .../hbase/regionserver/RegionScanner.java       |   4 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |   2 +-
 .../hbase/regionserver/TestDefaultMemStore.java |  16 +-
 .../TestMultiVersionConcurrencyControl.java     | 135 +++++++++
 .../TestMultiVersionConsistencyControl.java     | 135 ---------
 src/main/asciidoc/_chapters/architecture.adoc   |   2 +-
 9 files changed, 433 insertions(+), 433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index df8bcf5..2293311 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -146,7 +146,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -584,8 +584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private boolean splitRequest;
   private byte[] explicitSplitPoint = null;
 
-  private final MultiVersionConsistencyControl mvcc =
-      new MultiVersionConsistencyControl();
+  private final MultiVersionConcurrencyControl mvcc =
+      new MultiVersionConcurrencyControl();
 
   // Coprocessor host
   private RegionCoprocessorHost coprocessorHost;
@@ -1252,7 +1252,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-   public MultiVersionConsistencyControl getMVCC() {
+   public MultiVersionConcurrencyControl getMVCC() {
      return mvcc;
    }
 
@@ -2081,7 +2081,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (this.memstoreSize.get() <= 0) {
       // Take an update lock because am about to change the sequence id and we want the sequence id
       // to be at the border of the empty memstore.
-      MultiVersionConsistencyControl.WriteEntry w = null;
+      MultiVersionConcurrencyControl.WriteEntry w = null;
       this.updatesLock.writeLock().lock();
       try {
         if (this.memstoreSize.get() <= 0) {
@@ -2137,7 +2137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
     // during flush
-    MultiVersionConsistencyControl.WriteEntry w = null;
+    MultiVersionConcurrencyControl.WriteEntry w = null;
     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
     // and memstore (makes it difficult to do atomic rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
@@ -2853,7 +2853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
     WALEdit walEdit = new WALEdit(isInReplay);
-    MultiVersionConsistencyControl.WriteEntry w = null;
+    MultiVersionConcurrencyControl.WriteEntry w = null;
     long txid = 0;
     boolean doRollBackMemstore = false;
     boolean locked = false;
@@ -3000,7 +3000,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if(isInReplay) {
         mvccNum = batchOp.getReplaySequenceId();
       } else {
-        mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+        mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
       }
       //
       // ------------------------------------
@@ -6635,7 +6635,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
 
-    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
+    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     boolean locked;
     boolean walSyncSuccessful = false;
     List<RowLock> acquiredRowLocks;
@@ -6656,7 +6656,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
       locked = true;
       // Get a mvcc write number
-      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+      mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
 
       long now = EnvironmentEdgeManager.currentTime();
       try {
@@ -6853,7 +6853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
           }
           // now start my own transaction
-          mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+          mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           long now = EnvironmentEdgeManager.currentTime();
           // Process each family
@@ -7106,7 +7106,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
           }
           // now start my own transaction
-          mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+          mvccNum = MultiVersionConcurrencyControl.getPreAssignedWriteNumber(this.sequenceId);
           w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           long now = EnvironmentEdgeManager.currentTime();
           // Process each family
@@ -7332,7 +7332,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       WriteState.HEAP_SIZE + // writestate
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
-      MultiVersionConsistencyControl.FIXED_SIZE // mvcc
+      MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
       + ClassSize.TREEMAP // maxSeqIdInStores
       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       ;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
new file mode 100644
index 0000000..028d81a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+@InterfaceAudience.Private
+public class MultiVersionConcurrencyControl {
+  private static final long NO_WRITE_NUMBER = 0;
+  private volatile long memstoreRead = 0;
+  private final Object readWaiters = new Object();
+
+  // This is the pending queue of writes.
+  private final LinkedList<WriteEntry> writeQueue =
+      new LinkedList<WriteEntry>();
+
+  /**
+   * Default constructor. Initializes the memstoreRead/Write points to 0.
+   */
+  public MultiVersionConcurrencyControl() {
+  }
+
+  /**
+   * Initializes the memstoreRead/Write points appropriately.
+   * @param startPoint
+   */
+  public void initialize(long startPoint) {
+    synchronized (writeQueue) {
+      writeQueue.clear();
+      memstoreRead = startPoint;
+    }
+  }
+
+  /**
+   *
+   * @param initVal The value we used initially and expected it'll be reset later
+   * @return WriteEntry instance.
+   */
+  WriteEntry beginMemstoreInsert() {
+    return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+  }
+
+  /**
+   * Get a mvcc write number before an actual one(its log sequence Id) being assigned
+   * @param sequenceId
+   * @return long a faked write number which is bigger enough not to be seen by others before a real
+   *         one is assigned
+   */
+  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
+    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
+    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
+    // because each handler could increment sequence num twice and max concurrent in-flight
+    // transactions is the number of RPC handlers.
+    // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
+    // changes touch same row key
+    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
+    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
+    return sequenceId.incrementAndGet() + 1000000000;
+  }
+
+  /**
+   * This function starts a MVCC transaction with current region's log change sequence number. Since
+   * we set change sequence number when flushing current change to WAL(late binding), the flush
+   * order may differ from the order to start a MVCC transaction. For example, a change begins a
+   * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
+   * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
+   * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
+   * big number is safe because we only need it to prevent current change being seen and the number
+   * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
+   * for MVCC to align with flush sequence.
+   * @param curSeqNum
+   * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+   */
+  public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
+    WriteEntry e = new WriteEntry(curSeqNum);
+    synchronized (writeQueue) {
+      writeQueue.add(e);
+      return e;
+    }
+  }
+
+  /**
+   * Complete a {@link WriteEntry} that was created by
+   * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
+   * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
+   * visible to MVCC readers.
+   * @throws IOException
+   */
+  public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
+      throws IOException {
+    if(e == null) return;
+    if (seqId != null) {
+      e.setWriteNumber(seqId.getSequenceId());
+    } else {
+      // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
+      // function beginMemstoreInsertWithSeqNum in case of failures
+      e.setWriteNumber(NO_WRITE_NUMBER);
+    }
+    waitForPreviousTransactionsComplete(e);
+  }
+
+  /**
+   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
+   * end of this call, the global read point is at least as large as the write point of the passed
+   * in WriteEntry. Thus, the write is visible to MVCC readers.
+   */
+  public void completeMemstoreInsert(WriteEntry e) {
+    waitForPreviousTransactionsComplete(e);
+  }
+
+  /**
+   * Mark the {@link WriteEntry} as complete and advance the read point as
+   * much as possible.
+   *
+   * How much is the read point advanced?
+   * Let S be the set of all write numbers that are completed and where all previous write numbers
+   * are also completed.  Then, the read point is advanced to the supremum of S.
+   *
+   * @param e
+   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
+   */
+  boolean advanceMemstore(WriteEntry e) {
+    long nextReadValue = -1;
+    synchronized (writeQueue) {
+      e.markCompleted();
+
+      while (!writeQueue.isEmpty()) {
+        WriteEntry queueFirst = writeQueue.getFirst();
+        if (queueFirst.isCompleted()) {
+          // Using Max because Edit complete in WAL sync order not arriving order
+          nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
+          writeQueue.removeFirst();
+        } else {
+          break;
+        }
+      }
+
+      if (nextReadValue > memstoreRead) {
+        memstoreRead = nextReadValue;
+      }
+
+      // notify waiters on writeQueue before return
+      writeQueue.notifyAll();
+    }
+
+    if (nextReadValue > 0) {
+      synchronized (readWaiters) {
+        readWaiters.notifyAll();
+      }
+    }
+
+    if (memstoreRead >= e.getWriteNumber()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Advances the current read point to be given seqNum if it is smaller than
+   * that.
+   */
+  void advanceMemstoreReadPointIfNeeded(long seqNum) {
+    synchronized (writeQueue) {
+      if (this.memstoreRead < seqNum) {
+        memstoreRead = seqNum;
+      }
+    }
+  }
+
+  /**
+   * Wait for all previous MVCC transactions complete
+   */
+  public void waitForPreviousTransactionsComplete() {
+    WriteEntry w = beginMemstoreInsert();
+    waitForPreviousTransactionsComplete(w);
+  }
+
+  public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
+    boolean interrupted = false;
+    WriteEntry w = waitedEntry;
+
+    try {
+      WriteEntry firstEntry = null;
+      do {
+        synchronized (writeQueue) {
+          // writeQueue won't be empty at this point, the following is just a safety check
+          if (writeQueue.isEmpty()) {
+            break;
+          }
+          firstEntry = writeQueue.getFirst();
+          if (firstEntry == w) {
+            // all previous in-flight transactions are done
+            break;
+          }
+          try {
+            writeQueue.wait(0);
+          } catch (InterruptedException ie) {
+            // We were interrupted... finish the loop -- i.e. cleanup --and then
+            // on our way out, reset the interrupt flag.
+            interrupted = true;
+            break;
+          }
+        }
+      } while (firstEntry != null);
+    } finally {
+      if (w != null) {
+        advanceMemstore(w);
+      }
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public long memstoreReadPoint() {
+    return memstoreRead;
+  }
+
+  public static class WriteEntry {
+    private long writeNumber;
+    private volatile boolean completed = false;
+
+    WriteEntry(long writeNumber) {
+      this.writeNumber = writeNumber;
+    }
+    void markCompleted() {
+      this.completed = true;
+    }
+    boolean isCompleted() {
+      return this.completed;
+    }
+    long getWriteNumber() {
+      return this.writeNumber;
+    }
+    void setWriteNumber(long val){
+      this.writeNumber = val;
+    }
+  }
+
+  public static final long FIXED_SIZE = ClassSize.align(
+      ClassSize.OBJECT +
+      2 * Bytes.SIZEOF_LONG +
+      2 * ClassSize.REFERENCE);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
deleted file mode 100644
index 96af2c3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-
-/**
- * Manages the read/write consistency within memstore. This provides
- * an interface for readers to determine what entries to ignore, and
- * a mechanism for writers to obtain new write numbers, then "commit"
- * the new writes for readers to read (thus forming atomic transactions).
- */
-@InterfaceAudience.Private
-public class MultiVersionConsistencyControl {
-  private static final long NO_WRITE_NUMBER = 0;
-  private volatile long memstoreRead = 0;
-  private final Object readWaiters = new Object();
-
-  // This is the pending queue of writes.
-  private final LinkedList<WriteEntry> writeQueue =
-      new LinkedList<WriteEntry>();
-
-  /**
-   * Default constructor. Initializes the memstoreRead/Write points to 0.
-   */
-  public MultiVersionConsistencyControl() {
-  }
-
-  /**
-   * Initializes the memstoreRead/Write points appropriately.
-   * @param startPoint
-   */
-  public void initialize(long startPoint) {
-    synchronized (writeQueue) {
-      writeQueue.clear();
-      memstoreRead = startPoint;
-    }
-  }
-
-  /**
-   *
-   * @param initVal The value we used initially and expected it'll be reset later
-   * @return WriteEntry instance.
-   */
-  WriteEntry beginMemstoreInsert() {
-    return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
-  }
-
-  /**
-   * Get a mvcc write number before an actual one(its log sequence Id) being assigned
-   * @param sequenceId
-   * @return long a faked write number which is bigger enough not to be seen by others before a real
-   *         one is assigned
-   */
-  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
-    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
-    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
-    // because each handler could increment sequence num twice and max concurrent in-flight
-    // transactions is the number of RPC handlers.
-    // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
-    // changes touch same row key
-    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
-    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
-    return sequenceId.incrementAndGet() + 1000000000;
-  }
-
-  /**
-   * This function starts a MVCC transaction with current region's log change sequence number. Since
-   * we set change sequence number when flushing current change to WAL(late binding), the flush
-   * order may differ from the order to start a MVCC transaction. For example, a change begins a
-   * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
-   * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
-   * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
-   * big number is safe because we only need it to prevent current change being seen and the number
-   * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
-   * for MVCC to align with flush sequence.
-   * @param curSeqNum
-   * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
-   */
-  public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
-    WriteEntry e = new WriteEntry(curSeqNum);
-    synchronized (writeQueue) {
-      writeQueue.add(e);
-      return e;
-    }
-  }
-
-  /**
-   * Complete a {@link WriteEntry} that was created by
-   * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
-   * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
-   * visible to MVCC readers.
-   * @throws IOException
-   */
-  public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
-      throws IOException {
-    if(e == null) return;
-    if (seqId != null) {
-      e.setWriteNumber(seqId.getSequenceId());
-    } else {
-      // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
-      // function beginMemstoreInsertWithSeqNum in case of failures
-      e.setWriteNumber(NO_WRITE_NUMBER);
-    }
-    waitForPreviousTransactionsComplete(e);
-  }
-
-  /**
-   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
-   * end of this call, the global read point is at least as large as the write point of the passed
-   * in WriteEntry. Thus, the write is visible to MVCC readers.
-   */
-  public void completeMemstoreInsert(WriteEntry e) {
-    waitForPreviousTransactionsComplete(e);
-  }
-
-  /**
-   * Mark the {@link WriteEntry} as complete and advance the read point as
-   * much as possible.
-   *
-   * How much is the read point advanced?
-   * Let S be the set of all write numbers that are completed and where all previous write numbers
-   * are also completed.  Then, the read point is advanced to the supremum of S.
-   *
-   * @param e
-   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
-   */
-  boolean advanceMemstore(WriteEntry e) {
-    long nextReadValue = -1;
-    synchronized (writeQueue) {
-      e.markCompleted();
-
-      while (!writeQueue.isEmpty()) {
-        WriteEntry queueFirst = writeQueue.getFirst();
-        if (queueFirst.isCompleted()) {
-          // Using Max because Edit complete in WAL sync order not arriving order
-          nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
-          writeQueue.removeFirst();
-        } else {
-          break;
-        }
-      }
-
-      if (nextReadValue > memstoreRead) {
-        memstoreRead = nextReadValue;
-      }
-
-      // notify waiters on writeQueue before return
-      writeQueue.notifyAll();
-    }
-
-    if (nextReadValue > 0) {
-      synchronized (readWaiters) {
-        readWaiters.notifyAll();
-      }
-    }
-
-    if (memstoreRead >= e.getWriteNumber()) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Advances the current read point to be given seqNum if it is smaller than
-   * that.
-   */
-  void advanceMemstoreReadPointIfNeeded(long seqNum) {
-    synchronized (writeQueue) {
-      if (this.memstoreRead < seqNum) {
-        memstoreRead = seqNum;
-      }
-    }
-  }
-
-  /**
-   * Wait for all previous MVCC transactions complete
-   */
-  public void waitForPreviousTransactionsComplete() {
-    WriteEntry w = beginMemstoreInsert();
-    waitForPreviousTransactionsComplete(w);
-  }
-
-  public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
-    boolean interrupted = false;
-    WriteEntry w = waitedEntry;
-
-    try {
-      WriteEntry firstEntry = null;
-      do {
-        synchronized (writeQueue) {
-          // writeQueue won't be empty at this point, the following is just a safety check
-          if (writeQueue.isEmpty()) {
-            break;
-          }
-          firstEntry = writeQueue.getFirst();
-          if (firstEntry == w) {
-            // all previous in-flight transactions are done
-            break;
-          }
-          try {
-            writeQueue.wait(0);
-          } catch (InterruptedException ie) {
-            // We were interrupted... finish the loop -- i.e. cleanup --and then
-            // on our way out, reset the interrupt flag.
-            interrupted = true;
-            break;
-          }
-        }
-      } while (firstEntry != null);
-    } finally {
-      if (w != null) {
-        advanceMemstore(w);
-      }
-    }
-    if (interrupted) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  public long memstoreReadPoint() {
-    return memstoreRead;
-  }
-
-  public static class WriteEntry {
-    private long writeNumber;
-    private volatile boolean completed = false;
-
-    WriteEntry(long writeNumber) {
-      this.writeNumber = writeNumber;
-    }
-    void markCompleted() {
-      this.completed = true;
-    }
-    boolean isCompleted() {
-      return this.completed;
-    }
-    long getWriteNumber() {
-      return this.writeNumber;
-    }
-    void setWriteNumber(long val){
-      this.writeNumber = val;
-    }
-  }
-
-  public static final long FIXED_SIZE = ClassSize.align(
-      ClassSize.OBJECT +
-      2 * Bytes.SIZEOF_LONG +
-      2 * ClassSize.REFERENCE);
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 9e7ff0f..5b33db4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -63,7 +63,7 @@ public interface RegionScanner extends InternalScanner, Shipper {
   long getMaxResultSize();
 
   /**
-   * @return The Scanner's MVCC readPt see {@link MultiVersionConsistencyControl}
+   * @return The Scanner's MVCC readPt see {@link MultiVersionConcurrencyControl}
    */
   long getMvccReadPoint();
 
@@ -94,7 +94,7 @@ public interface RegionScanner extends InternalScanner, Shipper {
    * close a region operation, an synchronize on the scanner object. Example: <code>
    * HRegion region = ...;
    * RegionScanner scanner = ...
-   * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+   * MultiVersionConcurrencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
    * region.startRegionOperation();
    * try {
    *   synchronized(scanner) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index d8d9522..be5df71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -3538,7 +3538,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     Scan scan = new Scan(get);
     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
         scan.getFamilyMap().get(store.getFamily().getName()),
-        // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
+        // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
         // readpoint 0.
         0);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 4848d66..e50260f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -69,13 +69,13 @@ public class TestDefaultMemStore extends TestCase {
   private static final int ROW_COUNT = 10;
   private static final int QUALIFIER_COUNT = ROW_COUNT;
   private static final byte [] FAMILY = Bytes.toBytes("column");
-  private MultiVersionConsistencyControl mvcc;
+  private MultiVersionConcurrencyControl mvcc;
   private AtomicLong startSeqNum = new AtomicLong(0);
 
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    this.mvcc = new MultiVersionConsistencyControl();
+    this.mvcc = new MultiVersionConcurrencyControl();
     this.memstore = new DefaultMemStore();
   }
 
@@ -248,7 +248,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] q2 = Bytes.toBytes("q2");
     final byte[] v = Bytes.toBytes("value");
 
-    MultiVersionConsistencyControl.WriteEntry w =
+    MultiVersionConcurrencyControl.WriteEntry w =
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv1 = new KeyValue(row, f, q1, v);
@@ -292,7 +292,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] v2 = Bytes.toBytes("value2");
 
     // INSERT 1: Write both columns val1
-    MultiVersionConsistencyControl.WriteEntry w =
+    MultiVersionConcurrencyControl.WriteEntry w =
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
@@ -344,7 +344,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] q2 = Bytes.toBytes("q2");
     final byte[] v1 = Bytes.toBytes("value1");
     // INSERT 1: Write both columns val1
-    MultiVersionConsistencyControl.WriteEntry w =
+    MultiVersionConcurrencyControl.WriteEntry w =
         mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
@@ -388,7 +388,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] f = Bytes.toBytes("family");
     final byte[] q1 = Bytes.toBytes("q1");
 
-    final MultiVersionConsistencyControl mvcc;
+    final MultiVersionConcurrencyControl mvcc;
     final MemStore memstore;
     final AtomicLong startSeqNum;
 
@@ -397,7 +397,7 @@ public class TestDefaultMemStore extends TestCase {
 
     public ReadOwnWritesTester(int id,
                                MemStore memstore,
-                               MultiVersionConsistencyControl mvcc,
+                               MultiVersionConcurrencyControl mvcc,
                                AtomicReference<Throwable> caughtException,
                                AtomicLong startSeqNum)
     {
@@ -418,7 +418,7 @@ public class TestDefaultMemStore extends TestCase {
 
     private void internalRun() throws IOException {
       for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
-        MultiVersionConsistencyControl.WriteEntry w =
+        MultiVersionConcurrencyControl.WriteEntry w =
             mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
         // Insert the sequence value (i)

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
new file mode 100644
index 0000000..7b6e7b3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConcurrencyControl.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is a hammer test that verifies MultiVersionConcurrencyControl in a
+ * multiple writer single reader scenario.
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMultiVersionConcurrencyControl extends TestCase {
+  static class Writer implements Runnable {
+    final AtomicBoolean finished;
+    final MultiVersionConcurrencyControl mvcc;
+    final AtomicBoolean status;
+
+    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status)
{
+      this.finished = finished;
+      this.mvcc = mvcc;
+      this.status = status;
+    }
+
+    private Random rnd = new Random();
+    public boolean failed = false;
+
+    public void run() {
+      AtomicLong startPoint = new AtomicLong();
+      while (!finished.get()) {
+        MultiVersionConcurrencyControl.WriteEntry e =
+            mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
+        // System.out.println("Begin write: " + e.getWriteNumber());
+        // 10 usec - 500usec (including 0)
+        int sleepTime = rnd.nextInt(500);
+        // 500 * 1000 = 500,000ns = 500 usec
+        // 1 * 100 = 100ns = 1usec
+        try {
+          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
+        } catch (InterruptedException e1) {
+        }
+        try {
+          mvcc.completeMemstoreInsert(e);
+        } catch (RuntimeException ex) {
+          // got failure
+          System.out.println(ex.toString());
+          ex.printStackTrace();
+          status.set(false);
+          return;
+          // Report failure if possible.
+        }
+      }
+    }
+  }
+
+  public void testParallelism() throws Exception {
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+    final AtomicBoolean finished = new AtomicBoolean(false);
+
+    // fail flag for the reader thread
+    final AtomicBoolean readerFailed = new AtomicBoolean(false);
+    final AtomicLong failedAt = new AtomicLong();
+    Runnable reader = new Runnable() {
+      public void run() {
+        long prev = mvcc.memstoreReadPoint();
+        while (!finished.get()) {
+          long newPrev = mvcc.memstoreReadPoint();
+          if (newPrev < prev) {
+            // serious problem.
+            System.out.println("Reader got out of order, prev: " + prev + " next was: " +
newPrev);
+            readerFailed.set(true);
+            // might as well give up
+            failedAt.set(newPrev);
+            return;
+          }
+        }
+      }
+    };
+
+    // writer thread parallelism.
+    int n = 20;
+    Thread[] writers = new Thread[n];
+    AtomicBoolean[] statuses = new AtomicBoolean[n];
+    Thread readThread = new Thread(reader);
+
+    for (int i = 0; i < n; ++i) {
+      statuses[i] = new AtomicBoolean(true);
+      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
+      writers[i].start();
+    }
+    readThread.start();
+
+    try {
+      Thread.sleep(10 * 1000);
+    } catch (InterruptedException ex) {
+    }
+
+    finished.set(true);
+
+    readThread.join();
+    for (int i = 0; i < n; ++i) {
+      writers[i].join();
+    }
+
+    // check failure.
+    assertFalse(readerFailed.get());
+    for (int i = 0; i < n; ++i) {
+      assertTrue(statuses[i].get());
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
deleted file mode 100644
index 09b2226..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiVersionConsistencyControl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.experimental.categories.Category;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This is a hammer test that verifies MultiVersionConsistencyControl in a
- * multiple writer single reader scenario.
- */
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestMultiVersionConsistencyControl extends TestCase {
-  static class Writer implements Runnable {
-    final AtomicBoolean finished;
-    final MultiVersionConsistencyControl mvcc;
-    final AtomicBoolean status;
-
-    Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status)
{
-      this.finished = finished;
-      this.mvcc = mvcc;
-      this.status = status;
-    }
-
-    private Random rnd = new Random();
-    public boolean failed = false;
-
-    public void run() {
-      AtomicLong startPoint = new AtomicLong();
-      while (!finished.get()) {
-        MultiVersionConsistencyControl.WriteEntry e = 
-            mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
-        // System.out.println("Begin write: " + e.getWriteNumber());
-        // 10 usec - 500usec (including 0)
-        int sleepTime = rnd.nextInt(500);
-        // 500 * 1000 = 500,000ns = 500 usec
-        // 1 * 100 = 100ns = 1usec
-        try {
-          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
-        } catch (InterruptedException e1) {
-        }
-        try {
-          mvcc.completeMemstoreInsert(e);
-        } catch (RuntimeException ex) {
-          // got failure
-          System.out.println(ex.toString());
-          ex.printStackTrace();
-          status.set(false);
-          return;
-          // Report failure if possible.
-        }
-      }
-    }
-  }
-
-  public void testParallelism() throws Exception {
-    final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
-
-    final AtomicBoolean finished = new AtomicBoolean(false);
-
-    // fail flag for the reader thread
-    final AtomicBoolean readerFailed = new AtomicBoolean(false);
-    final AtomicLong failedAt = new AtomicLong();
-    Runnable reader = new Runnable() {
-      public void run() {
-        long prev = mvcc.memstoreReadPoint();
-        while (!finished.get()) {
-          long newPrev = mvcc.memstoreReadPoint();
-          if (newPrev < prev) {
-            // serious problem.
-            System.out.println("Reader got out of order, prev: " + prev + " next was: " +
newPrev);
-            readerFailed.set(true);
-            // might as well give up
-            failedAt.set(newPrev);
-            return;
-          }
-        }
-      }
-    };
-
-    // writer thread parallelism.
-    int n = 20;
-    Thread[] writers = new Thread[n];
-    AtomicBoolean[] statuses = new AtomicBoolean[n];
-    Thread readThread = new Thread(reader);
-
-    for (int i = 0; i < n; ++i) {
-      statuses[i] = new AtomicBoolean(true);
-      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
-      writers[i].start();
-    }
-    readThread.start();
-
-    try {
-      Thread.sleep(10 * 1000);
-    } catch (InterruptedException ex) {
-    }
-
-    finished.set(true);
-
-    readThread.join();
-    for (int i = 0; i < n; ++i) {
-      writers[i].join();
-    }
-
-    // check failure.
-    assertFalse(readerFailed.get());
-    for (int i = 0; i < n; ++i) {
-      assertTrue(statuses[i].get());
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9334a47d/src/main/asciidoc/_chapters/architecture.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/architecture.adoc b/src/main/asciidoc/_chapters/architecture.adoc
index 740b585..e6a71f1 100644
--- a/src/main/asciidoc/_chapters/architecture.adoc
+++ b/src/main/asciidoc/_chapters/architecture.adoc
@@ -1495,7 +1495,7 @@ The minimum flush unit is per region, not at individual MemStore level.
 * The `RegionScanner` object contains a list of `StoreScanner` objects, one per column family.
 * Each `StoreScanner` object further contains a list of `StoreFileScanner` objects, corresponding to each StoreFile and HFile of the corresponding column family, and a list of `KeyValueScanner` objects for the MemStore.
 * The two lists are merged into one, which is sorted in ascending order with the scan object for the MemStore at the end of the list.
-* When a `StoreFileScanner` object is constructed, it is associated with a `MultiVersionConsistencyControl` read point, which is the current `memstoreTS`, filtering out any new updates beyond the read point.
+* When a `StoreFileScanner` object is constructed, it is associated with a `MultiVersionConcurrencyControl` read point, which is the current `memstoreTS`, filtering out any new updates beyond the read point.
 
 [[hfile]]
 ==== StoreFile (HFile)


Mime
View raw message