hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-15699 Can not sync AsyncFSWAL if no edit is appended
Date Mon, 25 Apr 2016 05:18:39 GMT
Repository: hbase
Updated Branches:
  refs/heads/master e0b809886 -> 3cfe363f1


HBASE-15699 Can not sync AsyncFSWAL if no edit is appended


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3cfe363f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3cfe363f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3cfe363f

Branch: refs/heads/master
Commit: 3cfe363f130d8d71d5588e234121bb719f3b52d9
Parents: e0b8098
Author: zhangduo <zhangduo@apache.org>
Authored: Mon Apr 25 13:17:26 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Mon Apr 25 13:17:26 2016 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AbstractFSWAL.java   |   6 +-
 .../hbase/regionserver/wal/SyncFuture.java      |  40 +-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    |   8 +-
 .../regionserver/wal/AbstractTestFSWAL.java     | 435 +++++++++++++++++++
 .../hbase/regionserver/wal/TestAsyncFSWAL.java  |  79 ++++
 .../hbase/regionserver/wal/TestFSHLog.java      | 409 +----------------
 6 files changed, 561 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3cfe363f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 3736ae3..9ef161a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -775,10 +775,12 @@ public abstract class AbstractFSWAL<W> implements WAL {
   protected SyncFuture getSyncFuture(final long sequence, Span span) {
     SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
     if (syncFuture == null) {
-      syncFuture = new SyncFuture();
+      syncFuture = new SyncFuture(sequence, span);
       this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
+    } else {
+      syncFuture.reset(sequence, span);
     }
-    return syncFuture.reset(sequence, span);
+    return syncFuture;
   }
 
   protected void requestLogRoll(boolean tooFewReplicas) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cfe363f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index b5c9a2e..5ec218a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -46,7 +46,7 @@ class SyncFuture {
   // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
   // to coordinate on but it did not give any obvious advantage and some issues with order in which
   // events happen.
-  private static final long NOT_DONE = 0;
+  private static final long NOT_DONE = -1L;
 
   /**
    * The transaction id of this operation, monotonically increases.
@@ -55,16 +55,14 @@ class SyncFuture {
 
   /**
    * The transaction id that was set in here when we were marked done. Should be equal or > txnId.
-   * Put this data member into the NOT_DONE state while this class is in use. But for the first
-   * position on construction, let it be -1 so we can immediately call {@link #reset(long, Span)}
-   * below and it will work.
+   * Put this data member into the NOT_DONE state while this class is in use.
    */
-  private long doneTxid = -1;
+  private long doneTxid;
 
   /**
    * If error, the associated throwable. Set when the future is 'done'.
    */
-  private Throwable throwable = null;
+  private Throwable throwable;
 
   private Thread t;
 
@@ -73,34 +71,30 @@ class SyncFuture {
    */
   private Span span;
 
-  /**
-   * Call this method to clear old usage and get it ready for new deploy. Call this method even if
-   * it is being used for the first time.
-   * @param txnId the new transaction id
-   * @return this
-   */
-  synchronized SyncFuture reset(final long txnId) {
-    return reset(txnId, null);
+  SyncFuture(long txid, Span span) {
+    this.t = Thread.currentThread();
+    this.txid = txid;
+    this.span = span;
+    this.doneTxid = NOT_DONE;
   }
 
   /**
-   * Call this method to clear old usage and get it ready for new deploy. Call this method even if
-   * it is being used for the first time.
-   * @param sequence sequenceId from this Future's position in the RingBuffer
-   * @param span curren span, detached from caller. Don't forget to attach it when resuming after a
+   * Call this method to clear old usage and get it ready for new deploy.
+   * @param txid the new transaction id
+   * @param span current span, detached from caller. Don't forget to attach it when resuming after a
    *          call to {@link #get()}.
    * @return this
    */
-  synchronized SyncFuture reset(final long txnId, Span span) {
+  synchronized SyncFuture reset(final long txid, Span span) {
     if (t != null && t != Thread.currentThread()) {
       throw new IllegalStateException();
     }
     t = Thread.currentThread();
     if (!isDone()) {
-      throw new IllegalStateException("" + txnId + " " + Thread.currentThread());
+      throw new IllegalStateException("" + txid + " " + Thread.currentThread());
     }
     this.doneTxid = NOT_DONE;
-    this.txid = txnId;
+    this.txid = txid;
     this.span = span;
     return this;
   }
@@ -156,11 +150,11 @@ class SyncFuture {
     return true;
   }
 
-  public boolean cancel(boolean mayInterruptIfRunning) {
+  boolean cancel(boolean mayInterruptIfRunning) {
     throw new UnsupportedOperationException();
   }
 
-  public synchronized long get() throws InterruptedException, ExecutionException {
+  synchronized long get() throws InterruptedException, ExecutionException {
     while (!isDone()) {
       wait(1000);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cfe363f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 38002fb..b8fffb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import com.google.common.base.Throwables;
+
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -71,15 +73,15 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
       boolean overwritable, EventLoop eventLoop) throws IOException {
     // Configuration already does caching for the Class lookup.
-    Class<? extends AsyncWriter> logWriterClass =
-        conf.getClass("hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class,
-          AsyncWriter.class);
+    Class<? extends AsyncWriter> logWriterClass = conf.getClass(
+      "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
       AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) {
       LOG.debug("Error instantiating log writer.", e);
+      Throwables.propagateIfPossible(e, IOException.class);
       throw new IOException("cannot get log writer", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cfe363f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
new file mode 100644
index 0000000..214565e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public abstract class AbstractTestFSWAL {
+
+  private static final Log LOG = LogFactory.getLog(AbstractTestFSWAL.class);
+
+  protected static Configuration CONF;
+  protected static FileSystem FS;
+  protected static Path DIR;
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = FS.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      FS.delete(dir.getPath(), true);
+    }
+    final Path hbaseDir = TEST_UTIL.createRootDir();
+    DIR = new Path(hbaseDir, currentTest.getMethodName());
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+      SampleRegionWALObserver.class.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    CONF = TEST_UTIL.getConfiguration();
+    FS = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  protected abstract AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir,
+      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+      boolean failIfWALExists, String prefix, String suffix) throws IOException;
+
+  protected abstract AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
+      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+      boolean failIfWALExists, String prefix, String suffix, Runnable action) throws IOException;
+
+  /**
+   * A loaded WAL coprocessor won't break existing WAL test cases.
+   */
+  @Test
+  public void testWALCoprocessorLoaded() throws Exception {
+    // test to see whether the coprocessor is loaded or not.
+    AbstractFSWAL<?> log = null;
+    try {
+      log = newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
+        CONF, null, true, null, null);
+      WALCoprocessorHost host = log.getCoprocessorHost();
+      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+      assertNotNull(c);
+    } finally {
+      if (log != null) {
+        log.close();
+      }
+    }
+  }
+
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
+      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
+      throws IOException {
+    final byte[] row = Bytes.toBytes("row");
+    for (int i = 0; i < times; i++) {
+      long timestamp = System.currentTimeMillis();
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, row, row, timestamp, row));
+      WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
+          WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
+          HConstants.NO_NONCE, mvcc, scopes);
+      log.append(hri, key, cols, true);
+    }
+    log.sync();
+  }
+
+  /**
+   * helper method to simulate region flush for a WAL.
+   * @param wal
+   * @param regionEncodedName
+   */
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
+    wal.completeCacheFlush(regionEncodedName);
+  }
+
+  /**
+   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
+   * exception if we do). Comparison is based on the timestamp present in the wal name.
+   * @throws Exception
+   */
+  @Test
+  public void testWALComparator() throws Exception {
+    AbstractFSWAL<?> wal1 = null;
+    AbstractFSWAL<?> walMeta = null;
+    try {
+      wal1 = newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
+        CONF, null, true, null, null);
+      LOG.debug("Log obtained is: " + wal1);
+      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
+      Path p1 = wal1.computeFilename(11);
+      Path p2 = wal1.computeFilename(12);
+      // comparing with itself returns 0
+      assertTrue(comp.compare(p1, p1) == 0);
+      // comparing with different filenum.
+      assertTrue(comp.compare(p1, p2) < 0);
+      walMeta =
+          newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
+            CONF, null, true, null, DefaultWALProvider.META_WAL_PROVIDER_ID);
+      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
+
+      Path p1WithMeta = walMeta.computeFilename(11);
+      Path p2WithMeta = walMeta.computeFilename(12);
+      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
+      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
+      // mixing meta and non-meta logs gives error
+      boolean ex = false;
+      try {
+        comp.compare(p1WithMeta, p2);
+      } catch (IllegalArgumentException e) {
+        ex = true;
+      }
+      assertTrue("Comparator doesn't complain while checking meta log files", ex);
+      boolean exMeta = false;
+      try {
+        compMeta.compare(p1WithMeta, p2);
+      } catch (IllegalArgumentException e) {
+        exMeta = true;
+      }
+      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
+    } finally {
+      if (wal1 != null) {
+        wal1.close();
+      }
+      if (walMeta != null) {
+        walMeta.close();
+      }
+    }
+  }
+
+  /**
+   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
+   * regions which should be flushed in order to archive the oldest wal file.
+   * <p>
+   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
+   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
+   * rolling the wal.
+   * @throws Exception
+   */
+  @Test
+  public void testFindMemStoresEligibleForFlush() throws Exception {
+    LOG.debug("testFindMemStoresEligibleForFlush");
+    Configuration conf1 = HBaseConfiguration.create(CONF);
+    conf1.setInt("hbase.regionserver.maxlogs", 1);
+    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(conf1), DIR.toString(),
+      HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+    HTableDescriptor t1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor t2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
+    HRegionInfo hri1 =
+        new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HRegionInfo hri2 =
+        new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    // add edits and roll the wal
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : t1.getFamiliesKeys()) {
+      scopes1.put(fam, 0);
+    }
+    NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : t2.getFamiliesKeys()) {
+      scopes2.put(fam, 0);
+    }
+    try {
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      wal.rollWriter();
+      // add some more edits and roll the wal. This would reach the log number threshold
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      wal.rollWriter();
+      // with above rollWriter call, the max logs limit is reached.
+      assertTrue(wal.getNumRolledLogFiles() == 2);
+
+      // get the regions to flush; since there is only one region in the oldest wal, it should
+      // return only one region.
+      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(1, regionsToFlush.length);
+      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      // insert edits in second region
+      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
+      // get the regions to flush, it should still read region1.
+      regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(regionsToFlush.length, 1);
+      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
+      // remain.
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
+      wal.rollWriter();
+      // only one wal should remain now (that is for the second region).
+      assertEquals(1, wal.getNumRolledLogFiles());
+      // flush the second region
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
+      wal.rollWriter(true);
+      // no wal should remain now.
+      assertEquals(0, wal.getNumRolledLogFiles());
+      // add edits both to region 1 and region 2, and roll.
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
+      wal.rollWriter();
+      // add edits and roll the writer, to reach the max logs limit.
+      assertEquals(1, wal.getNumRolledLogFiles());
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      wal.rollWriter();
+      // it should return two regions to flush, as the oldest wal file has entries
+      // for both regions.
+      regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(2, regionsToFlush.length);
+      // flush both regions
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
+      wal.rollWriter(true);
+      assertEquals(0, wal.getNumRolledLogFiles());
+      // Add an edit to region1, and roll the wal.
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
+      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
+      wal.rollWriter();
+      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      assertEquals(1, wal.getNumRolledLogFiles());
+    } finally {
+      if (wal != null) {
+        wal.close();
+      }
+    }
+  }
+
+  @Test(expected = IOException.class)
+  public void testFailedToCreateWALIfParentRenamed() throws IOException {
+    final String name = "testFailedToCreateWALIfParentRenamed";
+    AbstractFSWAL<?> log = newWAL(FS, FSUtils.getRootDir(CONF), name,
+      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
+    long filenum = System.currentTimeMillis();
+    Path path = log.computeFilename(filenum);
+    log.createWriterInstance(path);
+    Path parent = path.getParent();
+    path = log.computeFilename(filenum + 1);
+    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
+    FS.rename(parent, newPath);
+    log.createWriterInstance(path);
+    fail("It should fail to create the new WAL");
+  }
+
+  /**
+   * Test flush for sure has a sequence id that is beyond the last edit appended. We do this by
+   * slowing appends in the background ring buffer thread while in foreground we call flush. The
+   * addition of the sync over HRegion in flush should fix an issue where flush was returning before
+   * all of its appends had made it out to the WAL (HBASE-11109).
+   * @throws IOException
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
+   */
+  @Test
+  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
+    String testName = currentTest.getMethodName();
+    final TableName tableName = TableName.valueOf(testName);
+    final HRegionInfo hri = new HRegionInfo(tableName);
+    final byte[] rowName = tableName.getName();
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("f"));
+    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
+      TEST_UTIL.getConfiguration(), htd);
+    HBaseTestingUtility.closeRegionAndWAL(r);
+    final int countPerFamily = 10;
+    final AtomicBoolean goslow = new AtomicBoolean(false);
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
+    // subclass and doctor a method.
+    AbstractFSWAL<?> wal = newSlowWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), testName, CONF,
+      null, true, null, null, new Runnable() {
+
+        @Override
+        public void run() {
+          if (goslow.get()) {
+            Threads.sleep(100);
+            LOG.debug("Sleeping before appending 100ms");
+          }
+        }
+      });
+    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
+      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+    try {
+      List<Put> puts = null;
+      for (HColumnDescriptor hcd : htd.getFamilies()) {
+        puts =
+            TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
+      }
+
+      // Now assert edits made it in.
+      final Get g = new Get(rowName);
+      Result result = region.get(g);
+      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+
+      // Construct a WALEdit and add it a few times to the WAL.
+      WALEdit edits = new WALEdit();
+      for (Put p : puts) {
+        CellScanner cs = p.cellScanner();
+        while (cs.advance()) {
+          edits.add(cs.current());
+        }
+      }
+      // Add any old cluster id.
+      List<UUID> clusterIds = new ArrayList<UUID>();
+      clusterIds.add(UUID.randomUUID());
+      // Now make appends run slow.
+      goslow.set(true);
+      for (int i = 0; i < countPerFamily; i++) {
+        final HRegionInfo info = region.getRegionInfo();
+        final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
+            System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
+        wal.append(info, logkey, edits, true);
+      }
+      region.flush(true);
+      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
+      long currentSequenceId = region.getReadPoint(null);
+      // Now release the appends
+      goslow.set(false);
+      assertTrue(currentSequenceId >= region.getReadPoint(null));
+    } finally {
+      region.close(true);
+      wal.close();
+    }
+  }
+
+  @Test
+  public void testSyncNoAppend() throws IOException {
+    String testName = currentTest.getMethodName();
+    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), testName, CONF,
+      null, true, null, null);
+    try {
+      wal.sync();
+    } finally {
+      wal.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cfe363f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
new file mode 100644
index 0000000..721ee85
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Provides AsyncFSWAL test cases.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWAL extends AbstractTestFSWAL {
+
+  private static EventLoopGroup GROUP;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    GROUP = new NioEventLoopGroup();
+    AbstractTestFSWAL.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    AbstractTestFSWAL.tearDownAfterClass();
+    GROUP.shutdownGracefully();
+  }
+
+  @Override
+  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix) throws IOException {
+    return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+        suffix, GROUP.next());
+  }
+
+  @Override
+  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
+      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+      boolean failIfWALExists, String prefix, String suffix, final Runnable action)
+      throws IOException {
+    return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+        suffix, GROUP.next()) {
+
+      @Override
+      void atHeadOfRingBufferEventHandlerAppend() {
+        action.run();
+        super.atHeadOfRingBufferEventHandlerAppend();
+      }
+
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cfe363f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index c60b225..bf56afe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -18,430 +18,64 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
 
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdge;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
 
 /**
  * Provides FSHLog test cases.
  */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestFSHLog {
-  private static final Log LOG = LogFactory.getLog(TestFSHLog.class);
-
-  protected static Configuration conf;
-  protected static FileSystem fs;
-  protected static Path dir;
-  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  @Rule
-  public final TestName currentTest = new TestName();
-
-  @Before
-  public void setUp() throws Exception {
-    FileStatus[] entries = fs.listStatus(new Path("/"));
-    for (FileStatus dir : entries) {
-      fs.delete(dir.getPath(), true);
-    }
-    final Path hbaseDir = TEST_UTIL.createRootDir();
-    dir = new Path(hbaseDir, currentTest.getMethodName());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    // Make block sizes small.
-    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
-    // quicker heartbeat interval for faster DN death notification
-    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
-    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
-    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
-
-    // faster failover with cluster.shutdown();fs.close() idiom
-    TEST_UTIL.getConfiguration()
-        .setInt("hbase.ipc.client.connect.max.retries", 1);
-    TEST_UTIL.getConfiguration().setInt(
-        "dfs.client.block.recovery.retries", 1);
-    TEST_UTIL.getConfiguration().setInt(
-      "hbase.ipc.client.connection.maxidletime", 500);
-    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
-        SampleRegionWALObserver.class.getName());
-    TEST_UTIL.startMiniDFSCluster(3);
-
-    conf = TEST_UTIL.getConfiguration();
-    fs = TEST_UTIL.getDFSCluster().getFileSystem();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * A loaded WAL coprocessor won't break existing WAL test cases.
-   */
-  @Test
-  public void testWALCoprocessorLoaded() throws Exception {
-    // test to see whether the coprocessor is loaded or not.
-    FSHLog log = null;
-    try {
-      log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
-          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
-      WALCoprocessorHost host = log.getCoprocessorHost();
-      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
-      assertNotNull(c);
-    } finally {
-      if (log != null) {
-        log.close();
-      }
-    }
-  }
-
-  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
-      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
-          throws IOException {
-    final byte[] row = Bytes.toBytes("row");
-    for (int i = 0; i < times; i++) {
-      long timestamp = System.currentTimeMillis();
-      WALEdit cols = new WALEdit();
-      cols.add(new KeyValue(row, row, row, timestamp, row));
-      WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
-          WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
-          HConstants.NO_NONCE, mvcc, scopes);
-      log.append(hri, key, cols, true);
-    }
-    log.sync();
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFSHLog extends AbstractTestFSWAL {
+
+  @Override
+  protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix) throws IOException {
+    return new FSHLog(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+        suffix);
   }
 
-  /**
-   * helper method to simulate region flush for a WAL.
-   * @param wal
-   * @param regionEncodedName
-   */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
-    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
-    wal.completeCacheFlush(regionEncodedName);
-  }
+  @Override
+  protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir,
+      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+      boolean failIfWALExists, String prefix, String suffix, final Runnable action)
+      throws IOException {
+    return new FSHLog(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+        suffix) {
 
-  /**
-   * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
-   * exception if we do). Comparison is based on the timestamp present in the wal name.
-   * @throws Exception
-   */
-  @Test 
-  public void testWALComparator() throws Exception {
-    FSHLog wal1 = null;
-    FSHLog walMeta = null;
-    try {
-      wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
-          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
-      LOG.debug("Log obtained is: " + wal1);
-      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
-      Path p1 = wal1.computeFilename(11);
-      Path p2 = wal1.computeFilename(12);
-      // comparing with itself returns 0
-      assertTrue(comp.compare(p1, p1) == 0);
-      // comparing with different filenum.
-      assertTrue(comp.compare(p1, p2) < 0);
-      walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
-          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
-          DefaultWALProvider.META_WAL_PROVIDER_ID);
-      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
-
-      Path p1WithMeta = walMeta.computeFilename(11);
-      Path p2WithMeta = walMeta.computeFilename(12);
-      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
-      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
-      // mixing meta and non-meta logs gives error
-      boolean ex = false;
-      try {
-        comp.compare(p1WithMeta, p2);
-      } catch (IllegalArgumentException e) {
-        ex = true;
-      }
-      assertTrue("Comparator doesn't complain while checking meta log files", ex);
-      boolean exMeta = false;
-      try {
-        compMeta.compare(p1WithMeta, p2);
-      } catch (IllegalArgumentException e) {
-        exMeta = true;
-      }
-      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
-    } finally {
-      if (wal1 != null) {
-        wal1.close();
-      }
-      if (walMeta != null) {
-        walMeta.close();
-      }
-    }
-  }
-
-  /**
-   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the
-   * list of regions which should be flushed in order to archive the oldest wal file.
-   * <p>
-   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
-   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
-   * rolling the wal.
-   * @throws Exception
-   */
-  @Test 
-  public void testFindMemStoresEligibleForFlush() throws Exception {
-    LOG.debug("testFindMemStoresEligibleForFlush");
-    Configuration conf1 = HBaseConfiguration.create(conf);
-    conf1.setInt("hbase.regionserver.maxlogs", 1);
-    FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
-        HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
-    HTableDescriptor t1 =
-        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
-    HTableDescriptor t2 =
-        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
-    HRegionInfo hri1 =
-        new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-    HRegionInfo hri2 =
-        new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-    // add edits and roll the wal
-    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
-        Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : t1.getFamiliesKeys()) {
-      scopes1.put(fam, 0);
-    }
-    NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
-        Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : t2.getFamiliesKeys()) {
-      scopes2.put(fam, 0);
-    }
-    try {
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
-      wal.rollWriter();
-      // add some more edits and roll the wal. This would reach the log number threshold
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
-      wal.rollWriter();
-      // with above rollWriter call, the max logs limit is reached.
-      assertTrue(wal.getNumRolledLogFiles() == 2);
-
-      // get the regions to flush; since there is only one region in the oldest wal, it should
-      // return only one region.
-      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(1, regionsToFlush.length);
-      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
-      // insert edits in second region
-      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
-      // get the regions to flush, it should still read region1.
-      regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(regionsToFlush.length, 1);
-      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
-      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
-      // remain.
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
-      wal.rollWriter();
-      // only one wal should remain now (that is for the second region).
-      assertEquals(1, wal.getNumRolledLogFiles());
-      // flush the second region
-      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
-      wal.rollWriter(true);
-      // no wal should remain now.
-      assertEquals(0, wal.getNumRolledLogFiles());
-      // add edits both to region 1 and region 2, and roll.
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
-      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
-      wal.rollWriter();
-      // add edits and roll the writer, to reach the max logs limit.
-      assertEquals(1, wal.getNumRolledLogFiles());
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
-      wal.rollWriter();
-      // it should return two regions to flush, as the oldest wal file has entries
-      // for both regions.
-      regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(2, regionsToFlush.length);
-      // flush both regions
-      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
-      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
-      wal.rollWriter(true);
-      assertEquals(0, wal.getNumRolledLogFiles());
-      // Add an edit to region1, and roll the wal.
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
-      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
-      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
-      wal.rollWriter();
-      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
-      assertEquals(1, wal.getNumRolledLogFiles());
-    } finally {
-      if (wal != null) {
-        wal.close();
-      }
-    }
-  }
-
-  @Test(expected=IOException.class)
-  public void testFailedToCreateWALIfParentRenamed() throws IOException {
-    final String name = "testFailedToCreateWALIfParentRenamed";
-    FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
-        conf, null, true, null, null);
-    long filenum = System.currentTimeMillis();
-    Path path = log.computeFilename(filenum);
-    log.createWriterInstance(path);
-    Path parent = path.getParent();
-    path = log.computeFilename(filenum + 1);
-    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
-    fs.rename(parent, newPath);
-    log.createWriterInstance(path);
-    fail("It should fail to create the new WAL");
-  }
-
-  /**
-   * Test flush for sure has a sequence id that is beyond the last edit appended.  We do this
-   * by slowing appends in the background ring buffer thread while in foreground we call
-   * flush.  The addition of the sync over HRegion in flush should fix an issue where flush was
-   * returning before all of its appends had made it out to the WAL (HBASE-11109).
-   * @throws IOException
-   * @see <a href="https://issues.apache.org/jira/browse/HBASE-11109">HBASE-11109</a>
-   */
-  @Test
-  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
-    String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
-    final TableName tableName = TableName.valueOf(testName);
-    final HRegionInfo hri = new HRegionInfo(tableName);
-    final byte[] rowName = tableName.getName();
-    final HTableDescriptor htd = new HTableDescriptor(tableName);
-    htd.addFamily(new HColumnDescriptor("f"));
-    HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
-        TEST_UTIL.getConfiguration(), htd);
-    HBaseTestingUtility.closeRegionAndWAL(r);
-    final int countPerFamily = 10;
-    final MutableBoolean goslow = new MutableBoolean(false);
-    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
-        Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
-      scopes.put(fam, 0);
-    }
-    // subclass and doctor a method.
-    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
-        testName, conf) {
       @Override
       void atHeadOfRingBufferEventHandlerAppend() {
-        if (goslow.isTrue()) {
-          Threads.sleep(100);
-          LOG.debug("Sleeping before appending 100ms");
-        }
+        action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }
     };
