hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject hbase git commit: HBASE-17471 Region Seqid will be out of order in WAL if using mvccPreAssign
Date Sat, 06 May 2017 15:31:51 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 408645c4e -> 9a97e28bd


HBASE-17471 Region Seqid will be out of order in WAL if using mvccPreAssign

Signed-off-by: Yu Li <liyu@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a97e28b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a97e28b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a97e28b

Branch: refs/heads/branch-1
Commit: 9a97e28bdf202ea2a8cadffbb8f8de4c1411447e
Parents: 408645c
Author: Allan Yang <allanwin@163.com>
Authored: Sat May 6 23:28:14 2017 +0800
Committer: Yu Li <liyu@apache.org>
Committed: Sat May 6 23:29:49 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 66 +++++------------
 .../MultiVersionConcurrencyControl.java         | 32 +++++---
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 45 +++++------
 .../hbase/regionserver/wal/FSWALEntry.java      | 27 ++-----
 .../org/apache/hadoop/hbase/wal/WALKey.java     | 78 +-------------------
 .../hbase/coprocessor/TestWALObserver.java      |  6 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |  7 +-
 .../hbase/regionserver/TestWALLockup.java       | 12 +--
 .../hbase/regionserver/wal/TestFSHLog.java      |  1 +
 .../wal/TestWALActionsListener.java             |  5 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |  2 +-
 .../apache/hadoop/hbase/wal/TestSecureWAL.java  |  4 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java |  4 +-
 13 files changed, 92 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1fbb9b2..8e1306f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -664,10 +664,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final Durability durability;
   private final boolean regionStatsEnabled;
 
-  // flag and lock for MVCC preassign
-  private final boolean mvccPreAssign;
-  private final ReentrantLock preAssignMvccLock;
-
   // whether to unassign region if we hit FNFE
   private final RegionUnassigner regionUnassigner;
   /**
@@ -820,13 +816,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
               HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
 
-    // get mvcc pre-assign flag and lock
-    this.mvccPreAssign = conf.getBoolean(HREGION_MVCC_PRE_ASSIGN, DEFAULT_HREGION_MVCC_PRE_ASSIGN);
-    if (this.mvccPreAssign) {
-      this.preAssignMvccLock = new ReentrantLock();
-    } else {
-      this.preAssignMvccLock = null;
-    }
     boolean unassignForFNFE =
         conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE);
     if (unassignForFNFE) {
@@ -2674,9 +2663,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
     // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
     // so if an abort or stop, there is no way to call them in.
-    WALKey key = this.appendEmptyEdit(wal, null);
+    WALKey key = this.appendEmptyEdit(wal);
     mvcc.complete(key.getWriteEntry());
-    return key.getSequenceId(this.maxWaitForSeqId);
+    return key.getSequenceId();
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -3418,29 +3407,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
         }
       } else {
-        try {
-          if (mvccPreAssign) {
-            preAssignMvccLock.lock();
-            writeEntry = mvcc.begin();
-          }
-          if (walEdit.size() > 0) {
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-                mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
-            if (mvccPreAssign) {
-              walKey.setPreAssignedWriteEntry(writeEntry);
-            }
-            txid =
-                this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
-          } else {
-            // If this is a skip wal operation just get the read point from mvcc
-            walKey = this.appendEmptyEdit(this.wal, writeEntry);
-          }
-        } finally {
-          if (mvccPreAssign) {
-            preAssignMvccLock.unlock();
-          }
+        if (walEdit.size() > 0) {
+          // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+          walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+              mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+          txid = this.wal
+              .append(this.htableDescriptor, this.getRegionInfo(), walKey,
+                  walEdit, true);
+        } else {
+          walKey = appendEmptyEdit(wal);
         }
       }
       // ------------------------------------
@@ -3478,7 +3454,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // before apply to memstore to avoid scan return incorrect value.
         // we use durability of the original mutation for the mutation passed by CP.
         boolean updateSeqId = isInReplay
-            || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL || mvccPreAssign;
+            || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
         if (updateSeqId) {
           updateSequenceId(familyMaps[i].values(), mvccNum);
         }
@@ -7402,7 +7378,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           if(walKey == null){
             // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
-            walKey = this.appendEmptyEdit(this.wal, null);
+            walKey = this.appendEmptyEdit(this.wal);
           }
 
           // 7. Start mvcc transaction
@@ -7701,7 +7677,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           boolean updateSeqId = false;
           if (walKey == null) {
             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
-            walKey = this.appendEmptyEdit(this.wal, null);
+            walKey = this.appendEmptyEdit(this.wal);
             // If no WAL, FSWALEntry won't be used and no update for sequence id
             updateSeqId = true;
           }
@@ -7934,7 +7910,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
           } else {
             // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
-            walKey = this.appendEmptyEdit(this.wal, null);
+            walKey = this.appendEmptyEdit(this.wal);
             // If no WAL, FSWALEntry won't be used and no update for sequence id
             updateSeqId = true;
           }
@@ -8160,9 +8136,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      47 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      46 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
-      6 * Bytes.SIZEOF_BOOLEAN);
+      5 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
   // 1 x HashMap - coprocessorServiceHandlers
@@ -8744,19 +8720,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
    * the WALEdit append later.
    * @param wal
-   * @param writeEntry Preassigned writeEntry, if any
    * @return Return the key used appending with no sync and no append.
    * @throws IOException
    */
