hbase-commits mailing list archives

From zhang...@apache.org
Subject [3/4] hbase git commit: HBASE-15813 Rename DefaultWALProvider to a more specific name and clean up unnecessary reference to it
Date Thu, 12 May 2016 03:36:22 GMT
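
The gist of the change across these test files: the static WAL helpers that used to live on DefaultWALProvider (getWALDirectoryName, getNumRolledLogFiles, META_WAL_PROVIDER_ID, the Reader interface) now live on AbstractFSWALProvider, with FSHLogProvider as the concrete filesystem-backed provider. A minimal before/after sketch of a caller migration (illustrative only; WalDirExample is not part of the commit):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class WalDirExample {
  // Before HBASE-15813 a caller wrote:
  //   new Path(baseDir, DefaultWALProvider.getWALDirectoryName(serverName));
  // After the rename the same static helper is reached via AbstractFSWALProvider:
  public static Path walDirFor(Path baseDir, String serverName) {
    return new Path(baseDir, AbstractFSWALProvider.getWALDirectoryName(serverName));
  }
}
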
http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 59370bc..e3e62fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -60,8 +61,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -74,15 +73,17 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
 import org.junit.Assert;
@@ -170,7 +171,7 @@ public class TestStore {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
-    final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
+    final Path logdir = new Path(basedir, AbstractFSWALProvider.getWALDirectoryName(methodName));
 
     FileSystem fs = FileSystem.get(conf);
 

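For context, the relocated getWALDirectoryName helper only builds the per-server WAL directory name; a rough sketch of its behavior, reconstructed from how the tests use it (hedged, not copied from this commit):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;

public class WalDirNameSketch {
  // Roughly what AbstractFSWALProvider.getWALDirectoryName(serverName) returns:
  // the server name placed under the WAL directory constant, e.g. "WALs/<serverName>".
  public static String getWALDirectoryName(String serverName) {
    return new Path(HConstants.HREGION_LOGDIR_NAME, serverName).toString();
  }
}
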
http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 214565e..9eaeda4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.junit.AfterClass;
@@ -194,7 +194,7 @@ public abstract class AbstractTestFSWAL {
       assertTrue(comp.compare(p1, p2) < 0);
       walMeta =
           newWAL(FS, FSUtils.getRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-            CONF, null, true, null, DefaultWALProvider.META_WAL_PROVIDER_ID);
+            CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
       Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
 
       Path p1WithMeta = walMeta.computeFilename(11);

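The only change in this hunk is where META_WAL_PROVIDER_ID lives; the comparator under test orders WAL files by the sequence number embedded in their names, so computeFilename(11) must sort before computeFilename(12). A minimal illustration of that ordering property (not the real LOG_NAME_COMPARATOR implementation):

import java.util.Comparator;
import org.apache.hadoop.fs.Path;

public class WalNameOrderSketch {
  // Orders files named "<prefix>.<sequence>" by the trailing sequence number.
  public static final Comparator<Path> BY_SEQUENCE = Comparator.comparingLong(p -> {
    String name = p.getName();
    return Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
  });
}
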
http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
new file mode 100644
index 0000000..f70bcc8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that the log is forced to roll every "hbase.regionserver.logroll.period" milliseconds.
+ */
+public abstract class AbstractTestLogRollPeriod {
+  private static final Log LOG = LogFactory.getLog(AbstractTestLogRollPeriod.class);
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final static long LOG_ROLL_PERIOD = 4000;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // disable the ui
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.info.port", -1);
+
+    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests that the LogRoller performs the roll even if there are no edits.
+   */
+  @Test
+  public void testNoEdits() throws Exception {
+    TableName tableName = TableName.valueOf("TestLogRollPeriodNoEdits");
+    TEST_UTIL.createTable(tableName, "cf");
+    try {
+      Table table = TEST_UTIL.getConnection().getTable(tableName);
+      try {
+        HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+        WAL log = server.getWAL(null);
+        checkMinLogRolls(log, 5);
+      } finally {
+        table.close();
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Tests that the LogRoller performs the roll with some data in the log.
+   */
+  @Test(timeout=60000)
+  public void testWithEdits() throws Exception {
+    final TableName tableName = TableName.valueOf("TestLogRollPeriodWithEdits");
+    final String family = "cf";
+
+    TEST_UTIL.createTable(tableName, family);
+    try {
+      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+      WAL log = server.getWAL(null);
+      final Table table = TEST_UTIL.getConnection().getTable(tableName);
+
+      Thread writerThread = new Thread("writer") {
+        @Override
+        public void run() {
+          try {
+            long row = 0;
+            while (!interrupted()) {
+              Put p = new Put(Bytes.toBytes(String.format("row%d", row)));
+              p.addColumn(Bytes.toBytes(family), Bytes.toBytes("col"), Bytes.toBytes(row));
+              table.put(p);
+              row++;
+
+              Thread.sleep(LOG_ROLL_PERIOD / 16);
+            }
+          } catch (Exception e) {
+            LOG.warn(e);
+          }
+        }
+      };
+
+      try {
+        writerThread.start();
+        checkMinLogRolls(log, 5);
+      } finally {
+        writerThread.interrupt();
+        writerThread.join();
+        table.close();
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  private void checkMinLogRolls(final WAL log, final int minRolls)
+      throws Exception {
+    final List<Path> paths = new ArrayList<Path>();
+    log.registerWALActionsListener(new WALActionsListener.Base() {
+      @Override
+      public void postLogRoll(Path oldFile, Path newFile) {
+        LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
+        paths.add(newFile);
+      }
+    });
+
+    // Sleep until we should get at least min-LogRoll events
+    long wtime = System.currentTimeMillis();
+    Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
+    // Do some extra sleep in case the machine is slow,
+    // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
+    final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
+    for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
+      Thread.sleep(LOG_ROLL_PERIOD / 4);
+    }
+    wtime = System.currentTimeMillis() - wtime;
+    LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
+                           paths.size(), wtime, wtime / Math.max(1, paths.size()), minRolls));
+    assertFalse(paths.size() < minRolls);
+  }
+}

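The new test above is driven entirely by one configuration knob. A minimal sketch of setting it in any mini-cluster test (the 4000 ms value mirrors LOG_ROLL_PERIOD above; the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LogRollPeriodConfig {
  public static Configuration withFastLogRoll() {
    Configuration conf = HBaseConfiguration.create();
    // Force a WAL roll every 4 seconds, even when the WAL has no edits.
    conf.setLong("hbase.regionserver.logroll.period", 4000L);
    return conf;
  }
}
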
http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 7abdef9..046071f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -45,7 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -184,7 +185,7 @@ public abstract class AbstractTestLogRolling  {
     HRegionInfo region =
         server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
     final WAL log = server.getWAL(region);
-    LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
+    LOG.info("after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) +
         " log files");
 
       // flush all regions
@@ -195,7 +196,7 @@ public abstract class AbstractTestLogRolling  {
       // Now roll the log
       log.rollWriter();
 
-    int count = DefaultWALProvider.getNumRolledLogFiles(log);
+    int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
     LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
       assertTrue(("actual count: " + count), count <= 2);
   }
@@ -226,34 +227,6 @@ public abstract class AbstractTestLogRolling  {
     LOG.info("Validated row " + row);
   }
 
-  void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
-      throws IOException {
-    for (int i = 0; i < 10; i++) {
-      Put put = new Put(Bytes.toBytes("row"
-          + String.format("%1$04d", (start + i))));
-      put.addColumn(HConstants.CATALOG_FAMILY, null, value);
-      table.put(put);
-    }
-    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
-    tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value);
-    long startTime = System.currentTimeMillis();
-    long remaining = timeout;
-    while (remaining > 0) {
-      if (log.isLowReplicationRollEnabled() == expect) {
-        break;
-      } else {
-        // Trigger calling FSHlog#checkLowReplication()
-        table.put(tmpPut);
-        try {
-          Thread.sleep(200);
-        } catch (InterruptedException e) {
-          // continue
-        }
-        remaining = timeout - (System.currentTimeMillis() - startTime);
-      }
-    }
-  }
-
   /**
    * Tests that logs are deleted when some region has a compaction
    * record in WAL and no other records. See HBASE-8597.
@@ -282,13 +255,13 @@ public abstract class AbstractTestLogRolling  {
       }
       doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
       assertEquals("Should have no WAL after initial writes", 0,
-          DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
       assertEquals(2, s.getStorefilesCount());
 
       // Roll the log and compact table, to have compaction record in the 2nd WAL.
       log.rollWriter();
       assertEquals("Should have WAL; one table is not flushed", 1,
-          DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
       admin.flush(table.getName());
       region.compact(false);
       // Wait for compaction in case if flush triggered it before us.
@@ -302,14 +275,14 @@ public abstract class AbstractTestLogRolling  {
       doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
       log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
       assertEquals("Should have WAL; one table is not flushed", 1,
-          DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
 
       // Flush table to make latest WAL obsolete; write another record, and roll again.
       admin.flush(table.getName());
       doPut(table, 1);
       log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
       assertEquals("Should have 1 WALs at the end", 1,
-          DefaultWALProvider.getNumRolledLogFiles(log));
+        AbstractFSWALProvider.getNumRolledLogFiles(log));
     } finally {
       if (t != null) t.close();
       if (table != null) table.close();

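The recurring pattern in this hunk - flush, roll, then count rolled files through the relocated helper - looks like this in isolation (a sketch assuming log is a filesystem-backed WAL; assertFewRolledFiles is not part of the commit):

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;

public class RolledFileCountSketch {
  // Once every region is flushed, a roll should leave at most a couple of rolled
  // WAL files behind, since fully flushed WAL files become eligible for archiving.
  public static void assertFewRolledFiles(WAL log) throws Exception {
    log.rollWriter();
    int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
    assertTrue("actual count: " + count, count <= 2);
  }
}
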
http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
new file mode 100644
index 0000000..a974639
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -0,0 +1,1270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test replay of edits out of a WAL split.
+ */
+public abstract class AbstractTestWALReplay {
+  private static final Log LOG = LogFactory.getLog(AbstractTestWALReplay.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+  private Path hbaseRootDir = null;
+  private String logName;
+  private Path oldLogDir;
+  private Path logDir;
+  private FileSystem fs;
+  private Configuration conf;
+  private RecoveryMode mode;
+  private WALFactory wals;
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("dfs.support.append", true);
+    // The config below is supported by 0.20-append and CDH3b2
+    conf.setInt("dfs.client.block.recovery.retries", 2);
+    TEST_UTIL.startMiniCluster(3);
+    Path hbaseRootDir =
+      TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
+    LOG.info("hbase.rootdir=" + hbaseRootDir);
+    FSUtils.setRootDir(conf, hbaseRootDir);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    this.hbaseRootDir = FSUtils.getRootDir(this.conf);
+    this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    this.logName = AbstractFSWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
+    this.logDir = new Path(this.hbaseRootDir, logName);
+    if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
+      TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+    }
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+    this.wals = new WALFactory(conf, null, currentTest.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.wals.close();
+    TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+  }
+
+  /*
+   * @param p Directory to clean up
+   */
+  private void deleteDir(final Path p) throws IOException {
+    if (this.fs.exists(p)) {
+      if (!this.fs.delete(p, true)) {
+        throw new IOException("Failed remove of " + p);
+      }
+    }
+  }
+
+  /**
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
+    final TableName tableName =
+        TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
+    byte[] family1 = Bytes.toBytes("cf1");
+    byte[] family2 = Bytes.toBytes("cf2");
+    byte[] qualifier = Bytes.toBytes("q");
+    byte[] value = Bytes.toBytes("testV");
+    byte[][] familys = { family1, family2 };
+    TEST_UTIL.createTable(tableName, familys);
+    Table htable = TEST_UTIL.getConnection().getTable(tableName);
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.addColumn(family1, qualifier, value);
+    htable.put(put);
+    ResultScanner resultScanner = htable.getScanner(new Scan());
+    int count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(1, count);
+
+    MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
+    List<HRegion> regions = hbaseCluster.getRegions(tableName);
+    assertEquals(1, regions.size());
+
+    // move region to another regionserver
+    Region destRegion = regions.get(0);
+    int originServerNum = hbaseCluster
+        .getServerWith(destRegion.getRegionInfo().getRegionName());
+    assertTrue("Please start more than 1 regionserver", hbaseCluster
+        .getRegionServerThreads().size() > 1);
+    int destServerNum = 0;
+    while (destServerNum == originServerNum) {
+      destServerNum++;
+    }
+    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
+    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
+    // move region to destination regionserver
+    moveRegionAndWait(destRegion, destServer);
+
+    // delete the row
+    Delete del = new Delete(Bytes.toBytes("r1"));
+    htable.delete(del);
+    resultScanner = htable.getScanner(new Scan());
+    count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(0, count);
+
+    // flush region and make major compaction
+    Region region =  destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
+    region.flush(true);
+    // wait to complete major compaction
+    for (Store store : region.getStores()) {
+      store.triggerMajorCompaction();
+    }
+    region.compact(true);
+
+    // move region to origin regionserver
+    moveRegionAndWait(destRegion, originServer);
+    // abort the origin regionserver
+    originServer.abort("testing");
+
+    // see what we get
+    Result result = htable.get(new Get(Bytes.toBytes("r1")));
+    if (result != null) {
+      assertTrue("Row is deleted, but we get" + result.toString(),
+          (result == null) || result.isEmpty());
+    }
+    resultScanner.close();
+  }
+
+  private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
+      throws InterruptedException, MasterNotRunningException,
+      ZooKeeperConnectionException, IOException {
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getHBaseAdmin().move(
+        destRegion.getRegionInfo().getEncodedNameAsBytes(),
+        Bytes.toBytes(destServer.getServerName().getServerName()));
+    while (true) {
+      ServerName serverName = master.getAssignmentManager()
+        .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
+      if (serverName != null && serverName.equals(destServer.getServerName())) {
+        TEST_UTIL.assertRegionOnServer(
+          destRegion.getRegionInfo(), serverName, 200);
+        break;
+      }
+      Thread.sleep(10);
+    }
+  }
+
+  /**
+   * Tests for hbase-2727.
+   * @throws Exception
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2727">HBASE-2727</a>
+   */
+  @Test
+  public void test2727() throws Exception {
+    // Test being able to have > 1 set of edits in the recovered.edits directory.
+    // Ensure edits are replayed properly.
+    final TableName tableName =
+        TableName.valueOf("test2727");
+
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
+    deleteDir(basedir);
+
+    HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region2);
+    final byte [] rowName = tableName.getName();
+
+    WAL wal1 = createWAL(this.conf, hbaseRootDir, logName);
+    // Add 1k to each family.
+    final int countPerFamily = 1000;
+
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+        Bytes.BYTES_COMPARATOR);
+    for(byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
+          wal1, htd, mvcc, scopes);
+    }
+    wal1.shutdown();
+    runWALSplit(this.conf);
+
+    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
+    // Add 1k to each family.
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
+          ee, wal2, htd, mvcc, scopes);
+    }
+    wal2.shutdown();
+    runWALSplit(this.conf);
+
+    WAL wal3 = createWAL(this.conf, hbaseRootDir, logName);
+    try {
+      HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
+      long seqid = region.getOpenSeqNum();
+      // The region opens with sequenceId 1. With 6k edits, its sequence number
+      // reaches 6k + 1: when opened, this region applies the 6k edits and
+      // increments the sequenceId by 1.
+      assertTrue(seqid > mvcc.getWritePoint());
+      assertEquals(seqid - 1, mvcc.getWritePoint());
+      LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
+          + mvcc.getReadPoint());
+
+      // TODO: Scan all.
+      region.close();
+    } finally {
+      wal3.close();
+    }
+  }
+
+  /**
+   * Test case of HRegion that is only made out of bulk loaded files.  Assert
+   * that we don't 'crash'.
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testRegionMadeOfBulkLoadedFilesOnly()
+  throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName =
+        TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    Region region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region2);
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    Region region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+    byte [] family = htd.getFamilies().iterator().next().getName();
+    Path f =  new Path(basedir, "hfile");
+    HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
+        Bytes.toBytes("z"), 10);
+    List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
+    hfs.add(Pair.newPair(family, f.toString()));
+    region.bulkLoadHFiles(hfs, true, null);
+
+    // Add an edit so there is something in the WAL
+    byte [] row = tableName.getName();
+    region.put((new Put(row)).addColumn(family, family, family));
+    wal.sync();
+    final int rowsInsertedCount = 11;
+
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // Now 'crash' the region by stealing its wal
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+        tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction() {
+      @Override
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
+
+        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+          hbaseRootDir, hri, htd, wal2);
+        long seqid2 = region2.getOpenSeqNum();
+        assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+        // I can't close wal1. It's been appropriated when we split.
+        region2.close();
+        wal2.close();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
+   * files) and an edit in the memstore.
+   * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
+   * from being replayed"
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testCompactedBulkLoadedFiles()
+      throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName =
+        TableName.valueOf("testCompactedBulkLoadedFiles");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region2);
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+    // Add an edit so there is something in the WAL
+    byte [] row = tableName.getName();
+    byte [] family = htd.getFamilies().iterator().next().getName();
+    region.put((new Put(row)).addColumn(family, family, family));
+    wal.sync();
+
+    List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
+    for (int i = 0; i < 3; i++) {
+      Path f = new Path(basedir, "hfile"+i);
+      HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
+          Bytes.toBytes(i + "50"), 10);
+      hfs.add(Pair.newPair(family, f.toString()));
+    }
+    region.bulkLoadHFiles(hfs, true, null);
+    final int rowsInsertedCount = 31;
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // major compact to turn all the bulk loaded files into one normal file
+    region.compact(true);
+    assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+    // Now 'crash' the region by stealing its wal
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+        tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction() {
+      @Override
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        WAL wal2 = createWAL(newConf, hbaseRootDir, logName);
+
+        HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+            hbaseRootDir, hri, htd, wal2);
+        long seqid2 = region2.getOpenSeqNum();
+        assertTrue(seqid2 > -1);
+        assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+        // I can't close wal1. It's been appropriated when we split.
+        region2.close();
+        wal2.close();
+        return null;
+      }
+    });
+  }
+
+
+  /**
+   * Test writing edits into an HRegion, closing it, splitting logs, opening
+   * Region again.  Verify seqids.
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testReplayEditsWrittenViaHRegion()
+  throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName =
+        TableName.valueOf("testReplayEditsWrittenViaHRegion");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families. Do a flush on one
+    // of the families during the load of edits so its seqid is not the same
+    // as the others', to test that we do the right thing when seqids differ.
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    long seqid = region.getOpenSeqNum();
+    boolean first = true;
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+      if (first) {
+        // Flush only the first family, so at least one family has a seqid different from the rest.
+        region.flush(true);
+        first = false;
+      }
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * htd.getFamilies().size(),
+      result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect, that our seqids are calculated correctly so
+    // all edits in logs are seen as 'stale'/old.
+    region.close(true);
+    wal.shutdown();
+    runWALSplit(this.conf);
+    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+
+    // Next test.  Add more edits, then 'crash' this region by stealing its wal
+    // out from under it and assert that replay of the log adds the edits back
+    // correctly when region is opened again.
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
+    }
+    // Get count of edits.
+    final Result result2 = region2.get(g);
+    assertEquals(2 * result.size(), result2.size());
+    wal2.sync();
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+      tableName.getNameAsString());
+    user.runAs(new PrivilegedExceptionAction() {
+      @Override
+      public Object run() throws Exception {
+        runWALSplit(newConf);
+        FileSystem newFS = FileSystem.get(newConf);
+        // Make a new wal for new region open.
+        WAL wal3 = createWAL(newConf, hbaseRootDir, logName);
+        final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
+        HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
+          @Override
+          protected boolean restoreEdit(Store s, Cell cell) {
+            boolean b = super.restoreEdit(s, cell);
+            countOfRestoredEdits.incrementAndGet();
+            return b;
+          }
+        };
+        long seqid3 = region3.initialize();
+        Result result3 = region3.get(g);
+        // Assert that count of cells is same as before crash.
+        assertEquals(result2.size(), result3.size());
+        assertEquals(htd.getFamilies().size() * countPerFamily,
+          countOfRestoredEdits.get());
+
+        // I can't close wal1. It's been appropriated when we split.
+        region3.close();
+        wal3.close();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Test that we recover correctly when there is a failure partway through the
+   * flushes, i.e. some stores got flushed but others did not.
+   *
+   * Unfortunately, there is no easy hook to flush at a store level. The way
+   * we get around this is by flushing at the region level, and then deleting
+   * the recently flushed store file for one of the Stores. This would put us
+   * back in the situation where all but that store got flushed and the region
+   * died.
+   *
+   * We restart the Region and verify that the edits were replayed.
+   *
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws NoSuchFieldException
+   * @throws IllegalArgumentException
+   * @throws SecurityException
+   */
+  @Test
+  public void testReplayEditsAfterPartialFlush()
+  throws IOException, SecurityException, IllegalArgumentException,
+      NoSuchFieldException, IllegalAccessException, InterruptedException {
+    final TableName tableName =
+        TableName.valueOf("testReplayEditsWrittenViaHRegion");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families. Do a flush on one
+    // of the families during the load of edits so its seqid is not the same
+    // as the others', to test that we do the right thing when seqids differ.
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    long seqid = region.getOpenSeqNum();
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+    }
+
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * htd.getFamilies().size(),
+      result.size());
+
+    // Let us flush the region
+    region.flush(true);
+    region.close(true);
+    wal.shutdown();
+
+    // Delete the store files of the second column family to simulate a failure
+    // partway through flushcache(): we have 3 families, and killing the middle
+    // one ensures that taking the maximum flushed seqid will make us fail.
+    int cf_count = 0;
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      cf_count++;
+      if (cf_count == 2) {
+        region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
+      }
+    }
+
+
+    // Let us try to split and recover
+    runWALSplit(this.conf);
+    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
+    long seqid2 = region2.getOpenSeqNum();
+    assertTrue(seqid + result.size() < seqid2);
+
+    final Result result1b = region2.get(g);
+    assertEquals(result.size(), result1b.size());
+  }
+
+
+  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
+  // Only throws exception if throwExceptionWhenFlushing is set true.
+  public static class CustomStoreFlusher extends DefaultStoreFlusher {
+    // Switch between throw and not throw exception in flush
+    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+
+    public CustomStoreFlusher(Configuration conf, Store store) {
+      super(conf, store);
+    }
+    @Override
+    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+        MonitoredTask status, ThroughputController throughputController) throws IOException {
+      if (throwExceptionWhenFlushing.get()) {
+        throw new IOException("Simulated exception by tests");
+      }
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController);
+    }
+
+  }
+
+  /**
+   * Test that we can recover the data correctly after aborting a flush. In the
+   * test, we first abort a flush after writing some data, then write more data
+   * and flush again, and finally verify the data.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final TableName tableName =
+        TableName.valueOf("testReplayEditsAfterAbortingFlush");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region3);
+    // Write countPerFamily edits into the three families. Do a flush on one
+    // of the families during the load of edits so its seqid is not the same
+    // as the others', to test that we do the right thing when seqids differ.
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        CustomStoreFlusher.class.getName());
+    HRegion region =
+      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
+        htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Let us flush the region
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
+    try {
+      region.flush(true);
+      fail("Injected exception hasn't been thrown");
+    } catch (Throwable t) {
+      LOG.info("Expected simulated exception when flushing region,"
+          + t.getMessage());
+      // simulated to abort server
+      Mockito.doReturn(true).when(rsServices).isAborted();
+      region.setClosing(false); // region normally does not accept writes after
+      // DroppedSnapshotException. We mock around it for this test.
+    }
+    // writing more data
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // call flush again
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
+    try {
+      region.flush(true);
+    } catch (IOException t) {
+      LOG.info("Expected exception when flushing region because server is stopped,"
+          + t.getMessage());
+    }
+
+    region.close(true);
+    wal.shutdown();
+
+    // Let us try to split and recover
+    runWALSplit(this.conf);
+    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region2 =
+      HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<Cell> results = new ArrayList<Cell>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty()) {
+        scannedCount++;
+      }
+      if (!existMore) {
+        break;
+      }
+      results.clear();
+    }
+    return scannedCount;
+  }
+
+  /**
+   * Create an HRegion with the result of a WAL split and test that we only see
+   * the good edits.
+   * @throws Exception
+   */
+  @Test
+  public void testReplayEditsWrittenIntoWAL() throws Exception {
+    final TableName tableName =
+        TableName.valueOf("testReplayEditsWrittenIntoWAL");
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
+    deleteDir(basedir);
+
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+    HRegion region2 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region2);
+    final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    final byte[] rowName = tableName.getName();
+    final byte[] regionName = hri.getEncodedNameAsBytes();
+
+    // Add 1k to each family.
+    final int countPerFamily = 1000;
+    Set<byte[]> familyNames = new HashSet<byte[]>();
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+        Bytes.BYTES_COMPARATOR);
+    for(byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
+    for (HColumnDescriptor hcd: htd.getFamilies()) {
+      addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
+          ee, wal, htd, mvcc, scopes);
+      familyNames.add(hcd.getName());
+    }
+
+    // Add a cache flush, shouldn't have any effect
+    wal.startCacheFlush(regionName, familyNames);
+    wal.completeCacheFlush(regionName);
+
+    // Add an edit to another family, should be skipped.
+    WALEdit edit = new WALEdit();
+    long now = ee.currentTime();
+    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
+      now, rowName));
+    wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+        true);
+
+    // Delete the c family to verify deletes make it over.
+    edit = new WALEdit();
+    now = ee.currentTime();
+    edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
+    wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+        true);
+
+    // Sync.
+    wal.sync();
+    // Make a new conf and a new fs for the splitter to run on so we can take
+    // over old wal.
+    final Configuration newConf = HBaseConfiguration.create(this.conf);
+    User user = HBaseTestingUtility.getDifferentUser(newConf,
+      ".replay.wal.secondtime");
+    user.runAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        runWALSplit(newConf);
+        FileSystem newFS = FileSystem.get(newConf);
+        // 100k seems to make for about 4 flushes during HRegion#initialize.
+        newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
+        // Make a new wal for new region.
+        WAL newWal = createWAL(newConf, hbaseRootDir, logName);
+        final AtomicInteger flushcount = new AtomicInteger(0);
+        try {
+          final HRegion region =
+              new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
+            @Override
+            protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+                final Collection<Store> storesToFlush, MonitoredTask status,
+                boolean writeFlushWalMarker)
+                    throws IOException {
+              LOG.info("InternalFlushCache Invoked");
+              FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
+                  Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
+              flushcount.incrementAndGet();
+              return fs;
+            }
+          };
+          // The seq id this region has opened up with
+          long seqid = region.initialize();
+
+          // The mvcc readpoint resulting from inserting the data.
+          long writePoint = mvcc.getWritePoint();
+
+          // We flushed during init.
+          assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
+          assertTrue((seqid - 1) == writePoint);
+
+          Get get = new Get(rowName);
+          Result result = region.get(get);
+          // Make sure we only see the good edits
+          assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
+            result.size());
+          region.close();
+        } finally {
+          newWal.close();
+        }
+        return null;
+      }
+    });
+  }
+
+  @Test
+  // the following test is for HBASE-6065
+  public void testSequentialEditLogSeqNum() throws IOException {
+    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir =
+        FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+
+    // Mock the WAL
+    MockWAL wal = createMockWAL();
+
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+    }
+
+    // Let us flush the region
+    // But this time completeflushcache is not yet done
+    region.flush(true);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
+    }
+    // Get the current seq no.
+    long latestSeqNumber = region.getReadPoint(null);
+    wal.doCompleteCacheFlush = true;
+    // allow complete cache flush with the previous seq number got after first
+    // set of edits.
+    wal.completeCacheFlush(hri.getEncodedNameAsBytes());
+    wal.shutdown();
+    FileStatus[] listStatus = wal.getFiles();
+    assertNotNull(listStatus);
+    assertTrue(listStatus.length > 0);
+    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
+        this.fs, this.conf, null, null, null, mode, wals);
+    FileStatus[] listStatus1 = this.fs.listStatus(
+      new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
+          "recovered.edits")), new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          if (WALSplitter.isSequenceIdFile(p)) {
+            return false;
+          }
+          return true;
+        }
+      });
+    int editCount = 0;
+    for (FileStatus fileStatus : listStatus1) {
+      editCount = Integer.parseInt(fileStatus.getPath().getName());
+    }
+    // The sequence number should be the same.
+    assertEquals(
+        "The sequence number of the recovered.edits file and the current edit seq should be the same",
+        latestSeqNumber, editCount);
+  }
+
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-15252
+   */
+  @Test
+  public void testDatalossWhenInputError() throws IOException, InstantiationException,
+      IllegalAccessException {
+    final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final byte[] rowName = tableName.getName();
+    final int countPerFamily = 10;
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    Path regionDir = region1.getRegionFileSystem().getRegionDir();
+    HBaseTestingUtility.closeRegionAndWAL(region1);
+
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+    }
+    // Now assert edits made it in.
+    final Get g = new Get(rowName);
+    Result result = region.get(g);
+    assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+    // Now close the region (without flush), split the log, reopen the region and assert that
+    // replay of log has the correct effect.
+    region.close(true);
+    wal.shutdown();
+
+    runWALSplit(this.conf);
+
+    // Here we let the DFSInputStream throw an IOException just after the WAL header.
+    Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
+    FSDataInputStream stream = fs.open(editFile);
+    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
+    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
+        conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+          AbstractFSWALProvider.Reader.class);
+    AbstractFSWALProvider.Reader reader = logReaderClass.newInstance();
+    reader.init(this.fs, editFile, conf, stream);
+    final long headerLength = stream.getPos();
+    reader.close();
+    FileSystem spyFs = spy(this.fs);
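+    // Replace the DFSInputStream wrapped by the FSDataInputStream with a spy that throws
+    // an IOException once reading moves past the WAL header, simulating a truncated
+    // stream, and that closes the real underlying stream when the spy is closed.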
+    doAnswer(new Answer<FSDataInputStream>() {
+
+      @Override
+      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+        FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
+        Field field = FilterInputStream.class.getDeclaredField("in");
+        field.setAccessible(true);
+        final DFSInputStream in = (DFSInputStream) field.get(stream);
+        DFSInputStream spyIn = spy(in);
+        doAnswer(new Answer<Integer>() {
+
+          private long pos;
+
+          @Override
+          public Integer answer(InvocationOnMock invocation) throws Throwable {
+            if (pos >= headerLength) {
+              throw new IOException("read over limit");
+            }
+            int b = (Integer) invocation.callRealMethod();
+            if (b > 0) {
+              pos += b;
+            }
+            return b;
+          }
+        }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
+        doAnswer(new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            invocation.callRealMethod();
+            in.close();
+            return null;
+          }
+        }).when(spyIn).close();
+        field.set(stream, spyIn);
+        return stream;
+      }
+    }).when(spyFs).open(eq(editFile));
+
+    WAL wal2 = createWAL(this.conf, hbaseRootDir, logName);
+    HRegion region2;
+    try {
+      // Log replay should fail due to the IOException; otherwise we may lose data.
+      region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
+      assertEquals(result.size(), region2.get(g).size());
+    } catch (IOException e) {
+      assertEquals("read over limit", e.getMessage());
+    }
+    region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
+    assertEquals(result.size(), region2.get(g).size());
+  }
+
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
+   */
+  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException {
+    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
+    deleteDir(basedir);
+
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    final byte[] family = htd.getColumnFamilies()[0].getName();
+    final byte[] rowName = tableName.getName();
+    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
+    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
+
+    Path largeFile = new Path(logDir, "wal-1");
+    Path smallFile = new Path(logDir, "wal-2");
+    writeWALFile(largeFile, Arrays.asList(entry1, entry2));
+    writeWALFile(smallFile, Arrays.asList(entry2));
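+    // Both files end with the edit carrying sequence id 2, so splitting them produces
+    // recovered.edits files with conflicting names; replay must still retain every edit.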
+    FileStatus first, second;
+    if (largeFirst) {
+      first = fs.getFileStatus(largeFile);
+      second = fs.getFileStatus(smallFile);
+    } else {
+      first = fs.getFileStatus(smallFile);
+      second = fs.getFileStatus(largeFile);
+    }
+    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null,
+      RecoveryMode.LOG_SPLITTING, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null,
+      RecoveryMode.LOG_SPLITTING, wals);
+    WAL wal = createWAL(this.conf, hbaseRootDir, logName);
+    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
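+    // The reopened region must advance past every replayed sequence id and see both edits.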
+    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
+    assertEquals(2, region.get(new Get(rowName)).size());
+  }
+
+  @Test
+  public void testNameConflictWhenSplit0() throws IOException {
+    testNameConflictWhenSplit(true);
+  }
+
+  @Test
+  public void testNameConflictWhenSplit1() throws IOException {
+    testNameConflictWhenSplit(false);
+  }
+
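+  /**
+   * FSHLog that ignores completeCacheFlush() until doCompleteCacheFlush is set, letting a
+   * test defer the flush-completion marker until after further edits.
+   */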
+  static class MockWAL extends FSHLog {
+    boolean doCompleteCacheFlush = false;
+
+    public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
+        throws IOException {
+      super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+    }
+
+    @Override
+    public void completeCacheFlush(byte[] encodedRegionName) {
+      if (!doCompleteCacheFlush) {
+        return;
+      }
+      super.completeCacheFlush(encodedRegionName);
+    }
+  }
+
+  private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
+    htd.addFamily(a);
+    return htd;
+  }
+
+  private MockWAL createMockWAL() throws IOException {
+    MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
+    // Set down the maximum recovery error count so the dfsclient doesn't linger
+    // retrying something long gone.
+    HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
+    return wal;
+  }
+
+  // Flusher used in this test. Runs the requested flush inline on the stored region
+  // rather than queueing it.
+  class TestFlusher implements FlushRequester {
+    private HRegion r;
+
+    @Override
+    public void requestFlush(Region region, boolean force) {
+      try {
+        r.flush(force);
+      } catch (IOException e) {
+        throw new RuntimeException("Exception flushing", e);
+      }
+    }
+
+    @Override
+    public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
+      // Not needed for these tests.
+    }
+
+    @Override
+    public void registerFlushRequestListener(FlushRequestListener listener) {
+
+    }
+
+    @Override
+    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
+      return false;
+    }
+
+    @Override
+    public void setGlobalMemstoreLimit(long globalMemStoreSize) {
+
+    }
+  }
+
+  private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
+      final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
+    return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
+  }
+
+  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
+      int index) {
+    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
+    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
+    return edit;
+  }
+
+  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
+      byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
+      int index, NavigableMap<byte[], Integer> scopes) throws IOException {
+    FSWALEntry entry =
+        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
+          rowName, family, ee, index), hri, true);
+    entry.stampRegionSequenceId();
+    return entry;
+  }
+
+  private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
+      final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
+      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
+      NavigableMap<byte[], Integer> scopes) throws IOException {
+    for (int j = 0; j < count; j++) {
+      wal.append(hri, createWALKey(tableName, hri, mvcc, scopes),
+        createWALEdit(rowName, family, ee, j), true);
+    }
+    wal.sync();
+  }
+
+  static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
+      EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
+    List<Put> puts = new ArrayList<Put>();
+    for (int j = 0; j < count; j++) {
+      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
+      Put p = new Put(rowName);
+      p.addColumn(family, qualifier, ee.currentTime(), rowName);
+      r.put(p);
+      puts.add(p);
+    }
+    return puts;
+  }
+
+  /**
+   * Creates an HRegionInfo spanning the whole key range of <code>tableName</code>. The
+   * three column families 'a', 'b' and 'c' live in the accompanying HTableDescriptor.
+   * @param tableName Name of table to use when we create the HRegionInfo.
+   */
+  private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
+    return new HRegionInfo(tableName, null, null, false);
+  }
+
+  /**
+   * Run the split. Verify that only a single split file was made.
+   * @param c Configuration to use.
+   * @return The single split file made.
+   * @throws IOException
+   */
+  private Path runWALSplit(final Configuration c) throws IOException {
+    List<Path> splits = WALSplitter.split(
+      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
+    // Split should generate only 1 file since there's only 1 region
+    assertEquals("splits=" + splits, 1, splits.size());
+    // Make sure the file exists
+    assertTrue(fs.exists(splits.get(0)));
+    LOG.info("Split file=" + splits.get(0));
+    return splits.get(0);
+  }
+
+  private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
+    htd.addFamily(a);
+    HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
+    htd.addFamily(b);
+    HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
+    htd.addFamily(c);
+    return htd;
+  }
+
+  private void writeWALFile(Path file, List<FSWALEntry> entries) throws IOException {
+    fs.mkdirs(file.getParent());
+    ProtobufLogWriter writer = new ProtobufLogWriter();
+    writer.init(fs, file, conf, true);
+    for (FSWALEntry entry : entries) {
+      writer.append(entry);
+    }
+    writer.sync();
+    writer.close();
+  }
+
+  protected abstract WAL createWAL(Configuration c, Path hbaseRootDir, String logName)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 41a790e..101758e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -54,7 +54,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
  * Hadoop serialization).
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class SequenceFileLogWriter implements DefaultWALProvider.Writer {
+public class SequenceFileLogWriter implements FSHLogProvider.Writer {
   private static final Log LOG = LogFactory.getLog(SequenceFileLogWriter.class);
   // The sequence file we delegate to.
   private SequenceFile.Writer writer;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
index bedb915..af022fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java
@@ -25,12 +25,12 @@ import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncLogRollPeriod extends TestLogRollPeriod {
+public class TestAsyncLogRollPeriod extends AbstractTestLogRollPeriod {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestLogRollPeriod.TEST_UTIL.getConfiguration();
+    Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
-    TestLogRollPeriod.setUpBeforeClass();
+    AbstractTestLogRollPeriod.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
index ca415fd..7f0c035 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
@@ -17,20 +17,45 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncWALReplay extends TestWALReplay {
+public class TestAsyncWALReplay extends AbstractTestWALReplay {
+
+  private static EventLoopGroup GROUP;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    GROUP = new NioEventLoopGroup();
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
-    TestWALReplay.setUpBeforeClass();
+    AbstractTestWALReplay.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    AbstractTestWALReplay.tearDownAfterClass();
+    GROUP.shutdownGracefully();
+  }
+
+  @Override
+  protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
+    return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
+        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
index 3b8869b..c088891 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java
@@ -21,18 +21,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestAsyncWALReplayCompressed extends TestWALReplay {
+public class TestAsyncWALReplayCompressed extends TestAsyncWALReplay {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
-    conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
+    Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-    TestWALReplay.setUpBeforeClass();
+    TestAsyncWALReplay.setUpBeforeClass();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 5783106..d6d4404 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -251,7 +251,7 @@ public class TestDurability {
   }
 
   private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
-    Path walPath = DefaultWALProvider.getCurrentFileName(log);
+    Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
     WAL.Reader reader = wals.createReader(FS, walPath);
     int count = 0;
     WAL.Entry entry = new WAL.Entry();

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index b7c1c73..788828a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -17,18 +17,13 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.junit.Assert;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,16 +39,22 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -96,6 +97,7 @@ public class TestLogRollAbort {
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
     TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
   }
 
   private Configuration conf;
@@ -183,7 +185,7 @@ public class TestLogRollAbort {
   public void testLogRollAfterSplitStart() throws IOException {
     LOG.info("Verify wal roll after split starts will fail.");
     String logName = "testLogRollAfterSplitStart";
-    Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName));
+    Path thisTestsDir = new Path(HBASEDIR, AbstractFSWALProvider.getWALDirectoryName(logName));
     final WALFactory wals = new WALFactory(conf, null, logName);
 
     try {
@@ -218,7 +220,7 @@ public class TestLogRollAbort {
        * handles RS shutdowns (as observed by the splitting process)
        */
       // rename the directory so a rogue RS doesn't create more WALs
-      Path rsSplitDir = thisTestsDir.suffix(DefaultWALProvider.SPLITTING_EXT);
+      Path rsSplitDir = thisTestsDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
       if (!fs.rename(thisTestsDir, rsSplitDir)) {
         throw new IOException("Failed fs.rename for log split: " + thisTestsDir);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1267f76e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
index 1141871..2449942 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java
@@ -17,144 +17,20 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.junit.AfterClass;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-/**
- * Tests that verifies that the log is forced to be rolled every "hbase.regionserver.logroll.period"
- */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestLogRollPeriod {
-  private static final Log LOG = LogFactory.getLog(AbstractTestLogRolling.class);
-
-  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private final static long LOG_ROLL_PERIOD = 4000;
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestLogRollPeriod extends AbstractTestLogRollPeriod {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    // disable the ui
-    TEST_UTIL.getConfiguration().setInt("hbase.regionsever.info.port", -1);
-
-    TEST_UTIL.getConfiguration().setLong("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
-
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * Tests that the LogRoller perform the roll even if there are no edits
-   */
-  @Test
-  public void testNoEdits() throws Exception {
-    TableName tableName = TableName.valueOf("TestLogRollPeriodNoEdits");
-    TEST_UTIL.createTable(tableName, "cf");
-    try {
-      Table table = TEST_UTIL.getConnection().getTable(tableName);
-      try {
-        HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-        WAL log = server.getWAL(null);
-        checkMinLogRolls(log, 5);
-      } finally {
-        table.close();
-      }
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
-  }
-
-  /**
-   * Tests that the LogRoller perform the roll with some data in the log
-   */
-  @Test(timeout=60000)
-  public void testWithEdits() throws Exception {
-    final TableName tableName = TableName.valueOf("TestLogRollPeriodWithEdits");
-    final String family = "cf";
-
-    TEST_UTIL.createTable(tableName, family);
-    try {
-      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(tableName);
-      WAL log = server.getWAL(null);
-      final Table table = TEST_UTIL.getConnection().getTable(tableName);
-
-      Thread writerThread = new Thread("writer") {
-        @Override
-        public void run() {
-          try {
-            long row = 0;
-            while (!interrupted()) {
-              Put p = new Put(Bytes.toBytes(String.format("row%d", row)));
-              p.addColumn(Bytes.toBytes(family), Bytes.toBytes("col"), Bytes.toBytes(row));
-              table.put(p);
-              row++;
-
-              Thread.sleep(LOG_ROLL_PERIOD / 16);
-            }
-          } catch (Exception e) {
-            LOG.warn(e);
-          } 
-        }
-      };
-
-      try {
-        writerThread.start();
-        checkMinLogRolls(log, 5);
-      } finally {
-        writerThread.interrupt();
-        writerThread.join();
-        table.close();
-      }  
-    } finally {
-      TEST_UTIL.deleteTable(tableName);
-    }
-  }
-
-  private void checkMinLogRolls(final WAL log, final int minRolls)
-      throws Exception {
-    final List<Path> paths = new ArrayList<Path>();
-    log.registerWALActionsListener(new WALActionsListener.Base() {
-      @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
-        LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
-        paths.add(newFile);
-      }
-    });
-
-    // Sleep until we should get at least min-LogRoll events
-    long wtime = System.currentTimeMillis();
-    Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD);
-    // Do some extra sleep in case the machine is slow,
-    // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD.
-    final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size());
-    for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) {
-      Thread.sleep(LOG_ROLL_PERIOD / 4);
-    }
-    wtime = System.currentTimeMillis() - wtime;
-    LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls",
-                           paths.size(), wtime, wtime / paths.size(), minRolls));
-    assertFalse(paths.size() < minRolls);
+    Configuration conf = AbstractTestLogRollPeriod.TEST_UTIL.getConfiguration();
+    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
+    AbstractTestLogRollPeriod.setUpBeforeClass();
   }
 }