-    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
-      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
-    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
-    try {
-      List<Put> puts = null;
-      for (HColumnDescriptor hcd: htd.getFamilies()) {
-        puts =
-          TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
-      }
-
-      // Now assert edits made it in.
-      final Get g = new Get(rowName);
-      Result result = region.get(g);
-      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
-
-      // Construct a WALEdit and add it a few times to the WAL.
-      WALEdit edits = new WALEdit();
-      for (Put p: puts) {
-        CellScanner cs = p.cellScanner();
-        while (cs.advance()) {
-          edits.add(cs.current());
-        }
-      }
-      // Add any old cluster id.
-      List<UUID> clusterIds = new ArrayList<UUID>();
-      clusterIds.add(UUID.randomUUID());
-      // Now make appends run slow.
-      goslow.setValue(true);
-      for (int i = 0; i < countPerFamily; i++) {
-        final HRegionInfo info = region.getRegionInfo();
-        final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
-            System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
-        wal.append(info, logkey, edits, true);
-        }
-      region.flush(true);
-      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
-      long currentSequenceId = region.getReadPoint(null);
-      // Now release the appends
-      goslow.setValue(false);
-      synchronized (goslow) {
-        goslow.notifyAll();
-      }
-      assertTrue(currentSequenceId >= region.getReadPoint(null));
-    } finally {
-      region.close(true);
-      wal.close();
-    }
   }
 
   @Test
   public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldException,
       SecurityException, IllegalArgumentException, IllegalAccessException {
     final String name = "testSyncRunnerIndexOverflow";
-    FSHLog log =
-        new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
-            null, true, null, null);
+    FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME,
+        CONF, null, true, null, null);
     try {
       Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
       ringBufferEventHandlerField.setAccessible(true);
@@ -453,9 +87,8 @@ public class TestFSHLog {
       syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
       HTableDescriptor htd =
           new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
-      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
-          Bytes.BYTES_COMPARATOR);
-      for(byte[] fam : htd.getFamiliesKeys()) {
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      for (byte[] fam : htd.getFamiliesKeys()) {
         scopes.put(fam, 0);
       }
       HRegionInfo hri =


Mime
View raw message