-  private WALKey appendEmptyEdit(final WAL wal, WriteEntry writeEntry) throws IOException {
+  private WALKey appendEmptyEdit(final WAL wal) throws IOException {
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     @SuppressWarnings("deprecation")
     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
       getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
       HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
-    if (writeEntry != null) {
-      key.setPreAssignedWriteEntry(writeEntry);
-    }
 
     // Call append but with an empty WALEdit.  The returned sequence id will not be associated
     // with any edit and we can be sure it went in after all outstanding appends.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d65af00..57d6356 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -111,20 +111,34 @@ public class MultiVersionConcurrencyControl {
   }
 
   /**
+   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
+   */
+  public WriteEntry begin() {
+    return begin(new Runnable() {
+      @Override public void run() {
+
+      }
+    });
+  }
+
+  /**
    * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
-   * to our queue of ongoing writes. Return this WriteEntry instance.
-   * To complete the write transaction and wait for it to be visible, call
-   * {@link #completeAndWait(WriteEntry)}. If the write failed, call
-   * {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of the failed write
-   * transaction.
+   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
+   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
+   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
+   * the failed write transaction.
+   * <p>
+   * The {@code action} will be executed under the lock which means it can keep the same order with
+   * mvcc.
    * @see #complete(WriteEntry)
    * @see #completeAndWait(WriteEntry)
    */
-  public WriteEntry begin() {
+  public WriteEntry begin(Runnable action) {
     synchronized (writeQueue) {
       long nextWriteNumber = writePoint.incrementAndGet();
       WriteEntry e = new WriteEntry(nextWriteNumber);
       writeQueue.add(e);
+      action.run();
       return e;
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index d3f302a..8d97b64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.lmax.disruptor.*;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -89,11 +92,6 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
@@ -1112,21 +1110,22 @@ public class FSHLog implements WAL {
     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
     // single consuming thread.  Don't have to worry about it.
     TraceScope scope = Trace.startSpan("FSHLog.append");
-
-    // This is crazy how much it takes to make an edit.  Do we need all this stuff!!!!????  We need
-    // all this to make a key and then below to append the edit, we need to carry htd, info,
-    // etc. all over the ring buffer.
-    FSWALEntry entry = null;
-    long sequence = this.disruptor.getRingBuffer().next();
+    final MutableLong txidHolder = new MutableLong();
+    final RingBuffer<RingBufferTruck> ringBuffer = disruptor.getRingBuffer();
+    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(new Runnable() {
+      @Override public void run() {
+        txidHolder.setValue(ringBuffer.next());
+      }
+    });
+    long txid = txidHolder.longValue();
     try {
-      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
-      // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
-      truck.loadPayload(entry, scope.detach());
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, htd, hri, inMemstore);
+      entry.stampRegionSequenceId(we);
+      ringBuffer.get(txid).loadPayload(entry, scope.detach());
     } finally {
-      this.disruptor.getRingBuffer().publish(sequence);
+      ringBuffer.publish(txid);
     }
-    return sequence;
+    return txid;
   }
 
   /**
@@ -1814,12 +1813,6 @@ public class FSHLog implements WAL {
           try {
             FSWALEntry entry = truck.unloadFSWALEntryPayload();
             if (this.exception != null) {
-              // We got an exception on an earlier attempt at append. Do not let this append
-              // go through. Fail it but stamp the sequenceid into this append though failed.
-              // We need to do this to close the latch held down deep in WALKey...that is waiting
-              // on sequenceid assignment otherwise it will just hang out (The #append method
-              // called below does this also internally).
-              entry.stampRegionSequenceId();
               // Return to keep processing events coming off the ringbuffer
               return;
             }
@@ -1940,10 +1933,8 @@ public class FSHLog implements WAL {
       byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
       long regionSequenceId = WALKey.NO_SEQUENCE_ID;
       try {
-        // We are about to append this edit; update the region-scoped sequence number.  Do it
-        // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
-        // so region sequenceids will also be in order.
-        regionSequenceId = entry.stampRegionSequenceId();
+
+        regionSequenceId = entry.getKey().getSequenceId();
         // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
         // region sequence id only, a region edit/sequence id that is not associated with an actual
         // edit. It has to go through all the rigmarole to be sure we have the right ordering.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index f55e185..69d1c59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -106,34 +106,19 @@ class FSWALEntry extends Entry {
 
   /**
    * Here is where a WAL edit gets its sequenceid.
+   * @param we after HBASE-17471 we already get the mvcc number
+   * in WriteEntry, just stamp the writenumber to cells and walkey
    * @return The sequenceid we stamped on this edit.
    * @throws IOException
    */
-  long stampRegionSequenceId() throws IOException {
-    long regionSequenceId = WALKey.NO_SEQUENCE_ID;
-    WALKey key = getKey();
-    MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
-    boolean preAssigned = (we != null);
-    if (!preAssigned) {
-      MultiVersionConcurrencyControl mvcc = key.getMvcc();
-      if (mvcc != null) {
-        we = mvcc.begin();
-      }
-    }
-    if (we != null) {
-      regionSequenceId = we.getWriteNumber();
-    }
-
+  long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
+    long regionSequenceId = we.getWriteNumber();
     if (!this.getEdit().isReplay() && inMemstore) {
-      for (Cell c:getEdit().getCells()) {
+      for (Cell c : getEdit().getCells()) {
         CellUtil.setSequenceId(c, regionSequenceId);
       }
     }
-
-    // This has to stay in this order
-    if (!preAssigned) {
-      key.setWriteEntry(we);
-    }
+    getKey().setWriteEntry(we);
     return regionSequenceId;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 585c8f4..01420d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -95,37 +95,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   @InterfaceAudience.Private // For internal use only.
   public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
-    if (this.preAssignedWriteEntry != null) {
-      // don't wait for seqNumAssignedLatch if writeEntry is preassigned
-      return this.preAssignedWriteEntry;
-    }
-    try {
-      this.seqNumAssignedLatch.await();
-    } catch (InterruptedException ie) {
-      // If interrupted... clear out our entry else we can block up mvcc.
-      MultiVersionConcurrencyControl mvcc = getMvcc();
-      LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
-      if (mvcc != null) {
-        if (this.writeEntry != null) {
-          mvcc.complete(this.writeEntry);
-        }
-      }
-      InterruptedIOException iie = new InterruptedIOException();
-      iie.initCause(ie);
-      throw iie;
-    }
+    assert this.writeEntry != null;
     return this.writeEntry;
   }
 
   @InterfaceAudience.Private // For internal use only.
   public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
-    assert this.writeEntry == null : "Non-null writeEntry when trying to set one";
+    assert this.writeEntry == null;
     this.writeEntry = writeEntry;
     // Set our sequenceid now using WriteEntry.
-    if (this.writeEntry != null) {
-      this.logSeqNum = this.writeEntry.getWriteNumber();
-    }
-    this.seqNumAssignedLatch.countDown();
+    this.logSeqNum = writeEntry.getWriteNumber();
   }
 
   // should be < 0 (@see HLogKey#readFields(DataInput))
@@ -189,7 +168,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   @InterfaceAudience.Private
   protected long logSeqNum;
   private long origLogSeqNum = 0;
-  private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
   // Time at which this edit was written.
   // visible for deprecated HLogKey
   @InterfaceAudience.Private
@@ -206,7 +184,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   private long nonce = HConstants.NO_NONCE;
   private MultiVersionConcurrencyControl mvcc;
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
-  private MultiVersionConcurrencyControl.WriteEntry preAssignedWriteEntry = null;
   public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
 
   // visible for deprecated HLogKey
@@ -393,36 +370,6 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    */
   @Override
   public long getSequenceId() throws IOException {
-    return getSequenceId(-1);
-  }
-
-  /**
-   * Wait for sequence number to be assigned &amp; return the assigned value.
-   * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid
-   * @return long the new assigned sequence number
-   * @throws IOException
-   */
-  public long getSequenceId(final long maxWaitForSeqId) throws IOException {
-    // TODO: This implementation waiting on a latch is problematic because if a higher level
-    // determines we should stop or abort, there is no global list of all these blocked WALKeys
-    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
-    //
-    // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
-    // even those that have failed (previously we were not... so they would just hang out...).
-    // St.Ack 20150910
-    try {
-      if (maxWaitForSeqId < 0) {
-        this.seqNumAssignedLatch.await();
-      } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
-        throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
-          "ms; WAL system stuck or has gone away?");
-      }
-    } catch (InterruptedException ie) {
-      LOG.warn("Thread interrupted waiting for next log sequence number");
-      InterruptedIOException iie = new InterruptedIOException();
-      iie.initCause(ie);
-      throw iie;
-    }
     return this.logSeqNum;
   }
 
@@ -667,23 +614,4 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     }
   }
 
