hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject [08/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.
Date Tue, 02 Dec 2014 17:20:47 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
deleted file mode 100644
index f56ef98..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
+++ /dev/null
@@ -1,566 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
-import org.apache.hadoop.hbase.trace.SpanReceiverHost;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.htrace.Sampler;
-import org.htrace.Trace;
-import org.htrace.TraceScope;
-import org.htrace.impl.ProbabilitySampler;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.reporting.ConsoleReporter;
-
-/**
- * This class runs performance benchmarks for {@link HLog}.
- * See usage for this tool by running:
- * <code>$ hbase org.apache.hadoop.hbase.regionserver.wal.HLogPerformanceEvaluation -h</code>
- */
-@InterfaceAudience.Private
-public final class HLogPerformanceEvaluation extends Configured implements Tool {
-  static final Log LOG = LogFactory.getLog(HLogPerformanceEvaluation.class.getName());
-  private final MetricsRegistry metrics = new MetricsRegistry();
-  private final Meter syncMeter =
-    metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
-  private final Histogram syncHistogram =
-    metrics.newHistogram(HLogPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs",
-      true);
-  private final Histogram syncCountHistogram =
-      metrics.newHistogram(HLogPerformanceEvaluation.class, "syncCountHistogram", "countPerSync",
-        true);
-  private final Meter appendMeter =
-    metrics.newMeter(HLogPerformanceEvaluation.class, "appendMeter", "bytes",
-      TimeUnit.MILLISECONDS);
-  private final Histogram latencyHistogram =
-    metrics.newHistogram(HLogPerformanceEvaluation.class, "latencyHistogram", "nanos", true);
-
-  private HBaseTestingUtility TEST_UTIL;
-
-  static final String TABLE_NAME = "HLogPerformanceEvaluation";
-  static final String QUALIFIER_PREFIX = "q";
-  static final String FAMILY_PREFIX = "cf";
-
-  private int numQualifiers = 1;
-  private int valueSize = 512;
-  private int keySize = 16;
-
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    TEST_UTIL = new HBaseTestingUtility(conf);
-  }
-
-  /**
-   * Perform HLog.append() of Put object, for the number of iterations requested.
-   * Keys and Vaues are generated randomly, the number of column families,
-   * qualifiers and key/value size is tunable by the user.
-   */
-  class HLogPutBenchmark implements Runnable {
-    private final long numIterations;
-    private final int numFamilies;
-    private final boolean noSync;
-    private final HRegion region;
-    private final int syncInterval;
-    private final HTableDescriptor htd;
-    private final Sampler loopSampler;
-
-    HLogPutBenchmark(final HRegion region, final HTableDescriptor htd,
-        final long numIterations, final boolean noSync, final int syncInterval,
-        final double traceFreq) {
-      this.numIterations = numIterations;
-      this.noSync = noSync;
-      this.syncInterval = syncInterval;
-      this.numFamilies = htd.getColumnFamilies().length;
-      this.region = region;
-      this.htd = htd;
-      String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
-      if (spanReceivers == null || spanReceivers.isEmpty()) {
-        loopSampler = Sampler.NEVER;
-      } else {
-        if (traceFreq <= 0.0) {
-          LOG.warn("Tracing enabled but traceFreq=0.");
-          loopSampler = Sampler.NEVER;
-        } else if (traceFreq >= 1.0) {
-          loopSampler = Sampler.ALWAYS;
-          if (numIterations > 1000) {
-            LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
-              + " SpanReciever can keep up.");
-          }
-        } else {
-          loopSampler = new ProbabilitySampler(traceFreq);
-        }
-      }
-    }
-
-    @Override
-    public void run() {
-      byte[] key = new byte[keySize];
-      byte[] value = new byte[valueSize];
-      Random rand = new Random(Thread.currentThread().getId());
-      HLog hlog = region.getLog();
-      ArrayList<UUID> clusters = new ArrayList<UUID>();
-      long nonce = HConstants.NO_NONCE;
-
-      TraceScope threadScope =
-        Trace.startSpan("HLogPerfEval." + Thread.currentThread().getName());
-      try {
-        long startTime = System.currentTimeMillis();
-        int lastSync = 0;
-        for (int i = 0; i < numIterations; ++i) {
-          assert Trace.currentSpan() == threadScope.getSpan() : "Span leak detected.";
-          TraceScope loopScope = Trace.startSpan("runLoopIter" + i, loopSampler);
-          try {
-            long now = System.nanoTime();
-            Put put = setupPut(rand, key, value, numFamilies);
-            WALEdit walEdit = new WALEdit();
-            addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
-            HRegionInfo hri = region.getRegionInfo();
-            hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
-              region.getSequenceId(), true, nonce, nonce);
-            if (!this.noSync) {
-              if (++lastSync >= this.syncInterval) {
-                hlog.sync();
-                lastSync = 0;
-              }
-            }
-            latencyHistogram.update(System.nanoTime() - now);
-          } finally {
-            loopScope.close();
-          }
-        }
-        long totalTime = (System.currentTimeMillis() - startTime);
-        logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
-      } catch (Exception e) {
-        LOG.error(getClass().getSimpleName() + " Thread failed", e);
-      } finally {
-        threadScope.close();
-      }
-    }
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    Path rootRegionDir = null;
-    int numThreads = 1;
-    long numIterations = 1000000;
-    int numFamilies = 1;
-    int syncInterval = 0;
-    boolean noSync = false;
-    boolean verify = false;
-    boolean verbose = false;
-    boolean cleanup = true;
-    boolean noclosefs = false;
-    long roll = Long.MAX_VALUE;
-    boolean compress = false;
-    String cipher = null;
-    String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
-    boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
-    double traceFreq = 1.0;
-    // Process command line args
-    for (int i = 0; i < args.length; i++) {
-      String cmd = args[i];
-      try {
-        if (cmd.equals("-threads")) {
-          numThreads = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-iterations")) {
-          numIterations = Long.parseLong(args[++i]);
-        } else if (cmd.equals("-path")) {
-          rootRegionDir = new Path(args[++i]);
-        } else if (cmd.equals("-families")) {
-          numFamilies = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-qualifiers")) {
-          numQualifiers = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-keySize")) {
-          keySize = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-valueSize")) {
-          valueSize = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-syncInterval")) {
-          syncInterval = Integer.parseInt(args[++i]);
-        } else if (cmd.equals("-nosync")) {
-          noSync = true;
-        } else if (cmd.equals("-verify")) {
-          verify = true;
-        } else if (cmd.equals("-verbose")) {
-          verbose = true;
-        } else if (cmd.equals("-nocleanup")) {
-          cleanup = false;
-        } else if (cmd.equals("-noclosefs")) {
-          noclosefs = true;
-        } else if (cmd.equals("-roll")) {
-          roll = Long.parseLong(args[++i]);
-        } else if (cmd.equals("-compress")) {
-          compress = true;
-        } else if (cmd.equals("-encryption")) {
-          cipher = args[++i];
-        } else if (cmd.equals("-traceFreq")) {
-          traceFreq = Double.parseDouble(args[++i]);
-        } else if (cmd.equals("-h")) {
-          printUsageAndExit();
-        } else if (cmd.equals("--help")) {
-          printUsageAndExit();
-        } else {
-          System.err.println("UNEXPECTED: " + cmd);
-          printUsageAndExit();
-        }
-      } catch (Exception e) {
-        printUsageAndExit();
-      }
-    }
-
-    if (compress) {
-      Configuration conf = getConf();
-      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-    }
-
-    if (cipher != null) {
-      // Set up HLog for encryption
-      Configuration conf = getConf();
-      conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
-      conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-        HLog.Reader.class);
-      conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
-        HLog.Writer.class);
-      conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
-      conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
-    }
-
-    // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
-    // In regionserver, number of handlers == number of threads.
-    getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
-
-    // Run HLog Performance Evaluation
-    // First set the fs from configs.  In case we are on hadoop1
-    FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
-    FileSystem fs = FileSystem.get(getConf());
-    LOG.info("FileSystem: " + fs);
-
-    SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
-    TraceScope scope = Trace.startSpan("HLogPerfEval", trace ? Sampler.ALWAYS : Sampler.NEVER);
-
-    try {
-      if (rootRegionDir == null) {
-        rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("HLogPerformanceEvaluation");
-      }
-      rootRegionDir = rootRegionDir.makeQualified(fs);
-      cleanRegionRootDir(fs, rootRegionDir);
-      // Initialize Table Descriptor
-      HTableDescriptor htd = createHTableDescriptor(numFamilies);
-      final long whenToRoll = roll;
-      final HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
-
-        @Override
-        public void postSync(final long timeInNanos, final int handlerSyncs) {
-          super.postSync(timeInNanos, handlerSyncs);
-          syncMeter.mark();
-          syncHistogram.update(timeInNanos);
-          syncCountHistogram.update(handlerSyncs);
-        }
-
-        @Override
-        public long postAppend(final HLog.Entry entry, final long elapsedTime) {
-          long size = super.postAppend(entry, elapsedTime);
-          appendMeter.mark(size);
-          return size;
-        }
-      };
-      hlog.registerWALActionsListener(new WALActionsListener() {
-        private int appends = 0;
-
-        @Override
-        public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
-            WALEdit logEdit) {
-          this.appends++;
-          if (this.appends % whenToRoll == 0) {
-            LOG.info("Rolling after " + appends + " edits");
-            // We used to do explicit call to rollWriter but changed it to a request
-            // to avoid dead lock (there are less threads going on in this class than
-            // in the regionserver -- regionserver does not have the issue).
-            ((FSHLog)hlog).requestLogRoll();
-          }
-        }
-
-        @Override
-        public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
-        }
-
-        @Override
-        public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-        }
-
-        @Override
-        public void preLogArchive(Path oldPath, Path newPath) throws IOException {
-        }
-
-        @Override
-        public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-        }
-
-        @Override
-        public void postLogArchive(Path oldPath, Path newPath) throws IOException {
-        }
-
-        @Override
-        public void logRollRequested() {
-        }
-
-        @Override
-        public void logCloseRequested() {
-        }
-      });
-      hlog.rollWriter();
-      HRegion region = null;
-
-      try {
-        region = openRegion(fs, rootRegionDir, htd, hlog);
-        ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
-        long putTime =
-          runBenchmark(Trace.wrap(
-              new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval, traceFreq)),
-            numThreads);
-        logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
-          ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
-        
-        if (region != null) {
-          closeRegion(region);
-          region = null;
-        }
-        if (verify) {
-          Path dir = ((FSHLog) hlog).getDir();
-          long editCount = 0;
-          FileStatus [] fsss = fs.listStatus(dir);
-          if (fsss.length == 0) throw new IllegalStateException("No WAL found");
-          for (FileStatus fss: fsss) {
-            Path p = fss.getPath();
-            if (!fs.exists(p)) throw new IllegalStateException(p.toString());
-            editCount += verify(p, verbose);
-          }
-          long expected = numIterations * numThreads;
-          if (editCount != expected) {
-            throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
-          }
-        }
-      } finally {
-        if (region != null) closeRegion(region);
-        // Remove the root dir for this test region
-        if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
-      }
-    } finally {
-      // We may be called inside a test that wants to keep on using the fs.
-      if (!noclosefs) fs.close();
-      scope.close();
-      if (receiverHost != null) receiverHost.closeReceivers();
-    }
-
-    return(0);
-  }
-
-  private static HTableDescriptor createHTableDescriptor(final int numFamilies) {
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
-    for (int i = 0; i < numFamilies; ++i) {
-      HColumnDescriptor colDef = new HColumnDescriptor(FAMILY_PREFIX + i);
-      htd.addFamily(colDef);
-    }
-    return htd;
-  }
-
-  /**
-   * Verify the content of the WAL file.
-   * Verify that the file has expected number of edits.
-   * @param wal
-   * @return Count of edits.
-   * @throws IOException
-   */
-  private long verify(final Path wal, final boolean verbose) throws IOException {
-    HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
-    long count = 0;
-    Map<String, Long> sequenceIds = new HashMap<String, Long>();
-    try {
-      while (true) {
-        Entry e = reader.next();
-        if (e == null) {
-          LOG.debug("Read count=" + count + " from " + wal);
-          break;
-        }
-        count++;
-        long seqid = e.getKey().getLogSeqNum();
-        if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
-          // sequenceIds should be increasing for every regions
-          if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
-            throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
-                + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
-                + ", current seqid = " + seqid);
-          }
-        }
-        // update the sequence Id.
-        sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
-        if (verbose) LOG.info("seqid=" + seqid);
-      }
-    } finally {
-      reader.close();
-    }
-    return count;
-  }
-
-  private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
-    float tsec = totalTime / 1000.0f;
-    LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
-    
-  }
-
-  private void printUsageAndExit() {
-    System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
-    System.err.println(" where [options] are:");
-    System.err.println("  -h|-help         Show this help and exit.");
-    System.err.println("  -threads <N>     Number of threads writing on the WAL.");
-    System.err.println("  -iterations <N>  Number of iterations per thread.");
-    System.err.println("  -path <PATH>     Path where region's root directory is created.");
-    System.err.println("  -families <N>    Number of column families to write.");
-    System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
-    System.err.println("  -keySize <N>     Row key size in byte.");
-    System.err.println("  -valueSize <N>   Row/Col value size in byte.");
-    System.err.println("  -nocleanup       Do NOT remove test data when done.");
-    System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
-    System.err.println("  -nosync          Append without syncing");
-    System.err.println("  -syncInterval <N> Append N edits and then sync. " +
-      "Default=0, i.e. sync every edit.");
-    System.err.println("  -verify          Verify edits written in sequence");
-    System.err.println("  -verbose         Output extra info; " +
-      "e.g. all edit seq ids when verifying");
-    System.err.println("  -roll <N>        Roll the way every N appends");
-    System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
-    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, " +
-      "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
-    System.err.println("");
-    System.err.println("Examples:");
-    System.err.println("");
-    System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
-      "verification afterward do:");
-    System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal." +
-      "HLogPerformanceEvaluation \\");
-    System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
-      "-threads 100 -roll 10000 -verify");
-    System.exit(1);
-  }
-
-  private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd,
-      final HLog hlog)
-  throws IOException {
-    // Initialize HRegion
-    HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());
-    return HRegion.createHRegion(regionInfo, dir, getConf(), htd, hlog);
-  }
-
-  private void closeRegion(final HRegion region) throws IOException {
-    if (region != null) {
-      region.close();
-      HLog wal = region.getLog();
-      if (wal != null) wal.close();
-    }
-  }
-
-  private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
-    if (fs.exists(dir)) {
-      fs.delete(dir, true);
-    }
-  }
-
-  private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
-    rand.nextBytes(key);
-    Put put = new Put(key);
-    for (int cf = 0; cf < numFamilies; ++cf) {
-      for (int q = 0; q < numQualifiers; ++q) {
-        rand.nextBytes(value);
-        put.add(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), value);
-      }
-    }
-    return put;
-  }
-
-  private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
-      WALEdit walEdit) {
-    for (List<Cell> edits : familyMap.values()) {
-      for (Cell cell : edits) {
-        walEdit.add(cell);
-      }
-    }
-  }
-
-  private long runBenchmark(Runnable runnable, final int numThreads) throws InterruptedException {
-    Thread[] threads = new Thread[numThreads];
-    long startTime = System.currentTimeMillis();
-    for (int i = 0; i < numThreads; ++i) {
-      threads[i] = new Thread(runnable, "t" + i);
-      threads[i].start();
-    }
-    for (Thread t : threads) t.join();
-    long endTime = System.currentTimeMillis();
-    return(endTime - startTime);
-  }
-
-  /**
-   * The guts of the {@link #main} method.
-   * Call this method to avoid the {@link #main(String[])} System.exit.
-   * @param args
-   * @return errCode
-   * @throws Exception
-   */
-  static int innerMain(final Configuration c, final String [] args) throws Exception {
-    return ToolRunner.run(c, new HLogPerformanceEvaluation(), args);
-  }
-
-  public static void main(String[] args) throws Exception {
-     System.exit(innerMain(HBaseConfiguration.create(), args));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
deleted file mode 100644
index 80eb8c2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * An Utility testcase that returns the number of log files that
- * were rolled to be accessed from outside packages.
- * 
- * This class makes available methods that are package protected.
- *  This is interesting for test only.
- */
-public class HLogUtilsForTests {
-  
-  /**
-   * 
-   * @param log
-   * @return
-   */
-  public static int getNumRolledLogFiles(HLog log) {
-    return ((FSHLog) log).getNumRolledLogFiles();
-  }
-
-  public static int getNumEntries(HLog log) {
-    return ((FSHLog) log).getNumEntries();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
new file mode 100644
index 0000000..d7a4618
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+public class InstrumentedLogWriter extends ProtobufLogWriter {
+
+  public InstrumentedLogWriter() {
+    super();
+  }
+
+  public static boolean activateFailure = false;
+  @Override
+    public void append(Entry entry) throws IOException {
+      super.append(entry);
+      if (activateFailure &&
+          Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
+        System.out.println(getClass().getName() + ": I will throw an exception now...");
+        throw(new IOException("This exception is instrumented and should only be thrown for testing"
+            ));
+      }
+    }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
deleted file mode 100644
index d240e66..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.util.Bytes;
-
-public class InstrumentedSequenceFileLogWriter extends ProtobufLogWriter {
-
-  public InstrumentedSequenceFileLogWriter() {
-    super();
-  }
-
-  public static boolean activateFailure = false;
-  @Override
-    public void append(HLog.Entry entry) throws IOException {
-      super.append(entry);
-      if (activateFailure && Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
-        System.out.println(getClass().getName() + ": I will throw an exception now...");
-        throw(new IOException("This exception is instrumented and should only be thrown for testing"));
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 221f76e..7c13c00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -32,8 +32,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -42,8 +43,13 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 
 /**
- * Implementation of {@link HLog.Writer} that delegates to
+ * Implementation of {@link WALProvider.Writer} that delegates to
  * SequenceFile.Writer. Legacy implementation only used for compat tests.
+ *
+ * Note that because this class writes to the legacy hadoop-specific SequenceFile
+ * format, users of it must write {@link HLogKey} keys and not arbitrary
+ * {@link WALKey}s because the latter are not Writables (nor made to work with
+ * Hadoop serialization).
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class SequenceFileLogWriter extends WriterBase {
@@ -163,7 +169,7 @@ public class SequenceFileLogWriter extends WriterBase {
   }
 
   @Override
-  public void append(HLog.Entry entry) throws IOException {
+  public void append(WAL.Entry entry) throws IOException {
     entry.setCompressionContext(compressionContext);
     try {
       this.writer.append(entry.getKey(), entry.getEdit());
@@ -213,11 +219,4 @@ public class SequenceFileLogWriter extends WriterBase {
   public FSDataOutputStream getWriterFSDataOutputStream() {
     return this.writer_out;
   }
-
-  /**
-   * This method is empty as trailer is added only in Protobuf based hlog readers/writers.
-   */
-  @Override
-  public void setWALTrailer(WALTrailer walTrailer) {
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index a09bfa0..2f515d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -37,6 +37,10 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -44,7 +48,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * Tests for HLog write durability
+ * Tests for WAL write durability
  */
 @Category(MediumTests.class)
 public class TestDurability {
@@ -67,6 +71,7 @@ public class TestDurability {
     CLUSTER = TEST_UTIL.getDFSCluster();
     FS = CLUSTER.getFileSystem();
     DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
+    FSUtils.setRootDir(CONF, DIR);
   }
 
   @AfterClass
@@ -76,14 +81,14 @@ public class TestDurability {
 
   @Test
   public void testDurability() throws Exception {
-    HLog wal = HLogFactory.createHLog(FS, DIR, "hlogdir",
-        "hlogdir_archive", CONF);
+    final WALFactory wals = new WALFactory(CONF, null, "TestDurability");
     byte[] tableName = Bytes.toBytes("TestDurability");
+    final WAL wal = wals.getWAL(tableName);
     HRegion region = createHRegion(tableName, "region", wal, Durability.USE_DEFAULT);
     HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, Durability.ASYNC_WAL);
 
     region.put(newPut(null));
-    verifyHLogCount(wal, 1);
+    verifyWALCount(wals, wal, 1);
 
     // a put through the deferred table does not write to the wal immediately,
     // but maybe has been successfully sync-ed by the underlying AsyncWriter +
@@ -91,44 +96,44 @@ public class TestDurability {
     deferredRegion.put(newPut(null));
     // but will after we sync the wal
     wal.sync();
-    verifyHLogCount(wal, 2);
+    verifyWALCount(wals, wal, 2);
 
     // a put through a deferred table will be sync with the put sync'ed put
     deferredRegion.put(newPut(null));
     wal.sync();
-    verifyHLogCount(wal, 3);
+    verifyWALCount(wals, wal, 3);
     region.put(newPut(null));
-    verifyHLogCount(wal, 4);
+    verifyWALCount(wals, wal, 4);
 
     // a put through a deferred table will be sync with the put sync'ed put
     deferredRegion.put(newPut(Durability.USE_DEFAULT));
     wal.sync();
-    verifyHLogCount(wal, 5);
+    verifyWALCount(wals, wal, 5);
     region.put(newPut(Durability.USE_DEFAULT));
-    verifyHLogCount(wal, 6);
+    verifyWALCount(wals, wal, 6);
 
     // SKIP_WAL never writes to the wal
     region.put(newPut(Durability.SKIP_WAL));
     deferredRegion.put(newPut(Durability.SKIP_WAL));
-    verifyHLogCount(wal, 6);
+    verifyWALCount(wals, wal, 6);
     wal.sync();
-    verifyHLogCount(wal, 6);
+    verifyWALCount(wals, wal, 6);
 
     // Async overrides sync table default
     region.put(newPut(Durability.ASYNC_WAL));
     deferredRegion.put(newPut(Durability.ASYNC_WAL));
     wal.sync();
-    verifyHLogCount(wal, 8);
+    verifyWALCount(wals, wal, 8);
 
     // sync overrides async table default
     region.put(newPut(Durability.SYNC_WAL));
     deferredRegion.put(newPut(Durability.SYNC_WAL));
-    verifyHLogCount(wal, 10);
+    verifyWALCount(wals, wal, 10);
 
     // fsync behaves like sync
     region.put(newPut(Durability.FSYNC_WAL));
     deferredRegion.put(newPut(Durability.FSYNC_WAL));
-    verifyHLogCount(wal, 12);
+    verifyWALCount(wals, wal, 12);
   }
 
   @Test
@@ -139,9 +144,9 @@ public class TestDurability {
     byte[] col3 = Bytes.toBytes("col3");
 
     // Setting up region
-    HLog wal = HLogFactory.createHLog(FS, DIR, "myhlogdir",
-        "myhlogdir_archive", CONF);
+    final WALFactory wals = new WALFactory(CONF, null, "TestIncrement");
     byte[] tableName = Bytes.toBytes("TestIncrement");
+    final WAL wal = wals.getWAL(tableName);
     HRegion region = createHRegion(tableName, "increment", wal, Durability.USE_DEFAULT);
 
     // col1: amount = 1, 1 write back to WAL
@@ -150,7 +155,7 @@ public class TestDurability {
     Result res = region.increment(inc1);
     assertEquals(1, res.size());
     assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
-    verifyHLogCount(wal, 1);
+    verifyWALCount(wals, wal, 1);
 
     // col1: amount = 0, 0 write back to WAL
     inc1 = new Increment(row1);
@@ -158,7 +163,7 @@ public class TestDurability {
     res = region.increment(inc1);
     assertEquals(1, res.size());
     assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
-    verifyHLogCount(wal, 1);
+    verifyWALCount(wals, wal, 1);
 
     // col1: amount = 0, col2: amount = 0, col3: amount = 0
     // 0 write back to WAL
@@ -171,7 +176,7 @@ public class TestDurability {
     assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
     assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
     assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
-    verifyHLogCount(wal, 1);
+    verifyWALCount(wals, wal, 1);
 
     // col1: amount = 5, col2: amount = 4, col3: amount = 3
     // 1 write back to WAL
@@ -184,7 +189,7 @@ public class TestDurability {
     assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
     assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
     assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
-    verifyHLogCount(wal, 2);
+    verifyWALCount(wals, wal, 2);
   }
 
   private Put newPut(Durability durability) {
@@ -196,11 +201,11 @@ public class TestDurability {
     return p;
   }
 
-  private void verifyHLogCount(HLog log, int expected) throws Exception {
-    Path walPath = ((FSHLog) log).computeFilename();
-    HLog.Reader reader = HLogFactory.createReader(FS, walPath, CONF);
+  private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
+    Path walPath = DefaultWALProvider.getCurrentFileName(log);
+    WAL.Reader reader = wals.createReader(FS, walPath);
     int count = 0;
-    HLog.Entry entry = new HLog.Entry();
+    WAL.Entry entry = new WAL.Entry();
     while (reader.next(entry) != null) count++;
     reader.close();
     assertEquals(expected, count);
@@ -208,7 +213,7 @@ public class TestDurability {
 
   // lifted from TestAtomicOperation
   private HRegion createHRegion (byte [] tableName, String callingMethod,
-      HLog log, Durability durability)
+      WAL log, Durability durability)
     throws IOException {
       HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
       htd.setDurability(durability);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
new file mode 100644
index 0000000..6a7b7fa
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -0,0 +1,478 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Provides FSHLog test cases.
+ */
+@Category(MediumTests.class)
+public class TestFSHLog {
+  protected static final Log LOG = LogFactory.getLog(TestFSHLog.class);
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
+      .getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static Configuration conf;
+  protected static FileSystem fs;
+  protected static Path dir;
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = fs.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      fs.delete(dir.getPath(), true);
+    }
+    final Path hbaseDir = TEST_UTIL.createRootDir();
+    dir = new Path(hbaseDir, currentTest.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration()
+        .setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+        "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt(
+      "hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+        SampleRegionWALObserver.class.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    conf = TEST_UTIL.getConfiguration();
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * A loaded WAL coprocessor won't break existing WAL test cases.
+   */
+  @Test
+  public void testWALCoprocessorLoaded() throws Exception {
+    // test to see whether the coprocessor is loaded or not.
+    FSHLog log = null;
+    try {
+      log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+      WALCoprocessorHost host = log.getCoprocessorHost();
+      Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+      assertNotNull(c);
+    } finally {
+      if (log != null) {
+        log.close();
+      }
+    }
+  }
+
+  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+                        int times, AtomicLong sequenceId) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor();
+    htd.addFamily(new HColumnDescriptor("row"));
+
+    final byte [] row = Bytes.toBytes("row");
+    for (int i = 0; i < times; i++) {
+      long timestamp = System.currentTimeMillis();
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, row, row, timestamp, row));
+      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
+          sequenceId, true, null);
+    }
+    log.sync();
+  }
+
+  /**
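+   * Helper that simulates a region flush on a WAL by starting and completing a cache flush.
+   * @param wal the WAL to run the simulated flush against
+   * @param regionEncodedName encoded name of the region being flushed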
+   */
+  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
+    wal.startCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName);
+  }
+
+  /**
+   * Tests the log comparator. Ensures that meta logs are not mixed with non-meta logs (the
+   * comparator throws if they are). Comparison is based on the timestamp present in the wal name.
+   * @throws Exception
+   */
+  @Test 
+  public void testWALComparator() throws Exception {
+    FSHLog wal1 = null;
+    FSHLog walMeta = null;
+    try {
+      wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+      LOG.debug("Log obtained is: " + wal1);
+      Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
+      Path p1 = wal1.computeFilename(11);
+      Path p2 = wal1.computeFilename(12);
+      // comparing with itself returns 0
+      assertTrue(comp.compare(p1, p1) == 0);
+      // comparing with different filenum.
+      assertTrue(comp.compare(p1, p2) < 0);
+      walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
+          DefaultWALProvider.META_WAL_PROVIDER_ID);
+      Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
+
+      Path p1WithMeta = walMeta.computeFilename(11);
+      Path p2WithMeta = walMeta.computeFilename(12);
+      assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
+      assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
+      // mixing meta and non-meta logs gives error
+      boolean ex = false;
+      try {
+        comp.compare(p1WithMeta, p2);
+      } catch (IllegalArgumentException e) {
+        ex = true;
+      }
+      assertTrue("Comparator doesn't complain while checking meta log files", ex);
+      boolean exMeta = false;
+      try {
+        compMeta.compare(p1WithMeta, p2);
+      } catch (IllegalArgumentException e) {
+        exMeta = true;
+      }
+      assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
+    } finally {
+      if (wal1 != null) {
+        wal1.close();
+      }
+      if (walMeta != null) {
+        walMeta.close();
+      }
+    }
+  }
+
+  /**
+   * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the
+   * list of regions which should be flushed in order to archive the oldest wal file.
+   * <p>
+   * This method tests this behavior by inserting edits and rolling the wal enough times to reach
+   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
+   * rolling the wal.
+   * @throws Exception
+   */
+  @Test 
+  public void testFindMemStoresEligibleForFlush() throws Exception {
+    LOG.debug("testFindMemStoresEligibleForFlush");
+    Configuration conf1 = HBaseConfiguration.create(conf);
+    conf1.setInt("hbase.regionserver.maxlogs", 1);
+    FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
+        HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+    TableName t1 = TableName.valueOf("t1");
+    TableName t2 = TableName.valueOf("t2");
+    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    // variables to mock region sequenceIds
+    final AtomicLong sequenceId1 = new AtomicLong(1);
+    final AtomicLong sequenceId2 = new AtomicLong(1);
+    // add edits and roll the wal
+    try {
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      wal.rollWriter();
+      // add some more edits and roll the wal. This would reach the log number threshold
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      wal.rollWriter();
+      // with above rollWriter call, the max logs limit is reached.
+      assertTrue(wal.getNumRolledLogFiles() == 2);
+
+      // get the regions to flush; since there is only one region in the oldest wal, it should
+      // return only one region.
+      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(1, regionsToFlush.length);
+      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      // insert edits in second region
+      addEdits(wal, hri2, t2, 2, sequenceId2);
+      // get the regions to flush; it should still return only region1.
+      regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(regionsToFlush.length, 1);
+      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
+      // remain.
+      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      wal.rollWriter();
+      // only one wal should remain now (that is for the second region).
+      assertEquals(1, wal.getNumRolledLogFiles());
+      // flush the second region
+      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      wal.rollWriter(true);
+      // no wal should remain now.
+      assertEquals(0, wal.getNumRolledLogFiles());
+      // add edits both to region 1 and region 2, and roll.
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      addEdits(wal, hri2, t2, 2, sequenceId2);
+      wal.rollWriter();
+      // add edits and roll the writer, to reach the max logs limit.
+      assertEquals(1, wal.getNumRolledLogFiles());
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      wal.rollWriter();
+      // it should return two regions to flush, as the oldest wal file has entries
+      // for both regions.
+      regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(2, regionsToFlush.length);
+      // flush both regions
+      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      wal.rollWriter(true);
+      assertEquals(0, wal.getNumRolledLogFiles());
+      // Add an edit to region1, and roll the wal.
+      addEdits(wal, hri1, t1, 2, sequenceId1);
+      // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
+      wal.startCacheFlush(hri1.getEncodedNameAsBytes());
+      wal.rollWriter();
+      wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      assertEquals(1, wal.getNumRolledLogFiles());
+    } finally {
+      if (wal != null) {
+        wal.close();
+      }
+    }
+  }
+
+  /**
+   * Simulates WAL append ops for a region and tests
+   * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
+   * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries.
+   * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the
+   * region should be flushed before archiving this WAL.
+  */
+  @Test
+  public void testAllRegionsFlushed() {
+    LOG.debug("testAllRegionsFlushed");
+    Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
+    Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
+    Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
+    // create a table
+    TableName t1 = TableName.valueOf("t1");
+    // create a region
+    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    // variables to mock region sequenceIds
+    final AtomicLong sequenceId1 = new AtomicLong(1);
+    // test empty map
+    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // add entries in the region
+    seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
+    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
+    // should say region1 is not flushed.
+    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // test with entries in oldestFlushing map.
+    oldestUnFlushedSeqNo.clear();
+    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
+    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
+    oldestFlushingSeqNo.clear();
+    oldestUnFlushedSeqNo.clear();
+    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+    // insert some large values for region1
+    oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
+    seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
+    assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+
+    // tests when oldestUnFlushed/oldestFlushing contains larger value.
+    // It means region is flushed.
+    oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
+    oldestUnFlushedSeqNo.clear();
+    seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
+    assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+  }
+
+  @Test(expected=IOException.class)
+  public void testFailedToCreateWALIfParentRenamed() throws IOException {
+    final String name = "testFailedToCreateWALIfParentRenamed";
+    FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
+        conf, null, true, null, null);
+    long filenum = System.currentTimeMillis();
+    Path path = log.computeFilename(filenum);
+    log.createWriterInstance(path);
+    Path parent = path.getParent();
+    path = log.computeFilename(filenum + 1);
+    Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting");
+    fs.rename(parent, newPath);
+    log.createWriterInstance(path);
+    fail("It should fail to create the new WAL");
+  }
+
+  /**
+   * Tests that a flush is sure to have a sequence id beyond that of the last edit appended.  We do
+   * this by slowing appends in the background ring buffer thread while in the foreground we call
+   * flush.  The addition of the sync over HRegion in flush should fix an issue where flush was
+   * returning before all of its appends had made it out to the WAL (HBASE-11109).
+   * @throws IOException
+   * @see HBASE-11109
+   */
+  @Test
+  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
+    String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
+    final TableName tableName = TableName.valueOf(testName);
+    final HRegionInfo hri = new HRegionInfo(tableName);
+    final byte[] rowName = tableName.getName();
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("f"));
+    HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
+      TEST_UTIL.getConfiguration(), htd);
+    HRegion.closeHRegion(r);
+    final int countPerFamily = 10;
+    final MutableBoolean goslow = new MutableBoolean(false);
+    // subclass and doctor a method.
+    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
+        testName, conf) {
+      @Override
+      void atHeadOfRingBufferEventHandlerAppend() {
+        if (goslow.isTrue()) {
+          Threads.sleep(100);
+          LOG.debug("Sleeping before appending 100ms");
+        }
+        super.atHeadOfRingBufferEventHandlerAppend();
+      }
+    };
+    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
+      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+    try {
+      List<Put> puts = null;
+      for (HColumnDescriptor hcd: htd.getFamilies()) {
+        puts =
+          TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
+      }
+
+      // Now assert edits made it in.
+      final Get g = new Get(rowName);
+      Result result = region.get(g);
+      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+
+      // Construct a WALEdit and add it a few times to the WAL.
+      WALEdit edits = new WALEdit();
+      for (Put p: puts) {
+        CellScanner cs = p.cellScanner();
+        while (cs.advance()) {
+          edits.add(cs.current());
+        }
+      }
+      // Add any old cluster id.
+      List<UUID> clusterIds = new ArrayList<UUID>();
+      clusterIds.add(UUID.randomUUID());
+      // Now make appends run slow.
+      goslow.setValue(true);
+      for (int i = 0; i < countPerFamily; i++) {
+        final HRegionInfo info = region.getRegionInfo();
+        final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
+            System.currentTimeMillis(), clusterIds, -1, -1);
+        wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null);
+      }
+      region.flushcache();
+      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
+      long currentSequenceId = region.getSequenceId().get();
+      // Now release the appends
+      goslow.setValue(false);
+      synchronized (goslow) {
+        goslow.notifyAll();
+      }
+      assertTrue(currentSequenceId >= region.getSequenceId().get());
+    } finally {
+      region.close(true);
+      wal.close();
+    }
+  }
+
+}


Mime
View raw message