hbase-commits mailing list archives

From st...@apache.org
Subject [3/8] hbase git commit: HBASE-10378 Refactor write-ahead-log implementation
Date Tue, 18 Nov 2014 20:04:58 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
new file mode 100644
index 0000000..d9fe0d0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the WALRecordReader
+ */
+@Category({MapReduceTests.class, MediumTests.class})
+public class TestWALRecordReader {
+  private final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path hbaseDir;
+  // visible for TestHLogRecordReader
+  static final TableName tableName = TableName.valueOf(getName());
+  private static final byte [] rowName = tableName.getName();
+  // visible for TestHLogRecordReader
+  static final HRegionInfo info = new HRegionInfo(tableName,
+      Bytes.toBytes(""), Bytes.toBytes(""), false);
+  private static final byte [] family = Bytes.toBytes("column");
+  private static final byte [] value = Bytes.toBytes("value");
+  private static HTableDescriptor htd;
+  private static Path logDir;
+
+  private static String getName() {
+    return "TestWALRecordReader";
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = fs.listStatus(hbaseDir);
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+
+  }
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.replication", 1);
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    hbaseDir = TEST_UTIL.createRootDir();
+    
+    logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+
+    htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test partial reads from the log based on passed time range
+   * @throws Exception
+   */
+  @Test
+  public void testPartialRead() throws Exception {
+    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
+    // This test depends on timestamp being millisecond based and the filename of the WAL also
+    // being millisecond based.
+    long ts = System.currentTimeMillis();
+    WALEdit edit = new WALEdit();
+    final AtomicLong sequenceId = new AtomicLong(0);
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
+    log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
+    log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
+        true, null);
+    log.sync();
+    LOG.info("Before 1st WAL roll " + log.toString());
+    log.rollWriter();
+    LOG.info("Past 1st WAL roll " + log.toString());
+
+    Thread.sleep(1);
+    long ts1 = System.currentTimeMillis();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
+    log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
+        true, null);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
+    log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
+        true, null);
+    log.sync();
+    log.shutdown();
+    walfactory.shutdown();
+    LOG.info("Closed WAL " + log.toString());
+
+ 
+    WALInputFormat input = new WALInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
+
+    // only 1st file is considered, and only its 1st entry is used
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    // both files need to be considered
+    assertEquals(2, splits.size());
+    // only the 2nd entry from the 1st file is used
+    testSplit(splits.get(0), Bytes.toBytes("2"));
+    // only the 1st entry from the 2nd file is used
+    testSplit(splits.get(1), Bytes.toBytes("3"));
+  }
+
+  /**
+   * Test basic functionality
+   * @throws Exception
+   */
+  @Test
+  public void testWALRecordReader() throws Exception {
+    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
+    byte [] value = Bytes.toBytes("value");
+    final AtomicLong sequenceId = new AtomicLong(0);
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+        System.currentTimeMillis(), value));
+    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
+        null);
+    log.sync(txid);
+
+    Thread.sleep(1); // make sure 2nd log gets a later timestamp
+    long secondTs = System.currentTimeMillis();
+    log.rollWriter();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+        System.currentTimeMillis(), value));
+    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
+        null);
+    log.sync(txid);
+    log.shutdown();
+    walfactory.shutdown();
+    long thirdTs = System.currentTimeMillis();
+
+    // should have 2 log files now
+    WALInputFormat input = new WALInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+
+    // make sure both logs are found
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(2, splits.size());
+
+    // should return exactly one KV
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+    // same for the 2nd split
+    testSplit(splits.get(1), Bytes.toBytes("2"));
+
+    // now test basic time ranges:
+
+    // set an end time; the 2nd log file can be ignored completely.
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    // now set a start time
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    // both logs need to be considered
+    assertEquals(2, splits.size());
+    // but both readers skip all edits
+    testSplit(splits.get(0));
+    testSplit(splits.get(1));
+  }
+
+  protected WALKey getWalKey(final long sequenceid) {
+    return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
+  }
+
+  protected WALRecordReader getReader() {
+    return new WALKeyRecordReader();
+  }
+
+  /**
+   * Create a new reader from the split, and match the edits against the passed columns.
+   */
+  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
+    final WALRecordReader reader = getReader();
+    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+
+    for (byte[] column : columns) {
+      assertTrue(reader.nextKeyValue());
+      Cell cell = reader.getCurrentValue().getCells().get(0);
+      if (!Bytes.equals(column, cell.getQualifier())) {
+        assertTrue("expected [" + Bytes.toString(column) + "], actual ["
+            + Bytes.toString(cell.getQualifier()) + "]", false);
+      }
+    }
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+  }
+
+}
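
For context, the time-range pruning exercised by testPartialRead above is driven entirely by job configuration. A minimal, hypothetical driver sketch follows (startTs and endTs are placeholder variables; conf, logDir and the imports are assumed to match the test above):

    // Sketch only: restrict a WALInputFormat scan to the window [startTs, endTs].
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    jobConf.setLong(WALInputFormat.START_TIME_KEY, startTs);  // oldest write time of interest
    jobConf.setLong(WALInputFormat.END_TIME_KEY, endTs);      // newest write time of interest
    // As the assertions above suggest, getSplits() prunes whole WAL files whose name timestamp
    // falls outside the window; the record reader then filters entries by their WALKey write time.
    List<InputSplit> splits =
        new WALInputFormat().getSplits(MapreduceTestingShim.createJobContext(jobConf));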

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
new file mode 100644
index 0000000..d7a4618
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+public class InstrumentedLogWriter extends ProtobufLogWriter {
+
+  public InstrumentedLogWriter() {
+    super();
+  }
+
+  public static boolean activateFailure = false;
+  @Override
+    public void append(Entry entry) throws IOException {
+      super.append(entry);
+      if (activateFailure &&
+          Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
+        System.out.println(getClass().getName() + ": I will throw an exception now...");
+        throw(new IOException("This exception is instrumented and should only be thrown for testing"
+            ));
+      }
+    }
+}
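
As a usage note, the instrumentation above is armed through a static flag and keyed off the literal encoded region name "break". A hypothetical test-side sketch (writer and entry construction elided) could look like:

    // Sketch only: any append whose WALKey carries the encoded region name "break" now throws.
    InstrumentedLogWriter.activateFailure = true;
    try {
      writer.append(entryForRegionNamedBreak);  // hypothetical WAL.Entry built for region "break"
      fail("append should have thrown the instrumented IOException");
    } catch (IOException expected) {
      // instrumented failure path taken
    } finally {
      InstrumentedLogWriter.activateFailure = false;  // always reset the static flag between tests
    }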

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
new file mode 100644
index 0000000..b766d14
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -0,0 +1,479 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Provides FSHLog test cases.
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestFSHLog {
+  protected static final Log LOG = LogFactory.getLog(TestFSHLog.class);
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
+      .getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static Configuration conf;
+  protected static FileSystem fs;
+  protected static Path dir;
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+    final Path hbaseDir = TEST_UTIL.createRootDir();
+    dir = new Path(hbaseDir, currentTest.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration()
+        .setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+        "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+      "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+        SampleRegionWALObserver.class.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * A loaded WAL coprocessor won't break existing WAL test cases.
+   */
+  @Test
+  public void testWALCoprocessorLoaded() throws Exception {
+    // test to see whether the coprocessor is loaded or not.
+    FSHLog log = null;
+    try {
+      log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+      WALCoprocessorHost host = log.getCoprocessorHost();
+      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+      assertNotNull(c);
+    } finally {
+      if (log != null) {
+        log.close();
+      }
+    }
+  }
+
+  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+                        int times, AtomicLong sequenceId) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor();
+    htd.addFamily(new HColumnDescriptor("row"));
+
+    final byte [] row = Bytes.toBytes("row");
+    for (int i = 0; i < times; i++) {
+      long timestamp = System.currentTimeMillis();
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, row, row, timestamp, row));
+      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
+          sequenceId, true, null);
+    }
+    log.sync();
+  }
+
+  /**
+   * helper method to simulate region flush for a WAL.
+   * @param wal
+   * @param regionEncodedName
+   */
+  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
+    wal.startCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName);
+  }
+
+  /**
+   * Tests the log comparator. Ensures that we are not mixing meta logs with non-meta logs (an
+   * exception is thrown if we do). Comparison is based on the timestamp present in the wal name.
+   * @throws Exception
+   */
+  @Test 
+  public void testWALComparator() throws Exception {
+    FSHLog wal1 = null;
+    FSHLog walMeta = null;
+    try {
+      wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+      LOG.debug("Log obtained is: " + wal1);
+      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
+      Path p1 = wal1.computeFilename(11);
+      Path p2 = wal1.computeFilename(12);
+      // comparing with itself returns 0
+      assertTrue(comp.compare(p1, p1) == 0);
+      // comparing with different filenum.
+      assertTrue(comp.compare(p1, p2) < 0);
+      walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
+          DefaultWALProvider.META_WAL_PROVIDER_ID);
+      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
+
+      Path p1WithMeta = walMeta.computeFilename(11);
+      Path p2WithMeta = walMeta.computeFilename(12);
+      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
+      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
+      // mixing meta and non-meta logs gives error
+      boolean ex = false;
+      try {
+        comp.compare(p1WithMeta, p2);
+      } catch (IllegalArgumentException e) {
+        ex = true;
+      }
+      assertTrue("Comparator doesn't complain while checking meta log files", ex);
+      boolean exMeta = false;
+      try {
+        compMeta.compare(p1WithMeta, p2);
+      } catch (IllegalArgumentException e) {
+        exMeta = true;
+      }
+      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
+    } finally {
+      if (wal1 != null) {
+        wal1.close();
+      }
+      if (walMeta != null) {
+        walMeta.close();
+      }
+    }
+  }
+
+  /**
+   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the
+   * list of regions which should be flushed in order to archive the oldest wal file.
+   * <p>
+   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
+   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
+   * rolling the wal.
+   * @throws Exception
+   */
+  @Test 
+  public void testFindMemStoresEligibleForFlush() throws Exception {
+    LOG.debug("testFindMemStoresEligibleForFlush");
+    Configuration conf1 = HBaseConfiguration.create(conf);
+    conf1.setInt("hbase.regionserver.maxlogs", 1);
+    FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
+        HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+    TableName t1 = TableName.valueOf("t1");
+    TableName t2 = TableName.valueOf("t2");
+    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    // variables to mock region sequenceIds
+    final AtomicLong sequenceId1 = new AtomicLong(1);
+    final AtomicLong sequenceId2 = new AtomicLong(1);
+    // add edits and roll the wal
+    try {
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      wal.rollWriter();
+      // add some more edits and roll the wal. This would reach the log number threshold
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      wal.rollWriter();
+      // with above rollWriter call, the max logs limit is reached.
+      assertTrue(wal.getNumRolledLogFiles() == 2);
+
+      // get the regions to flush; since there is only one region in the oldest wal, it should
+      // return only one region.
+      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(1, regionsToFlush.length);
+      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      // insert edits in second region
+      addEdits(wal, hri2, t2, 2, sequenceId2);
+      // get the regions to flush; it should still return region1.
+      regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(regionsToFlush.length, 1);
+      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
+      // remain.
+      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      wal.rollWriter();
+      // only one wal should remain now (that is for the second region).
+      assertEquals(1, wal.getNumRolledLogFiles());
+      // flush the second region
+      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      wal.rollWriter(true);
+      // no wal should remain now.
+      assertEquals(0, wal.getNumRolledLogFiles());
+      // add edits both to region 1 and region 2, and roll.
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      addEdits(wal, hri2, t2, 2, sequenceId2);
+      wal.rollWriter();
+      // add edits and roll the writer, to reach the max logs limit.
+      assertEquals(1, wal.getNumRolledLogFiles());
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      wal.rollWriter();
+      // it should return two regions to flush, as the oldest wal file has entries
+      // for both regions.
+      regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(2, regionsToFlush.length);
+      // flush both regions
+      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      wal.rollWriter(true);
+      assertEquals(0, wal.getNumRolledLogFiles());
+      // Add an edit to region1, and roll the wal.
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
+      wal.startCacheFlush(hri1.getEncodedNameAsBytes());
+      wal.rollWriter();
+      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      assertEquals(1, wal.getNumRolledLogFiles());
+    } finally {
+      if (wal != null) {
+        wal.close();
+      }
+    }
+  }
+
+  /**
+   * Simulates WAL append ops for a region and tests
+   * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
+   * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries.
+   * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the
+   * region should be flushed before archiving this WAL.
+  */
+  @Test
+  public void testAllRegionsFlushed() {
+    LOG.debug("testAllRegionsFlushed");
+    Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
+    Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
+    Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
+    // create a table
+    TableName t1 = TableName.valueOf("t1");
+    // create a region
+    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    // variables to mock region sequenceIds
+    final AtomicLong sequenceId1 = new AtomicLong(1);
+    // test empty map
+    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // add entries in the region
+    seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
+    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
+    // should say region1 is not flushed.
+    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // test with entries in oldestFlushing map.
+    oldestUnFlushedSeqNo.clear();
+    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
+    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
+    oldestFlushingSeqNo.clear();
+    oldestUnFlushedSeqNo.clear();
+    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // insert some large values for region1
+    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
+    seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
+    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+
+    // tests when oldestUnFlushed/oldestFlushing contains larger value.
+    // It means region is flushed.
+    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
+    oldestUnFlushedSeqNo.clear();
+    seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
+    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+  }
+
+  @Test(expected=IOException.class)
+  public void testFailedToCreateWALIfParentRenamed() throws IOException {
+    final String name = "testFailedToCreateWALIfParentRenamed";
+    FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
+        conf, null, true, null, null);
+    long filenum = System.currentTimeMillis();
+    Path path = log.computeFilename(filenum);
+    log.createWriterInstance(path);
+    Path parent = path.getParent();
+    path = log.computeFilename(filenum + 1);
+    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
+    fs.rename(parent, newPath);
+    log.createWriterInstance(path);
+    fail("It should fail to create the new WAL");
+  }
+
+  /**
+   * Test that a flush is guaranteed a sequence id that is beyond the last edit appended.  We do
+   * this by slowing appends in the background ring buffer thread while in the foreground we call
+   * flush.  The addition of the sync over HRegion in flush should fix an issue where flush was
+   * returning before all of its appends had made it out to the WAL (HBASE-11109).
+   * @throws IOException
+   * @see HBASE-11109
+   */
+  @Test
+  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
+    String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
+    final TableName tableName = TableName.valueOf(testName);
+    final HRegionInfo hri = new HRegionInfo(tableName);
+    final byte[] rowName = tableName.getName();
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("f"));
+    HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
+      TEST_UTIL.getConfiguration(), htd);
+    HRegion.closeHRegion(r);
+    final int countPerFamily = 10;
+    final MutableBoolean goslow = new MutableBoolean(false);
+    // subclass and doctor a method.
+    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
+        testName, conf) {
+      @Override
+      void atHeadOfRingBufferEventHandlerAppend() {
+        if (goslow.isTrue()) {
+          Threads.sleep(100);
+          LOG.debug("Sleeping before appending 100ms");
+        }
+        super.atHeadOfRingBufferEventHandlerAppend();
+      }
+    };
+    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
+      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+    try {
+      List<Put> puts = null;
+      for (HColumnDescriptor hcd: htd.getFamilies()) {
+        puts =
+          TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
+      }
+
+      // Now assert edits made it in.
+      final Get g = new Get(rowName);
+      Result result = region.get(g);
+      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+
+      // Construct a WALEdit and add it a few times to the WAL.
+      WALEdit edits = new WALEdit();
+      for (Put p: puts) {
+        CellScanner cs = p.cellScanner();
+        while (cs.advance()) {
+          edits.add(cs.current());
+        }
+      }
+      // Add any old cluster id.
+      List<UUID> clusterIds = new ArrayList<UUID>();
+      clusterIds.add(UUID.randomUUID());
+      // Now make appends run slow.
+      goslow.setValue(true);
+      for (int i = 0; i < countPerFamily; i++) {
+        final HRegionInfo info = region.getRegionInfo();
+        final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
+            System.currentTimeMillis(), clusterIds, -1, -1);
+        wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null);
+      }
+      region.flushcache();
+      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
+      long currentSequenceId = region.getSequenceId().get();
+      // Now release the appends
+      goslow.setValue(false);
+      synchronized (goslow) {
+        goslow.notifyAll();
+      }
+      assertTrue(currentSequenceId >= region.getSequenceId().get());
+    } finally {
+      region.close(true);
+      wal.close();
+    }
+  }
+
+}
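
The roll/flush/archive cycle that testFindMemStoresEligibleForFlush walks through condenses to the sketch below; wal is an FSHLog constructed as above, and only calls the test itself exercises are used:

    // Sketch only: once the rolled-log count passes hbase.regionserver.maxlogs,
    // findRegionsToForceFlush() names the regions pinning the oldest WAL file.
    wal.rollWriter();
    byte[][] stuck = wal.findRegionsToForceFlush();
    for (byte[] encodedRegionName : stuck) {
      wal.startCacheFlush(encodedRegionName);    // normally driven by the region's memstore flush
      wal.completeCacheFlush(encodedRegionName);
    }
    wal.rollWriter(true);                        // with nothing left unflushed, old files can be archived
    assertEquals(0, wal.getNumRolledLogFiles());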

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
new file mode 100644
index 0000000..04cb2ce
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
@@ -0,0 +1,209 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * WAL tests that can be reused across providers.
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestProtobufLog {
+  protected static final Log LOG = LogFactory.getLog(TestProtobufLog.class);
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  protected FileSystem fs;
+  protected Path dir;
+  protected WALFactory wals;
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
+    wals = new WALFactory(TEST_UTIL.getConfiguration(), null, currentTest.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    wals.close();
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // needed for testAppendClose()
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration()
+        .setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+        "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+      "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+        SampleRegionWALObserver.class.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Reads the WAL with and without WALTrailer.
+   * @throws IOException
+   */
+  @Test
+  public void testWALTrailer() throws IOException {
+    // read With trailer.
+    doRead(true);
+    // read without trailer
+    doRead(false);
+  }
+
+  /**
+   * Appends entries in the WAL and reads it.
+   * @param withTrailer If 'withTrailer' is true, it calls a close on the WAL writer before reading
+   *          so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync
+   *          call. This means that the reader is not aware of the trailer. In this scenario, if the
+   *          reader tries to read the trailer in its next() call, it returns false from
+   *          ProtoBufLogReader.
+   * @throws IOException
+   */
+  private void doRead(boolean withTrailer) throws IOException {
+    final int columnCount = 5;
+    final int recordCount = 5;
+    final TableName tableName =
+        TableName.valueOf("tablename");
+    final byte[] row = Bytes.toBytes("row");
+    long timestamp = System.currentTimeMillis();
+    Path path = new Path(dir, "tempwal");
+    // delete the log if already exists, for test only
+    fs.delete(path, true);
+    WALProvider.Writer writer = null;
+    ProtobufLogReader reader = null;
+    try {
+      HRegionInfo hri = new HRegionInfo(tableName,
+          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      fs.mkdirs(dir);
+      // Write log in pb format.
+      writer = wals.createWALWriter(fs, path);
+      for (int i = 0; i < recordCount; ++i) {
+        WALKey key = new WALKey(
+            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
+        WALEdit edit = new WALEdit();
+        for (int j = 0; j < columnCount; ++j) {
+          if (i == 0) {
+            htd.addFamily(new HColumnDescriptor("column" + j));
+          }
+          String value = i + "" + j;
+          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
+        }
+        writer.append(new WAL.Entry(key, edit));
+      }
+      writer.sync();
+      if (withTrailer) writer.close();
+
+      // Now read the log using standard means.
+      reader = (ProtobufLogReader) wals.createReader(fs, path);
+      if (withTrailer) {
+        assertNotNull(reader.trailer);
+      } else {
+        assertNull(reader.trailer);
+      }
+      for (int i = 0; i < recordCount; ++i) {
+        WAL.Entry entry = reader.next();
+        assertNotNull(entry);
+        assertEquals(columnCount, entry.getEdit().size());
+        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+        assertEquals(tableName, entry.getKey().getTablename());
+        int idx = 0;
+        for (Cell val : entry.getEdit().getCells()) {
+          assertTrue(Bytes.equals(row, val.getRow()));
+          String value = i + "" + idx;
+          assertArrayEquals(Bytes.toBytes(value), val.getValue());
+          idx++;
+        }
+      }
+      WAL.Entry entry = reader.next();
+      assertNull(entry);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}
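
The trailer behaviour documented on doRead boils down to whether the writer was closed. A sketch using the same WALFactory calls as above (wals, fs, path, key and edit assumed as in the test):

    // Sketch only: close() is what appends the trailer; sync() alone does not.
    WALProvider.Writer writer = wals.createWALWriter(fs, path);
    writer.append(new WAL.Entry(key, edit));
    writer.sync();
    writer.close();                            // omit this close to get a trailer-less WAL
    ProtobufLogReader reader = (ProtobufLogReader) wals.createReader(fs, path);
    assertNotNull(reader.trailer);             // would be null had the writer not been closed
    WAL.Entry entry;
    while ((entry = reader.next()) != null) {
      // entry.getKey() / entry.getEdit() carry the appended data
    }
    reader.close();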

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
new file mode 100644
index 0000000..577f0ba
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Category({ReplicationTests.class, LargeTests.class})
+@RunWith(Parameterized.class)
+public class TestReplicationWALReaderManager {
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Configuration conf;
+  private static Path hbaseDir;
+  private static FileSystem fs;
+  private static MiniDFSCluster cluster;
+  private static final TableName tableName = TableName.valueOf("tablename");
+  private static final byte [] family = Bytes.toBytes("column");
+  private static final byte [] qualifier = Bytes.toBytes("qualifier");
+  private static final HRegionInfo info = new HRegionInfo(tableName,
+      HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
+  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+
+  private WAL log;
+  private ReplicationWALReaderManager logManager;
+  private PathWatcher pathWatcher;
+  private int nbRows;
+  private int walEditKVs;
+  private final AtomicLong sequenceId = new AtomicLong(1);
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    // Try out different combinations of row count and KeyValue count
+    int[] NB_ROWS = { 1500, 60000 };
+    int[] NB_KVS = { 1, 100 };
+    // whether compression is used
+    Boolean[] BOOL_VALS = { false, true };
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (int nbRows : NB_ROWS) {
+      for (int walEditKVs : NB_KVS) {
+        for (boolean b : BOOL_VALS) {
+          Object[] arr = new Object[3];
+          arr[0] = nbRows;
+          arr[1] = walEditKVs;
+          arr[2] = b;
+          parameters.add(arr);
+        }
+      }
+    }
+    return parameters;
+  }
+
+  public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
+    this.nbRows = nbRows;
+    this.walEditKVs = walEditKVs;
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
+      enableCompression);
+  }
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    hbaseDir = TEST_UTIL.createRootDir();
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    logManager = new ReplicationWALReaderManager(fs, conf);
+    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+    pathWatcher = new PathWatcher();
+    listeners.add(pathWatcher);
+    final WALFactory wals = new WALFactory(conf, listeners, "some server");
+    log = wals.getWAL(info.getEncodedNameAsBytes());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    log.close();
+  }
+
+  @Test
+  public void test() throws Exception {
+    // Grab the path that was generated when the log rolled as part of its creation
+    Path path = pathWatcher.currentPath;
+
+    assertEquals(0, logManager.getPosition());
+
+    appendToLog();
+
+    // There's one edit in the log; read it. Reading past it should return null
+    assertNotNull(logManager.openReader(path));
+    logManager.seek();
+    WAL.Entry entry = logManager.readNextAndSetPosition();
+    assertNotNull(entry);
+    entry = logManager.readNextAndSetPosition();
+    assertNull(entry);
+    logManager.closeReader();
+    long oldPos = logManager.getPosition();
+
+    appendToLog();
+
+    // Read the newly added entry, make sure we made progress
+    assertNotNull(logManager.openReader(path));
+    logManager.seek();
+    entry = logManager.readNextAndSetPosition();
+    assertNotEquals(oldPos, logManager.getPosition());
+    assertNotNull(entry);
+    logManager.closeReader();
+    oldPos = logManager.getPosition();
+
+    log.rollWriter();
+
+    // We rolled but we still should see the end of the first log and not get data
+    assertNotNull(logManager.openReader(path));
+    logManager.seek();
+    entry = logManager.readNextAndSetPosition();
+    assertEquals(oldPos, logManager.getPosition());
+    assertNull(entry);
+    logManager.finishCurrentFile();
+
+    path = pathWatcher.currentPath;
+
+    for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
+    log.rollWriter();
+    logManager.openReader(path);
+    logManager.seek();
+    for (int i = 0; i < nbRows; i++) {
+      WAL.Entry e = logManager.readNextAndSetPosition();
+      if (e == null) {
+        fail("Should have enough entries");
+      }
+    }
+  }
+
+  private void appendToLog() throws IOException {
+    appendToLogPlus(1);
+  }
+
+  private void appendToLogPlus(int count) throws IOException {
+    final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+        System.currentTimeMillis()), getWALEdits(count), sequenceId, true, null);
+    log.sync(txid);
+  }
+
+  private WALEdit getWALEdits(int count) {
+    WALEdit edit = new WALEdit();
+    for (int i = 0; i < count; i++) {
+      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
+        System.currentTimeMillis(), qualifier));
+    }
+    return edit;
+  }
+
+  class PathWatcher extends WALActionsListener.Base {
+
+    Path currentPath;
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+      currentPath = newPath;
+    }
+  }
+}
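
The reader-manager protocol that test() exercises reduces to the following loop; logManager and path are assumed to be set up as in the test above:

    // Sketch only: the ReplicationWALReaderManager read cycle used by the test.
    logManager.openReader(path);     // open (or re-open) the current WAL file
    logManager.seek();               // position the reader at the last recorded offset
    WAL.Entry entry;
    while ((entry = logManager.readNextAndSetPosition()) != null) {
      // ship 'entry'; the manager records the new offset, visible via getPosition()
    }
    logManager.closeReader();        // keep the position, release the reader
    // once the file has been rolled away and fully drained:
    // logManager.finishCurrentFile();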

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
new file mode 100644
index 0000000..3212822
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+
+// imports for things that haven't moved yet
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * This is a utility class, used by tests, that fails the operation specified by the FailureType enum
+ */
+@InterfaceAudience.Private
+public class FaultyFSLog extends FSHLog {
+  public enum FailureType {
+    NONE, APPEND, SYNC
+  }
+  FailureType ft = FailureType.NONE;
+
+  public FaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
+      throws IOException {
+    super(fs, rootDir, logName, conf);
+  }
+  
+  public void setFailureType(FailureType fType) {
+    this.ft = fType;
+  }
+  
+  @Override
+  public void sync(long txid) throws IOException {
+    if (this.ft == FailureType.SYNC) {
+      throw new IOException("sync");
+    }
+    super.sync(txid);
+  }
+
+  @Override
+  public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
+      AtomicLong sequenceId, boolean isInMemstore, List<Cell> cells) throws IOException {
+    if (this.ft == FailureType.APPEND) {
+      throw new IOException("append");
+    }
+    return super.append(htd, info, key, edits, sequenceId, isInMemstore, cells);
+  }
+}
+
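
A usage sketch for the fault injector above; fs, rootDir, conf and the append arguments (htd, info, key, edits, sequenceId) are assumed to be set up as in the other WAL tests in this patch:

    // Sketch only: force the sync path to fail.
    FaultyFSLog faulty = new FaultyFSLog(fs, rootDir, "faultyLog", conf);
    faulty.setFailureType(FaultyFSLog.FailureType.SYNC);
    try {
      long txid = faulty.append(htd, info, key, edits, sequenceId, true, null);
      faulty.sync(txid);                       // throws IOException("sync")
      fail("sync should have failed");
    } catch (IOException expected) {
      // expected: the configured failure type was SYNC
    } finally {
      faulty.setFailureType(FaultyFSLog.FailureType.NONE);  // disarm for later appends
    }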

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
new file mode 100644
index 0000000..9798b6c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -0,0 +1,333 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestDefaultWALProvider {
+  protected static final Log LOG = LogFactory.getLog(TestDefaultWALProvider.class);
+
+  protected static Configuration conf;
+  protected static FileSystem fs;
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration()
+        .setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+        "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+      "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    // Set up a working space for our tests.
+    TEST_UTIL.createRootDir();
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  static String getName() {
+    return "TestDefaultWALProvider";
+  }
+
+  @Test
+  public void testGetServerNameFromWALDirectoryName() throws IOException {
+    ServerName sn = ServerName.valueOf("hn", 450, 1398);
+    String hl = FSUtils.getRootDir(conf) + "/" +
+        DefaultWALProvider.getWALDirectoryName(sn.toString());
+
+    // Must not throw exception
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null));
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
+        FSUtils.getRootDir(conf).toUri().toString()));
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, ""));
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "                  "));
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl));
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf"));
+    assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf"));
+
+    final String wals = "/WALs/";
+    ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
+      FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
+      "/localhost%2C32984%2C1343316388997.1343316390417");
+    assertEquals("standard",  sn, parsed);
+
+    parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf");
+    assertEquals("subdir", sn, parsed);
+
+    parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
+      FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
+      "-splitting/localhost%3A57020.1340474893931");
+    assertEquals("split", sn, parsed);
+  }
+
+
+  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+                        int times, AtomicLong sequenceId) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor();
+    htd.addFamily(new HColumnDescriptor("row"));
+
+    final byte [] row = Bytes.toBytes("row");
+    for (int i = 0; i < times; i++) {
+      long timestamp = System.currentTimeMillis();
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, row, row, timestamp, row));
+      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
+          sequenceId, true, null);
+    }
+    log.sync();
+  }
+
+  /**
+   * Used by TestDefaultWALProviderWithHLogKey.
+   */
+  WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
+    return new WALKey(info, tableName, timestamp);
+  }
+
+  /**
+   * Helper method to simulate a region flush for a WAL.
+   * @param wal the WAL to flush against
+   * @param regionEncodedName encoded name of the region whose flush is simulated
+   */
+  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
+    wal.startCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName);
+  }
+
+  private static final byte[] UNSPECIFIED_REGION = new byte[]{};
+
+  @Test
+  public void testLogCleaning() throws Exception {
+    LOG.info("testLogCleaning");
+    final TableName tableName =
+        TableName.valueOf("testLogCleaning");
+    final TableName tableName2 =
+        TableName.valueOf("testLogCleaning2");
+    final Configuration localConf = new Configuration(conf);
+    localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
+    final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
+    final AtomicLong sequenceId = new AtomicLong(1);
+    try {
+      HRegionInfo hri = new HRegionInfo(tableName,
+          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 = new HRegionInfo(tableName2,
+          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      // we want to mix edits from regions, so pick our own identifier.
+      final WAL log = wals.getWAL(UNSPECIFIED_REGION);
+
+      // Add a single edit and make sure that rolling won't remove the file
+      // Before HBASE-3198 it used to delete it
+      addEdits(log, hri, tableName, 1, sequenceId);
+      log.rollWriter();
+      assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
+
+      // See if there's anything wrong with more than 1 edit
+      addEdits(log, hri, tableName, 2, sequenceId);
+      log.rollWriter();
+      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
+
+      // Now mix edits from 2 regions, still no flushing
+      addEdits(log, hri, tableName, 1, sequenceId);
+      addEdits(log, hri2, tableName2, 1, sequenceId);
+      addEdits(log, hri, tableName, 1, sequenceId);
+      addEdits(log, hri2, tableName2, 1, sequenceId);
+      log.rollWriter();
+      assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
+
+      // Flush the first region; we expect to see the first two files getting
+      // archived. We need to append something or the writer won't be rolled.
+      addEdits(log, hri2, tableName2, 1, sequenceId);
+      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      log.rollWriter();
+      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
+
+      // Flush the second region, which removes all the remaining output files
+      // since the oldest was completely flushed and the two others only contain
+      // flush information
+      addEdits(log, hri2, tableName2, 1, sequenceId);
+      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+      log.rollWriter();
+      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
+    } finally {
+      if (wals != null) {
+        wals.close();
+      }
+    }
+  }
+
+  /**
+   * Tests WAL archiving by adding data, doing flushing/rolling and checking that we archive old
+   * logs but don't archive "live logs" (that is, logs with un-flushed entries).
+   * <p>
+   * This is what it does:
+   * It creates two regions, and does a series of inserts along with log rolling.
+   * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for
+   * archiving if all the regions which have entries in that wal file have flushed past
+   * their maximum sequence id in that wal file.
+   * <p>
+   * @throws IOException
+   */
+  @Test 
+  public void testWALArchiving() throws IOException {
+    LOG.debug("testWALArchiving");
+    TableName table1 = TableName.valueOf("t1");
+    TableName table2 = TableName.valueOf("t2");
+    final Configuration localConf = new Configuration(conf);
+    localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
+    final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
+    try {
+      final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
+      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      // ensure that we don't split the regions.
+      hri1.setSplit(false);
+      hri2.setSplit(false);
+      // variables to mock region sequenceIds.
+      final AtomicLong sequenceId1 = new AtomicLong(1);
+      final AtomicLong sequenceId2 = new AtomicLong(1);
+      // start with the testing logic: insert a waledit, and roll writer
+      addEdits(wal, hri1, table1, 1, sequenceId1);
+      wal.rollWriter();
+      // assert that the wal is rolled
+      assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
+      // add edits in the second wal file, and roll writer.
+      addEdits(wal, hri1, table1, 1, sequenceId1);
+      wal.rollWriter();
+      // assert that the wal is rolled
+      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+      // add a waledit to table1, and flush the region.
+      addEdits(wal, hri1, table1, 3, sequenceId1);
+      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      // roll log; all old logs should be archived.
+      wal.rollWriter();
+      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+      // add an edit to table2, and roll writer
+      addEdits(wal, hri2, table2, 1, sequenceId2);
+      wal.rollWriter();
+      assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
+      // add edits for table1, and roll writer
+      addEdits(wal, hri1, table1, 2, sequenceId1);
+      wal.rollWriter();
+      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+      // add edits for table2, and flush hri1.
+      addEdits(wal, hri2, table2, 2, sequenceId2);
+      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      // the log : region-sequenceId map is
+      // log1: region2 (unflushed)
+      // log2: region1 (flushed)
+      // log3: region2 (unflushed)
+      // roll the writer; log2 should be archived.
+      wal.rollWriter();
+      assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+      // flush region2, and all logs should be archived.
+      addEdits(wal, hri2, table2, 2, sequenceId2);
+      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      wal.rollWriter();
+      assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+    } finally {
+      if (wals != null) {
+        wals.close();
+      }
+    }
+  }
+
+  /**
+   * Write to a log file with three concurrent threads and verify all data is written.
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentWrites() throws Exception {
+    // Run the WPE tool with three threads writing 3000 edits each concurrently.
+    // When done, verify that all edits were written.
+    int errCode = WALPerformanceEvaluation.
+      innerMain(new Configuration(TEST_UTIL.getConfiguration()),
+        new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
+    assertEquals(0, errCode);
+  }
+}
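The archiving behaviour the javadoc above describes can be condensed to the following shape, using the addEdits/flushRegion helpers defined in this class. This is a sketch only, assuming the same WALFactory, hri1/table1 and sequenceId1 setup as testWALArchiving.

    // A rolled WAL file is kept while any region with edits in it is unflushed,
    // and archived once every such region has flushed past its max seqid in it.
    addEdits(wal, hri1, table1, 1, sequenceId1);
    wal.rollWriter();                                // file rolled, region1 still unflushed
    assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));

    addEdits(wal, hri1, table1, 1, sequenceId1);     // must append or the roll is a no-op
    flushRegion(wal, hri1.getEncodedNameAsBytes());  // region1 flushed past all its edits
    wal.rollWriter();                                // nothing left unflushed -> all archived
    assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));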

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
new file mode 100644
index 0000000..c667e94
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider {
+  @Override
+  WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
+    return new HLogKey(info, tableName, timestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5e05eb8/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
new file mode 100644
index 0000000..6f05839
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.log4j.Level;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestSecureWAL {
+  static final Log LOG = LogFactory.getLog(TestSecureWAL.class);
+  static {
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
+      .getLogger().setLevel(Level.ALL);
+  }
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
+    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
+      WAL.Reader.class);
+    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
+      WALProvider.Writer.class);
+    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
+    FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
+  }
+
+  @Test
+  public void testSecureWAL() throws Exception {
+    TableName tableName = TableName.valueOf("TestSecureWAL");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(tableName.getName()));
+    HRegionInfo regioninfo = new HRegionInfo(tableName,
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+    final int total = 10;
+    final byte[] row = Bytes.toBytes("row");
+    final byte[] family = Bytes.toBytes("family");
+    final byte[] value = Bytes.toBytes("Test value");
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    final WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "TestSecureWAL");
+    final AtomicLong sequenceId = new AtomicLong(1);
+
+    // Write the WAL
+    final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
+
+    for (int i = 0; i < total; i++) {
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+      wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+          System.currentTimeMillis()), kvs, sequenceId, true, null);
+    }
+    wal.sync();
+    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+    wals.shutdown();
+
+    // Ensure edits are not plaintext
+    long length = fs.getFileStatus(walPath).getLen();
+    FSDataInputStream in = fs.open(walPath);
+    byte[] fileData = new byte[(int)length];
+    IOUtils.readFully(in, fileData);
+    in.close();
+    assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
+
+    // Confirm the WAL can be read back
+    WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+    int count = 0;
+    WAL.Entry entry = new WAL.Entry();
+    while (reader.next(entry) != null) {
+      count++;
+      List<Cell> cells = entry.getEdit().getCells();
+      assertTrue("Should be one KV per WALEdit", cells.size() == 1);
+      for (Cell cell: cells) {
+        byte[] thisRow = cell.getRow();
+        assertTrue("Incorrect row", Bytes.equals(thisRow, row));
+        byte[] thisFamily = cell.getFamily();
+        assertTrue("Incorrect family", Bytes.equals(thisFamily, family));
+        byte[] thisValue = cell.getValue();
+        assertTrue("Incorrect value", Bytes.equals(thisValue, value));
+      }
+    }
+    assertEquals("Should have read back as many KVs as written", total, count);
+    reader.close();
+  }
+
+}
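For readers skimming the diff, the settings in setUpBeforeClass above are what switch this test's WAL to the encrypted codec; they are restated here with per-key notes (a sketch of the same calls; the key provider and the "hbase" master-key alias are test-only values):

    Configuration conf = TEST_UTIL.getConfiguration();
    // where encryption keys come from (a test-only provider here)
    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
    // alias of the master key used for key wrapping
    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
    // use the secure protobuf reader/writer pair for WAL files
    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
      WAL.Reader.class);
    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
      WALProvider.Writer.class);
    // turn WAL encryption on
    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);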