-  /**
-   * @return The preassigned writeEntry, if any
-   */
-  @InterfaceAudience.Private // For internal use only.
-  public MultiVersionConcurrencyControl.WriteEntry getPreAssignedWriteEntry() {
-    return this.preAssignedWriteEntry;
-  }
-
-  /**
-   * Preassign writeEntry
-   * @param writeEntry the entry to assign
-   */
-  @InterfaceAudience.Private // For internal use only.
-  public void setPreAssignedWriteEntry(WriteEntry writeEntry) {
-    if (writeEntry != null) {
-      this.preAssignedWriteEntry = writeEntry;
-      this.logSeqNum = writeEntry.getWriteNumber();
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index aca2978..2acafb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -242,7 +242,9 @@ public class TestWALObserver {
     // it's where WAL write cp should occur.
     long now = EnvironmentEdgeManager.currentTime();
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
+    long txid = log.append(htd, hri,
+        new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now,
+            new MultiVersionConcurrencyControl()),
         edit, true);
     log.sync(txid);
 
@@ -326,7 +328,7 @@ public class TestWALObserver {
 
     LOG.debug("write a log edit that supports legacy cps.");
     final long now = EnvironmentEdgeManager.currentTime();
-    final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
+    final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc);
     final WALEdit edit = new WALEdit();
     final byte[] nonce = Bytes.toBytes("1772");
     edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 0b7e3b5..3c9abfa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -6272,11 +6272,8 @@ public class TestHRegion {
         @Override
         public Long answer(InvocationOnMock invocation) throws Throwable {
           WALKey key = invocation.getArgumentAt(2, WALKey.class);
-          MultiVersionConcurrencyControl.WriteEntry we = key.getPreAssignedWriteEntry();
-          if (we == null) {
-            we = key.getMvcc().begin();
-            key.setWriteEntry(we);
-          }
+          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
+          key.setWriteEntry(we);
           return 1L;
         }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index d82d1df..0a0393a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -216,13 +216,15 @@ public class TestWALLockup {
     HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
     final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
     byte [] bytes = Bytes.toBytes(getName());
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     try {
       // First get something into memstore. Make a Put and then pull the Cell out of it. Will
       // manage append and sync carefully in below to manufacture hang. We keep adding same
       // edit. WAL subsystem doesn't care.
       Put put = new Put(bytes);
       put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
-      WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName());
+      WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
+          htd.getTableName(), System.currentTimeMillis(), mvcc);
       WALEdit edit = new WALEdit();
       CellScanner CellScanner = put.cellScanner();
       assertTrue(CellScanner.advance());
@@ -388,12 +390,12 @@ public class TestWALLockup {
     HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
     final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
     byte[] bytes = Bytes.toBytes(getName());
-
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     try {
       Put put = new Put(bytes);
       put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
       WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
-          htd.getTableName());
+          htd.getTableName(), System.currentTimeMillis(), mvcc);
       WALEdit edit = new WALEdit();
       CellScanner CellScanner = put.cellScanner();
       assertTrue(CellScanner.advance());
@@ -425,7 +427,7 @@ public class TestWALLockup {
       // make RingBufferEventHandler sleep 1s, so the following sync
       // endOfBatch=false
       key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("sleep"));
+          TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc);
       dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
 
       Thread t = new Thread("Sync") {
@@ -449,7 +451,7 @@ public class TestWALLockup {
       }
       // make append throw DamagedWALException
       key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("DamagedWALException"));
+          TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc);
       dodgyWAL2.append(htd, region.getRegionInfo(), key, edit, true);
 
       while (latch.getCount() > 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 1fcb241..3a23a05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -416,6 +416,7 @@ public class TestFSHLog {
         final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
             System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
         wal.append(htd, info, logkey, edits, true);
+        region.getMVCC().completeAndWait(logkey.getWriteEntry());
       }
       region.flush(true);
       // FlushResult.flushSequenceId is not visible here so go get the current sequence id.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index bac1b6f..b1eccdf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -101,7 +102,7 @@ public class TestWALActionsListener {
     HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES),
              SOME_BYTES, SOME_BYTES, false);
     final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
-
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     for (int i = 0; i < 20; i++) {
       byte[] b = Bytes.toBytes(i+"");
       KeyValue kv = new KeyValue(b,b,b);
@@ -111,7 +112,7 @@ public class TestWALActionsListener {
       htd.addFamily(new HColumnDescriptor(b));
 
       final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
-          TableName.valueOf(b), 0), edit, true);
+          TableName.valueOf(b), 0, mvcc), edit, true);
       wal.sync(txid);
       if (i == 10) {
         wal.registerWALActionsListener(laterobserver);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 496ee53..3a02378 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -1170,7 +1170,7 @@ public class TestWALReplay {
     FSWALEntry entry =
         new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
           rowName, family, ee, index), htd, hri, true);
-    entry.stampRegionSequenceId();
+    entry.stampRegionSequenceId(mvcc.begin());
     return entry;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index d9454bb..b405698 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
@@ -95,11 +96,12 @@ public class TestSecureWAL {
     final WAL wal =
         wals.getWAL(regioninfo.getEncodedNameAsBytes(), regioninfo.getTable().getNamespace());
 
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
       wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs, true);
+          System.currentTimeMillis(), mvcc), kvs, true);
     }
     wal.sync();
     final Path walPath = DefaultWALProvider.getCurrentFileName(wal);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a97e28b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 9b4a968..621d092 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -370,12 +370,12 @@ public class TestWALFactory {
 
     HTableDescriptor htd = new HTableDescriptor();
     htd.addFamily(new HColumnDescriptor(tableName.getName()));
-
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
       wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), kvs,  true);
+          System.currentTimeMillis(), mvcc), kvs,  true);
     }
     // Now call sync to send the data to HDFS datanodes
     wal.sync();


Mime
View raw message