hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject [17/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.
Date Tue, 02 Dec 2014 17:20:56 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 592e4a6..8fd0ba7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -55,7 +56,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -64,6 +65,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -88,8 +98,8 @@ import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
 /**
- * Implementation of {@link HLog} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
- * Only one HLog/WAL is ever being written at a time.  When a WAL hits a configured maximum size,
+ * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
+ * Only one WAL is ever being written at a time.  When a WAL hits a configured maximum size,
  * it is rolled.  This is done internal to the implementation.
  *
  * <p>As data is flushed from the MemStore to other on-disk structures (files sorted by
@@ -101,11 +111,11 @@ import com.lmax.disruptor.dsl.ProducerType;
  * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
  * (smaller) than the most-recent flush.
  *
- * <p>To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem,
- * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
+ * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
+ * org.apache.hadoop.fs.Path)}.
  */
 @InterfaceAudience.Private
-class FSHLog implements HLog, Syncable {
+public class FSHLog implements WAL {
   // IMPLEMENTATION NOTES:
   //
   // At the core is a ring buffer.  Our ring buffer is the LMAX Disruptor.  It tries to
@@ -175,12 +185,6 @@ class FSHLog implements HLog, Syncable {
    */
   private final Map<Thread, SyncFuture> syncFuturesByHandler;
 
-  private final FileSystem fs;
-  private final Path fullPathLogDir;
-  private final Path fullPathOldLogDir;
-  private final Configuration conf;
-  private final String logFilePrefix;
-
   /**
    * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
    * ring buffer sequence.  Maintained by the ring buffer consumer.
@@ -194,9 +198,63 @@ class FSHLog implements HLog, Syncable {
    */
   private final AtomicLong highestSyncedSequence = new AtomicLong(0);
 
-  private WALCoprocessorHost coprocessorHost;
+  /**
+   * file system instance
+   */
+  private final FileSystem fs;
+  /**
+   * WAL directory, where all WAL files would be placed.
+   */
+  private final Path fullPathLogDir;
+  /**
+   * dir path where old logs are kept.
+   */
+  private final Path fullPathArchiveDir;
 
   /**
+   * Matches just those wal files that belong to this wal instance.
+   */
+  private final PathFilter ourFiles;
+
+  /**
+   * Prefix of a WAL file, usually the region server name it is hosted on.
+   */
+  private final String logFilePrefix;
+
+  /**
+   * Suffix included on generated wal file names 
+   */
+  private final String logFileSuffix;
+
+  /**
+   * Prefix used when checking for wal membership.
+   */
+  private final String prefixPathStr;
+
+  private final WALCoprocessorHost coprocessorHost;
+
+  /**
+   * conf object
+   */
+  private final Configuration conf;
+  /** Listeners that are called on WAL events. */
+  private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
+
+  @Override
+  public void registerWALActionsListener(final WALActionsListener listener) {
+    this.listeners.add(listener);
+  }
+  
+  @Override
+  public boolean unregisterWALActionsListener(final WALActionsListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  @Override
+  public WALCoprocessorHost getCoprocessorHost() {
+    return coprocessorHost;
+  }
+  /**
    * FSDataOutputStream associated with the current SequenceFile.writer
    */
   private FSDataOutputStream hdfs_out;
@@ -243,37 +301,23 @@ class FSHLog implements HLog, Syncable {
    */
   private final ReentrantLock rollWriterLock = new ReentrantLock(true);
 
-  // Listeners that are called on WAL events.
-  private final List<WALActionsListener> listeners =
-    new CopyOnWriteArrayList<WALActionsListener>();
-
   private volatile boolean closed = false;
-
-  /**
-   * Set when this WAL is for meta only (we run a WAL for all regions except meta -- it has its
-   * own dedicated WAL).
-   */
-  private final boolean forMeta;
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
   // The timestamp (in ms) when the log file was created.
   private final AtomicLong filenum = new AtomicLong(-1);
 
-  // Number of transactions in the current Hlog.
+  // Number of transactions in the current Wal.
   private final AtomicInteger numEntries = new AtomicInteger(0);
 
   // If > than this size, roll the log.
   private final long logrollsize;
 
   /**
-   * The total size of hlog
+   * The total size of wal
    */
   private AtomicLong totalLogSize = new AtomicLong(0);
 
-  /**
-   * If WAL is enabled.
-   */
-  private final boolean enabled;
-
   /*
    * If more than this many logs, force flush of oldest region to oldest edit
    * goes to disk.  If too many and we crash, then will take forever replaying.
@@ -285,7 +329,6 @@ class FSHLog implements HLog, Syncable {
   private final int closeErrorsTolerated;
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
-  private final MetricsWAL metrics;
 
   // Region sequence id accounting across flushes and for knowing when we can GC a WAL.  These
   // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
@@ -334,6 +377,7 @@ class FSHLog implements HLog, Syncable {
 
   /**
    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
+   * Throws an IllegalArgumentException if used to compare paths from different wals.
    */
   public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
     @Override
@@ -380,15 +424,14 @@ class FSHLog implements HLog, Syncable {
    * Constructor.
    *
    * @param fs filesystem handle
-   * @param root path for stored and archived hlogs
-   * @param logDir dir where hlogs are stored
+   * @param root path for stored and archived wals
+   * @param logDir dir where wals are stored
    * @param conf configuration to use
    * @throws IOException
    */
-  public FSHLog(final FileSystem fs, final Path root, final String logDir,
-    final Configuration conf)
-  throws IOException {
-    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, false);
+  public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
+      throws IOException {
+    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
   }
 
   /**
@@ -396,46 +439,95 @@ class FSHLog implements HLog, Syncable {
    *
    * You should never have to load an existing log. If there is a log at
    * startup, it should have already been processed and deleted by the time the
-   * HLog object is started up.
+   * WAL object is started up.
    *
    * @param fs filesystem handle
    * @param rootDir path to where logs and oldlogs
-   * @param logDir dir where hlogs are stored
-   * @param oldLogDir dir where hlogs are archived
+   * @param logDir dir where wals are stored
+   * @param archiveDir dir where wals are archived
    * @param conf configuration to use
    * @param listeners Listeners on WAL events. Listeners passed here will
    * be registered before we do anything else; e.g. the
    * Constructor {@link #rollWriter()}.
-   * @param failIfLogDirExists If true IOException will be thrown if dir already exists.
+   * @param failIfWALExists If true IOException will be thrown if files related to this wal
+   *        already exist.
    * @param prefix should always be hostname and port in distributed env and
    *        it will be URL encoded before being used.
-   *        If prefix is null, "hlog" will be used
-   * @param forMeta if this hlog is meant for meta updates
+   *        If prefix is null, "wal" will be used
+   * @param suffix will be url encoded. null is treated as empty. non-empty must start with
+   *        {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
    * @throws IOException
    */
   public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
-      final String oldLogDir, final Configuration conf,
+      final String archiveDir, final Configuration conf,
       final List<WALActionsListener> listeners,
-      final boolean failIfLogDirExists, final String prefix, boolean forMeta)
-  throws IOException {
-    super();
+      final boolean failIfWALExists, final String prefix, final String suffix)
+      throws IOException {
     this.fs = fs;
     this.fullPathLogDir = new Path(rootDir, logDir);
-    this.fullPathOldLogDir = new Path(rootDir, oldLogDir);
-    this.forMeta = forMeta;
+    this.fullPathArchiveDir = new Path(rootDir, archiveDir);
     this.conf = conf;
 
+    if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
+      throw new IOException("Unable to mkdir " + fullPathLogDir);
+    }
+
+    if (!fs.exists(this.fullPathArchiveDir)) {
+      if (!fs.mkdirs(this.fullPathArchiveDir)) {
+        throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
+      }
+    }
+
+    // If prefix is null||empty then just name it wal
+    this.logFilePrefix =
+      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
+    // we only correctly differentiate suffices when numeric ones start with '.'
+    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
+      throw new IllegalArgumentException("wal suffix must start with '" + WAL_FILE_NAME_DELIMITER +
+          "' but instead was '" + suffix + "'");
+    }
+    this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
+    this.prefixPathStr = new Path(fullPathLogDir,
+        logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
+
+    this.ourFiles = new PathFilter() {
+      @Override
+      public boolean accept(final Path fileName) {
+        // The path should start with dir/<prefix> and end with our suffix
+        final String fileNameString = fileName.toString();
+        if (!fileNameString.startsWith(prefixPathStr)) {
+          return false;
+        }
+        if (logFileSuffix.isEmpty()) {
+          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
+          return org.apache.commons.lang.StringUtils.isNumeric(
+              fileNameString.substring(prefixPathStr.length()));
+        } else if (!fileNameString.endsWith(logFileSuffix)) {
+          return false;
+        }
+        return true;
+      }
+    };
+
+    if (failIfWALExists) {
+      final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
+      if (null != walFiles && 0 != walFiles.length) {
+        throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
+      }
+    }
+
     // Register listeners.  TODO: Should this exist anymore?  We have CPs?
     if (listeners != null) {
       for (WALActionsListener i: listeners) {
         registerWALActionsListener(i);
       }
     }
+    this.coprocessorHost = new WALCoprocessorHost(this, conf);
 
     // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
     // (it costs a little x'ing bocks)
-    long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
-      FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
+    final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
+        FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
     this.logrollsize =
       (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
 
@@ -444,31 +536,13 @@ class FSHLog implements HLog, Syncable {
         FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
     this.lowReplicationRollLimit =
       conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
-    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
-    // If prefix is null||empty then just name it hlog
-    this.logFilePrefix =
-      prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8");
     int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
 
     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
-      ", enabled=" + this.enabled + ", prefix=" + this.logFilePrefix + ", logDir=" +
-      this.fullPathLogDir + ", oldLogDir=" + this.fullPathOldLogDir);
-
-    boolean dirExists = false;
-    if (failIfLogDirExists && (dirExists = this.fs.exists(fullPathLogDir))) {
-      throw new IOException("Target HLog directory already exists: " + fullPathLogDir);
-    }
-    if (!dirExists && !fs.mkdirs(fullPathLogDir)) {
-      throw new IOException("Unable to mkdir " + fullPathLogDir);
-    }
-
-    if (!fs.exists(this.fullPathOldLogDir)) {
-      if (!fs.mkdirs(this.fullPathOldLogDir)) {
-        throw new IOException("Unable to mkdir " + this.fullPathOldLogDir);
-      }
-    }
+      ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
+      this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
 
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
@@ -481,9 +555,6 @@ class FSHLog implements HLog, Syncable {
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
     this.getPipeLine = getGetPipeline(this.hdfs_out);
 
-    this.coprocessorHost = new WALCoprocessorHost(this, conf);
-    this.metrics = new MetricsWAL();
-
     // This is the 'writer' -- a single threaded executor.  This single thread 'consumes' what is
     // put on the ring buffer.
     String hostingThreadName = Thread.currentThread().getName();
@@ -515,56 +586,22 @@ class FSHLog implements HLog, Syncable {
   }
 
   /**
-   * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
-   * @return Method or null.
+   * Get the backing files associated with this WAL.
+   * @return may be null if there are no files.
    */
-  private static Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
-    // TODO: Remove all this and use the now publically available
-    // HdfsDataOutputStream#getCurrentBlockReplication()
-    Method m = null;
-    if (os != null) {
-      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
-      try {
-        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
-        m.setAccessible(true);
-      } catch (NoSuchMethodException e) {
-        LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
-         "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
-      } catch (SecurityException e) {
-        LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
-          "not available; fsOut=" + wrappedStreamClass.getName(), e);
-        m = null; // could happen on setAccessible()
-      }
-    }
-    if (m != null) {
-      if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
-    }
-    return m;
-  }
-
-  @Override
-  public void registerWALActionsListener(final WALActionsListener listener) {
-    this.listeners.add(listener);
-  }
-
-  @Override
-  public boolean unregisterWALActionsListener(final WALActionsListener listener) {
-    return this.listeners.remove(listener);
-  }
-
-  @Override
-  public long getFilenum() {
-    return this.filenum.get();
+  protected FileStatus[] getFiles() throws IOException {
+    return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
   }
 
   /**
-   * Method used internal to this class and for tests only.
-   * @return The wrapped stream our writer is using; its not the
-   * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
-   * (In hdfs its an instance of DFSDataOutputStream).
-   *
-   * usage: see TestLogRolling.java
+   * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate
+   * the default behavior (such as setting the maxRecoveryErrorCount value for example (see
+   * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the
+   * underlying HDFS OutputStream.
+   * NOTE: This could be removed once Hadoop1 support is removed.
+   * @return null if underlying stream is not ready.
    */
+  @VisibleForTesting
   OutputStream getOutputStream() {
     return this.hdfs_out.getWrappedStream();
   }
@@ -574,12 +611,16 @@ class FSHLog implements HLog, Syncable {
     return rollWriter(false);
   }
 
+  /**
+   * retrieve the next path to use for writing.
+   * Increments the internal filenum.
+   */
   private Path getNewPath() throws IOException {
     this.filenum.set(System.currentTimeMillis());
-    Path newPath = computeFilename();
+    Path newPath = getCurrentFileName();
     while (fs.exists(newPath)) {
       this.filenum.incrementAndGet();
-      newPath = computeFilename();
+      newPath = getCurrentFileName();
     }
     return newPath;
   }
@@ -588,7 +629,7 @@ class FSHLog implements HLog, Syncable {
     long currentFilenum = this.filenum.get();
     Path oldPath = null;
     if (currentFilenum > 0) {
-      // ComputeFilename  will take care of meta hlog filename
+      // ComputeFilename  will take care of meta wal filename
       oldPath = computeFilename(currentFilenum);
     } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine?
     return oldPath;
@@ -644,11 +685,11 @@ class FSHLog implements HLog, Syncable {
       if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
       byte [][] regionsToFlush = null;
       if (this.closed) {
-        LOG.debug("HLog closed. Skipping rolling of writer");
+        LOG.debug("WAL closed. Skipping rolling of writer");
         return regionsToFlush;
       }
       if (!closeBarrier.beginOp()) {
-        LOG.debug("HLog closing. Skipping rolling of writer");
+        LOG.debug("WAL closing. Skipping rolling of writer");
         return regionsToFlush;
       }
       TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
@@ -656,7 +697,7 @@ class FSHLog implements HLog, Syncable {
         Path oldPath = getOldPath();
         Path newPath = getNewPath();
         // Any exception from here on is catastrophic, non-recoverable so we currently abort.
-        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+        Writer nextWriter = this.createWriterInstance(newPath);
         FSDataOutputStream nextHdfsOut = null;
         if (nextWriter instanceof ProtobufLogWriter) {
           nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
@@ -688,18 +729,10 @@ class FSHLog implements HLog, Syncable {
    * This method allows subclasses to inject different writers without having to
    * extend other methods like rollWriter().
    *
-   * @param fs
-   * @param path
-   * @param conf
    * @return Writer instance
-   * @throws IOException
    */
-  protected Writer createWriterInstance(final FileSystem fs, final Path path,
-      final Configuration conf) throws IOException {
-    if (forMeta) {
-      //TODO: set a higher replication for the hlog files (HBASE-6773)
-    }
-    return HLogFactory.createWALWriter(fs, path, conf);
+  protected Writer createWriterInstance(final Path path) throws IOException {
+    return DefaultWALProvider.createWriter(conf, fs, path, false);
   }
 
   /**
@@ -747,7 +780,7 @@ class FSHLog implements HLog, Syncable {
    * {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If,
    * for all regions, the value is lesser than the minimum of values present in the
    * oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving.
-   * @param sequenceNums for a HLog, at the time when it was rolled.
+   * @param sequenceNums for a WAL, at the time when it was rolled.
    * @param oldestFlushingMap
    * @param oldestUnflushedMap
    * @return true if wal is eligible for archiving, false otherwise.
@@ -816,7 +849,7 @@ class FSHLog implements HLog, Syncable {
         if (i > 0) sb.append(", ");
         sb.append(Bytes.toStringBinary(regions[i]));
       }
-      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
+      LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
          this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
          sb.toString());
     }
@@ -825,16 +858,23 @@ class FSHLog implements HLog, Syncable {
 
   /**
    * Cleans up current writer closing it and then puts in place the passed in
-   * <code>nextWriter</code>
-   * 
-   * @param oldPath
-   * @param newPath
-   * @param nextWriter
-   * @param nextHdfsOut
-   * @return <code>newPath</code>
-   * @throws IOException
+   * <code>nextWriter</code>.
+   *
+   * In the case of creating a new WAL, oldPath will be null.
+   *
+   * In the case of rolling over from one file to the next, none of the params will be null.
+   *
+   * In the case of closing out this FSHLog with no further use newPath, nextWriter, and
+   * nextHdfsOut will be null.
+   *
+   * @param oldPath may be null
+   * @param newPath may be null
+   * @param nextWriter may be null
+   * @param nextHdfsOut may be null
+   * @return the passed in <code>newPath</code>
+   * @throws IOException if there is a problem flushing or closing the underlying FS
    */
-  Path replaceWriter(final Path oldPath, final Path newPath, FSHLog.Writer nextWriter,
+  Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
       final FSDataOutputStream nextHdfsOut)
   throws IOException {
     // Ask the ring buffer writer to pause at a safe point.  Once we do this, the writer
@@ -887,6 +927,7 @@ class FSHLog implements HLog, Syncable {
       this.hdfs_out = nextHdfsOut;
       int oldNumEntries = this.numEntries.get();
       this.numEntries.set(0);
+      final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
       if (oldPath != null) {
         this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
         this.highestRegionSequenceIds = new HashMap<byte[], Long>();
@@ -894,16 +935,16 @@ class FSHLog implements HLog, Syncable {
         this.totalLogSize.addAndGet(oldFileLen);
         LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
           ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
-          FSUtils.getPath(newPath));
+          newPathString);
       } else {
-        LOG.info("New WAL " + FSUtils.getPath(newPath));
+        LOG.info("New WAL " + newPathString);
       }
     } catch (InterruptedException ie) {
       // Perpetuate the interrupt
       Thread.currentThread().interrupt();
     } catch (IOException e) {
       long count = getUnflushedEntriesCount();
-      LOG.error("Failed close of HLog writer " + oldPath + ", unflushedEntries=" + count, e);
+      LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
       throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
     } finally {
       try {
@@ -930,8 +971,16 @@ class FSHLog implements HLog, Syncable {
     return getUnflushedEntriesCount() > 0;
   }
 
+  /*
+   * only public so WALSplitter can use.
+   * @return archived location of a WAL file with the given path p
+   */
+  public static Path getWALArchivePath(Path archiveDir, Path p) {
+    return new Path(archiveDir, p.getName());
+  }
+
   private void archiveLogFile(final Path p) throws IOException {
-    Path newPath = getHLogArchivePath(this.fullPathOldLogDir, p);
+    Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
     // Tell our listeners that a log is going to be archived.
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
@@ -956,24 +1005,25 @@ class FSHLog implements HLog, Syncable {
    * @return Path
    */
   protected Path computeFilename(final long filenum) {
-    this.filenum.set(filenum);
-    return computeFilename();
+    if (filenum < 0) {
+      throw new RuntimeException("wal file number can't be < 0");
+    }
+    String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
+    return new Path(fullPathLogDir, child);
   }
 
   /**
    * This is a convenience method that computes a new filename with a given
-   * using the current HLog file-number
+   * using the current WAL file-number
    * @return Path
    */
-  protected Path computeFilename() {
-    if (this.filenum.get() < 0) {
-      throw new RuntimeException("hlog file number can't be < 0");
-    }
-    String child = logFilePrefix + "." + filenum;
-    if (forMeta) {
-      child += HLog.META_HLOG_FILE_EXTN;
-    }
-    return new Path(fullPathLogDir, child);
+  public Path getCurrentFileName() {
+    return computeFilename(this.filenum.get());
+  }
+
+  @Override
+  public String toString() {
+    return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
   }
 
 /**
@@ -986,26 +1036,23 @@ class FSHLog implements HLog, Syncable {
  */
   protected long getFileNumFromFileName(Path fileName) {
     if (fileName == null) throw new IllegalArgumentException("file name can't be null");
-    // The path should start with dir/<prefix>.
-    String prefixPathStr = new Path(fullPathLogDir, logFilePrefix + ".").toString();
-    if (!fileName.toString().startsWith(prefixPathStr)) {
-      throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to" +
-          " this regionserver " + prefixPathStr);
-    }
-    String chompedPath = fileName.toString().substring(prefixPathStr.length());
-    if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN));
+    if (!ourFiles.accept(fileName)) {
+      throw new IllegalArgumentException("The log file " + fileName +
+          " doesn't belong to this wal. (" + toString() + ")");
+    }
+    final String fileNameString = fileName.toString();
+    String chompedPath = fileNameString.substring(prefixPathStr.length(),
+        (fileNameString.length() - logFileSuffix.length()));
     return Long.parseLong(chompedPath);
   }
 
   @Override
-  public void closeAndDelete() throws IOException {
-    close();
-    if (!fs.exists(this.fullPathLogDir)) return;
-    FileStatus[] files = fs.listStatus(this.fullPathLogDir);
-    if (files != null) {
-      for(FileStatus file : files) {
-
-        Path p = getHLogArchivePath(this.fullPathOldLogDir, file.getPath());
+  public void close() throws IOException {
+    shutdown();
+    final FileStatus[] files = getFiles();
+    if (null != files && 0 != files.length) {
+      for (FileStatus file : files) {
+        Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
         // Tell our listeners that a log is going to be archived.
         if (!this.listeners.isEmpty()) {
           for (WALActionsListener i : this.listeners) {
@@ -1024,54 +1071,53 @@ class FSHLog implements HLog, Syncable {
         }
       }
       LOG.debug("Moved " + files.length + " WAL file(s) to " +
-        FSUtils.getPath(this.fullPathOldLogDir));
-    }
-    if (!fs.delete(fullPathLogDir, true)) {
-      LOG.info("Unable to delete " + fullPathLogDir);
+        FSUtils.getPath(this.fullPathArchiveDir));
     }
+    LOG.info("Closed WAL: " + toString() );
   }
 
   @Override
-  public void close() throws IOException {
-    if (this.closed) return;
-    try {
-      // Prevent all further flushing and rolling.
-      closeBarrier.stopAndDrainOps();
-    } catch (InterruptedException e) {
-      LOG.error("Exception while waiting for cache flushes and log rolls", e);
-      Thread.currentThread().interrupt();
-    }
-
-    // Shutdown the disruptor.  Will stop after all entries have been processed.  Make sure we have
-    // stopped incoming appends before calling this else it will not shutdown.  We are
-    // conservative below waiting a long time and if not elapsed, then halting.
-    if (this.disruptor != null) {
-      long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
+  public void shutdown() throws IOException {
+    if (shutdown.compareAndSet(false, true)) {
       try {
-        this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
-      } catch (TimeoutException e) {
-        LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
-          "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
-        this.disruptor.halt();
-        this.disruptor.shutdown();
+        // Prevent all further flushing and rolling.
+        closeBarrier.stopAndDrainOps();
+      } catch (InterruptedException e) {
+        LOG.error("Exception while waiting for cache flushes and log rolls", e);
+        Thread.currentThread().interrupt();
       }
-    }
-    // With disruptor down, this is safe to let go.
-    if (this.appendExecutor !=  null) this.appendExecutor.shutdown();
 
-    // Tell our listeners that the log is closing
-    if (!this.listeners.isEmpty()) {
-      for (WALActionsListener i : this.listeners) {
-        i.logCloseRequested();
+      // Shutdown the disruptor.  Will stop after all entries have been processed.  Make sure we
+      // have stopped incoming appends before calling this else it will not shutdown.  We are
+      // conservative below waiting a long time and if not elapsed, then halting.
+      if (this.disruptor != null) {
+        long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
+        try {
+          this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
+            "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
+          this.disruptor.halt();
+          this.disruptor.shutdown();
+        }
+      }
+      // With disruptor down, this is safe to let go.
+      if (this.appendExecutor !=  null) this.appendExecutor.shutdown();
+
+      // Tell our listeners that the log is closing
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener i : this.listeners) {
+          i.logCloseRequested();
+        }
+      }
+      this.closed = true;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
+      }
+      if (this.writer != null) {
+        this.writer.close();
+        this.writer = null;
       }
-    }
-    this.closed = true;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Closing WAL writer in " + this.fullPathLogDir.toString());
-    }
-    if (this.writer != null) {
-      this.writer.close();
-      this.writer = null;
     }
   }
 
@@ -1083,60 +1129,18 @@ class FSHLog implements HLog, Syncable {
    * @param clusterIds that have consumed the change
    * @return New log key.
    */
-  protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
+  protected WALKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
       long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
     return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
   }
   
-  @Override
-  @VisibleForTesting
-  public void append(HRegionInfo info, TableName tableName, WALEdit edits,
-    final long now, HTableDescriptor htd, AtomicLong sequenceId)
-  throws IOException {
-    HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now);
-    append(htd, info, logKey, edits, sequenceId, true, true, null);
-  }
-
-  @Override
-  public long appendNoSync(final HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
-      boolean inMemstore, long nonceGroup, long nonce) throws IOException {
-    HLogKey logKey =
-      new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
-    return append(htd, info, logKey, edits, sequenceId, false, inMemstore, null);
-  }
-
-  @Override
-  public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
-      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
-      final List<Cell> memstoreCells)
-  throws IOException {
-    return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreCells);
-  }
-
-  /**
-   * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
-   * log-sequence-id.
-   * @param key
-   * @param edits
-   * @param htd This comes in here just so it is available on a pre append for replications.  Get
-   * rid of it.  It is kinda crazy this comes in here when we have tablename and regioninfo.
-   * Replication gets its scope from the HTD.
-   * @param hri region info
-   * @param sync shall we sync after we call the append?
-   * @param inMemstore
-   * @param sequenceId The region sequence id reference.
-   * @param memstoreCells
-   * @return txid of this transaction or if nothing to do, the last txid
-   * @throws IOException
-   */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Will never be null")
-  private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
-      WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore, 
-      List<Cell> memstoreCells)
-  throws IOException {
-    if (!this.enabled) return this.highestUnsyncedSequence;
+  @Override
+  public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
+      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
+      final List<Cell> memstoreCells) throws IOException {
     if (this.closed) throw new IOException("Cannot append; log is closed");
     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
     // single consuming thread.  Don't have to worry about it.
@@ -1157,9 +1161,6 @@ class FSHLog implements HLog, Syncable {
     } finally {
       this.disruptor.getRingBuffer().publish(sequence);
     }
-    // doSync is set in tests.  Usually we arrive in here via appendNoSync w/ the sync called after
-    // all edits on a handler have been added.
-    if (sync) sync(sequence);
     return sequence;
   }
 
@@ -1303,7 +1304,7 @@ class FSHLog implements HLog, Syncable {
             Trace.addTimelineAnnotation("writer synced");
             currentSequence = updateHighestSyncedSequence(currentSequence);
           } catch (IOException e) {
-            LOG.error("Error syncing, request close of hlog ", e);
+            LOG.error("Error syncing, request close of wal ", e);
             t = e;
           } catch (Exception e) {
             LOG.warn("UNEXPECTED", e);
@@ -1364,7 +1365,7 @@ class FSHLog implements HLog, Syncable {
             LOG.warn("HDFS pipeline error detected. " + "Found "
                 + numCurrentReplicas + " replicas but expecting no less than "
                 + this.minTolerableReplication + " replicas. "
-                + " Requesting close of hlog.");
+                + " Requesting close of wal.");
             logRollNeeded = true;
             // If rollWriter is requested, increase consecutiveLogRolls. Once it
             // is larger than lowReplicationRollLimit, disable the
@@ -1448,10 +1449,7 @@ class FSHLog implements HLog, Syncable {
     return syncFuture.reset(sequence, span);
   }
 
-  @Override
-  public void postSync(final long timeInNanos, final int handlerSyncs) {
-    // TODO: Add metric for handler syncs done at a time.
-    if (this.metrics != null) metrics.finishSync(timeInNanos/1000000);
+  private void postSync(final long timeInNanos, final int handlerSyncs) {
     if (timeInNanos > this.slowSyncNs) {
       String msg =
           new StringBuilder().append("Slow sync cost: ")
@@ -1460,19 +1458,57 @@ class FSHLog implements HLog, Syncable {
       Trace.addTimelineAnnotation(msg);
       LOG.info(msg);
     }
+    if (!listeners.isEmpty()) {
+      for (WALActionsListener listener : listeners) {
+        listener.postSync(timeInNanos, handlerSyncs);
+      }
+    }
   }
 
-  @Override
-  public long postAppend(final Entry e, final long elapsedTime) {
+  private long postAppend(final Entry e, final long elapsedTime) {
     long len = 0;
-    if (this.metrics == null) return len;
-    for (Cell cell : e.getEdit().getCells()) len += CellUtil.estimatedSerializedSizeOf(cell);
-    metrics.finishAppend(elapsedTime, len);
+    if (!listeners.isEmpty()) {
+      for (Cell cell : e.getEdit().getCells()) {
+        len += CellUtil.estimatedSerializedSizeOf(cell);
+      }
+      for (WALActionsListener listener : listeners) {
+        listener.postAppend(len, elapsedTime);
+      }
+    }
     return len;
   }
 
   /**
-   * This method gets the datanode replication count for the current HLog.
+   * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
+   * This is used for getting current replicas of a file being written.
+   * @return Method or null.
+   */
+  private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
+    // TODO: Remove all this and use the now publically available
+    // HdfsDataOutputStream#getCurrentBlockReplication()
+    Method m = null;
+    if (os != null) {
+      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
+      try {
+        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
+        m.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
+         "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
+      } catch (SecurityException e) {
+        LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
+          "not available; fsOut=" + wrappedStreamClass.getName(), e);
+        m = null; // could happen on setAccessible()
+      }
+    }
+    if (m != null) {
+      if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
+    }
+    return m;
+  }
+
+  /**
+   * This method gets the datanode replication count for the current WAL.
    *
    * If the pipeline isn't started yet or is empty, you will get the default
    * replication factor.  Therefore, if this function returns 0, it means you
@@ -1483,10 +1519,12 @@ class FSHLog implements HLog, Syncable {
    *
    * @throws Exception
    */
+  @VisibleForTesting
   int getLogReplication()
   throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
-    if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
-      Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
+    final OutputStream stream = getOutputStream();
+    if (this.getNumCurrentReplicas != null && stream != null) {
+      Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
       if (repl instanceof Integer) {
         return ((Integer)repl).intValue();
       }
@@ -1494,32 +1532,6 @@ class FSHLog implements HLog, Syncable {
     return 0;
   }
 
-  boolean canGetCurReplicas() {
-    return this.getNumCurrentReplicas != null;
-  }
-
-  @Override
-  public void hsync() throws IOException {
-    TraceScope scope = Trace.startSpan("FSHLog.hsync");
-    try {
-      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
-    } finally {
-      assert scope == NullScope.INSTANCE || !scope.isDetached();
-      scope.close();
-    }
-  }
-
-  @Override
-  public void hflush() throws IOException {
-    TraceScope scope = Trace.startSpan("FSHLog.hflush");
-    try {
-      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
-    } finally {
-      assert scope == NullScope.INSTANCE || !scope.isDetached();
-      scope.close();
-    }
-  }
-
   @Override
   public void sync() throws IOException {
     TraceScope scope = Trace.startSpan("FSHLog.sync");
@@ -1546,7 +1558,8 @@ class FSHLog implements HLog, Syncable {
     }
   }
 
-  void requestLogRoll() {
+  // public only until class moves to o.a.h.h.wal
+  public void requestLogRoll() {
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i: this.listeners) {
         i.logRollRequested();
@@ -1554,25 +1567,21 @@ class FSHLog implements HLog, Syncable {
     }
   }
 
-  /** @return How many items have been added to the log */
-  int getNumEntries() {
-    return numEntries.get();
-  }
-
+  // public only until class moves to o.a.h.h.wal
   /** @return the number of rolled log files */
   public int getNumRolledLogFiles() {
     return byWalRegionSequenceIds.size();
   }
 
+  // public only until class moves to o.a.h.h.wal
   /** @return the number of log files in use */
-  @Override
   public int getNumLogFiles() {
     // +1 for current use log
     return getNumRolledLogFiles() + 1;
   }
   
+  // public only until class moves to o.a.h.h.wal
   /** @return the size of log files in use */
-  @Override
   public long getLogFileSize() {
     return this.totalLogSize.get();
   }
@@ -1636,28 +1645,11 @@ class FSHLog implements HLog, Syncable {
     }
   }
 
-  @Override
-  public boolean isLowReplicationRollEnabled() {
+  @VisibleForTesting
+  boolean isLowReplicationRollEnabled() {
       return lowReplicationRollEnabled;
   }
 
-  /**
-   * Get the directory we are making logs in.
-   *
-   * @return dir
-   */
-  protected Path getDir() {
-    return fullPathLogDir;
-  }
-
-  static Path getHLogArchivePath(Path oldLogDir, Path p) {
-    return new Path(oldLogDir, p.getName());
-  }
-
-  static String formatRecoveredEditsFileName(final long seqid) {
-    return String.format("%019d", seqid);
-  }
-
   public static final long FIXED_OVERHEAD = ClassSize.align(
     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
     ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
@@ -1673,14 +1665,10 @@ class FSHLog implements HLog, Syncable {
     }
 
     final Path baseDir = FSUtils.getRootDir(conf);
-    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
+    final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
   }
 
-  @Override
-  public WALCoprocessorHost getCoprocessorHost() {
-    return coprocessorHost;
-  }
 
   @Override
   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
@@ -1932,7 +1920,7 @@ class FSHLog implements HLog, Syncable {
 
       long start = EnvironmentEdgeManager.currentTime();
       byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
-      long regionSequenceId = HLog.NO_SEQUENCE_ID;
+      long regionSequenceId = WALKey.NO_SEQUENCE_ID;
       try {
         // We are about to append this edit; update the region-scoped sequence number.  Do it
         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
@@ -1975,7 +1963,7 @@ class FSHLog implements HLog, Syncable {
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
       } catch (Exception e) {
-        LOG.fatal("Could not append. Requesting close of hlog", e);
+        LOG.fatal("Could not append. Requesting close of wal", e);
         requestLogRoll();
         throw e;
       }
@@ -2006,7 +1994,7 @@ class FSHLog implements HLog, Syncable {
   }
 
   private static void usage() {
-    System.err.println("Usage: HLog <ARGS>");
+    System.err.println("Usage: FSHLog <ARGS>");
     System.err.println("Arguments:");
     System.err.println(" --dump  Dump textual representation of passed one or more files");
     System.err.println("         For example: " +
@@ -2014,7 +2002,6 @@ class FSHLog implements HLog, Syncable {
     System.err.println(" --split Split the passed directory of WAL logs");
     System.err.println("         For example: " +
       "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
-    System.err.println(" --perf  Write the same key <N> times to a WAL: e.g. FSHLog --perf 10");
   }
 
   /**
@@ -2029,28 +2016,14 @@ class FSHLog implements HLog, Syncable {
       usage();
       System.exit(-1);
     }
-    // either dump using the HLogPrettyPrinter or split, depending on args
+    // either dump using the WALPrettyPrinter or split, depending on args
     if (args[0].compareTo("--dump") == 0) {
-      HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
+      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
     } else if (args[0].compareTo("--perf") == 0) {
-      final int count = Integer.parseInt(args[1]);
-      // Put up a WAL and just keep adding same edit to it.  Simple perf test.
-      Configuration conf = HBaseConfiguration.create();
-      Path rootDir = FSUtils.getRootDir(conf);
-      FileSystem fs = rootDir.getFileSystem(conf);
-      FSHLog wal =
-        new FSHLog(fs, rootDir, "perflog", "oldPerflog", conf, null, false, "perf", false);
-      long start = System.nanoTime();
-      WALEdit walEdit = new WALEdit();
-      walEdit.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
-        Bytes.toBytes("qualifier"), -1, new byte [1000]));
-      for (AtomicLong i = new AtomicLong(0); i.get() < count; i.incrementAndGet()) {
-        wal.append(HRegionInfo.FIRST_META_REGIONINFO, TableName.META_TABLE_NAME, walEdit, start,
-          HTableDescriptor.META_TABLEDESC, i);
-        wal.sync();
-      }
-      wal.close();
-      LOG.info("Write " + count + " 1k edits in " + (System.nanoTime() - start) + "nanos");
+      LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
+      LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
+          args[1]);
+      System.exit(-1);
     } else if (args[0].compareTo("--split") == 0) {
       Configuration conf = HBaseConfiguration.create();
       for (int i = 1; i < args.length; i++) {
@@ -2098,9 +2071,9 @@ class FSHLog implements HLog, Syncable {
   }
 
   /**
-   * This method gets the pipeline for the current HLog.
-   * @return
+   * This method gets the pipeline for the current WAL.
    */
+  @VisibleForTesting
   DatanodeInfo[] getPipeLine() {
     if (this.getPipeLine != null && this.hdfs_out != null) {
       Object repl;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 0325f78..d9942b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -27,16 +27,19 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
+
 /**
  * A WAL Entry for {@link FSHLog} implementation.  Immutable.
- * A subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
+ * A subclass of {@link Entry} that carries extra info across the ring buffer such as
  * region sequence id (we want to use this later, just before we write the WAL to ensure region
  * edits maintain order).  The extra info added here is not 'serialized' as part of the WALEdit
  * hence marked 'transient' to underline this fact.  It also adds mechanism so we can wait on
  * the assign of the region sequence id.  See {@link #stampRegionSequenceId()}.
  */
 @InterfaceAudience.Private
-class FSWALEntry extends HLog.Entry {
+class FSWALEntry extends Entry {
   // The below data members are denoted 'transient' just to highlight these are not persisted;
   // they are only in memory and held here while passing over the ring buffer.
   private final transient long sequence;
@@ -46,7 +49,7 @@ class FSWALEntry extends HLog.Entry {
   private final transient HRegionInfo hri;
   private final transient List<Cell> memstoreCells;
 
-  FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
+  FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
       final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
       final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
     super(key, edit);
@@ -98,7 +101,7 @@ class FSWALEntry extends HLog.Entry {
         CellUtil.setSequenceId(cell, regionSequenceId);
       }
     }
-    HLogKey key = getKey();
+    WALKey key = getKey();
     key.setLogSeqNum(regionSequenceId);
     return regionSequenceId;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
deleted file mode 100644
index eb3692e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ /dev/null
@@ -1,445 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * HLog records all the edits to HStore.  It is the hbase write-ahead-log (WAL).
- */
-@InterfaceAudience.Private
-// TODO: Rename interface to WAL
-public interface HLog {
-  Log LOG = LogFactory.getLog(HLog.class);
-  public static final long NO_SEQUENCE_ID = -1;
-
-  /** File Extension used while splitting an HLog into regions (HBASE-2312) */
-  // TODO: this seems like an implementation detail that does not belong here.
-  String SPLITTING_EXT = "-splitting";
-  boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
-  /** The hbase:meta region's HLog filename extension.*/
-  // TODO: Implementation detail.  Does not belong in here.
-  String META_HLOG_FILE_EXTN = ".meta";
-
-  /**
-   * Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the
-   * configured size, a warning is logged. This is used with Protobuf reader/writer.
-   */
-  // TODO: Implementation detail.  Why in here?
-  String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
-  int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
-
-  // TODO: Implementation detail.  Why in here?
-  Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
-  String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
-  String SEQUENCE_ID_FILE_SUFFIX = "_seqid";
-
-  /**
-   * WAL Reader Interface
-   */
-  interface Reader {
-    /**
-     * @param fs File system.
-     * @param path Path.
-     * @param c Configuration.
-     * @param s Input stream that may have been pre-opened by the caller; may be null.
-     */
-    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
-
-    void close() throws IOException;
-
-    Entry next() throws IOException;
-
-    Entry next(Entry reuse) throws IOException;
-
-    void seek(long pos) throws IOException;
-
-    long getPosition() throws IOException;
-    void reset() throws IOException;
-
-    /**
-     * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL
-     * files.
-     */
-    // TODO: What we need a trailer on WAL for?  It won't be present on last WAL most of the time.
-    // What then?
-    WALTrailer getWALTrailer();
-  }
-
-  /**
-   * WAL Writer Intrface.
-   */
-  interface Writer {
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
-
-    void close() throws IOException;
-
-    void sync() throws IOException;
-
-    void append(Entry entry) throws IOException;
-
-    long getLength() throws IOException;
-
-    /**
-     * Sets HLog/WAL's WALTrailer. This trailer is appended at the end of WAL on closing.
-     * @param walTrailer trailer to append to WAL.
-     */
-    // TODO: Why a trailer on the log?
-    void setWALTrailer(WALTrailer walTrailer);
-  }
-
-  /**
-   * Utility class that lets us keep track of the edit and it's associated key. Only used when
-   * splitting logs.
-   */
-  // TODO: Remove this Writable.
-  // TODO: Why is this in here?  Implementation detail?
-  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-  class Entry implements Writable {
-    private WALEdit edit;
-    private HLogKey key;
-
-    public Entry() {
-      edit = new WALEdit();
-      key = new HLogKey();
-    }
-
-    /**
-     * Constructor for both params
-     *
-     * @param edit log's edit
-     * @param key log's key
-     */
-    public Entry(HLogKey key, WALEdit edit) {
-      this.key = key;
-      this.edit = edit;
-    }
-
-    /**
-     * Gets the edit
-     *
-     * @return edit
-     */
-    public WALEdit getEdit() {
-      return edit;
-    }
-
-    /**
-     * Gets the key
-     *
-     * @return key
-     */
-    public HLogKey getKey() {
-      return key;
-    }
-
-    /**
-     * Set compression context for this entry.
-     *
-     * @param compressionContext Compression context
-     */
-    public void setCompressionContext(CompressionContext compressionContext) {
-      edit.setCompressionContext(compressionContext);
-      key.setCompressionContext(compressionContext);
-    }
-
-    @Override
-    public String toString() {
-      return this.key + "=" + this.edit;
-    }
-
-    @Override
-    @SuppressWarnings("deprecation")
-    public void write(DataOutput dataOutput) throws IOException {
-      this.key.write(dataOutput);
-      this.edit.write(dataOutput);
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-      this.key.readFields(dataInput);
-      this.edit.readFields(dataInput);
-    }
-  }
-
-  /**
-   * Registers WALActionsListener
-   *
-   * @param listener
-   */
-  void registerWALActionsListener(final WALActionsListener listener);
-
-  /**
-   * Unregisters WALActionsListener
-   *
-   * @param listener
-   */
-  boolean unregisterWALActionsListener(final WALActionsListener listener);
-
-  /**
-   * @return Current state of the monotonically increasing file id.
-   */
-  // TODO: Remove.  Implementation detail.
-  long getFilenum();
-
-  /**
-   * @return the number of HLog files
-   */
-  int getNumLogFiles();
-
-  /**
-   * @return the size of HLog files
-   */
-  long getLogFileSize();
-
-  // TODO: Log rolling should not be in this interface.
-  /**
-   * Roll the log writer. That is, start writing log messages to a new file.
-   *
-   * <p>
-   * The implementation is synchronized in order to make sure there's one rollWriter
-   * running at any given time.
-   *
-   * @return If lots of logs, flush the returned regions so next time through we
-   *         can clean logs. Returns null if nothing to flush. Names are actual
-   *         region names as returned by {@link HRegionInfo#getEncodedName()}
-   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
-   * @throws IOException
-   */
-  byte[][] rollWriter() throws FailedLogCloseException, IOException;
-
-  /**
-   * Roll the log writer. That is, start writing log messages to a new file.
-   *
-   * <p>
-   * The implementation is synchronized in order to make sure there's one rollWriter
-   * running at any given time.
-   *
-   * @param force
-   *          If true, force creation of a new writer even if no entries have
-   *          been written to the current writer
-   * @return If lots of logs, flush the returned regions so next time through we
-   *         can clean logs. Returns null if nothing to flush. Names are actual
-   *         region names as returned by {@link HRegionInfo#getEncodedName()}
-   * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
-   * @throws IOException
-   */
-  byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException;
-
-  /**
-   * Shut down the log.
-   *
-   * @throws IOException
-   */
-  void close() throws IOException;
-
-  /**
-   * Shut down the log and delete the log directory.
-   * Used by tests only and in rare cases where we need a log just temporarily while bootstrapping
-   * a region or running migrations.
-   *
-   * @throws IOException
-   */
-  void closeAndDelete() throws IOException;
-
-  /**
-   * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor,
-   *   AtomicLong, boolean, long, long)}
-   * except it causes a sync on the log
-   * @param info
-   * @param tableName
-   * @param edits
-   * @param now
-   * @param htd
-   * @param sequenceId
-   * @throws IOException
-   * @deprecated For tests only and even then, should use
-   * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
-   * List)} and {@link #sync()} instead.
-   */
-  @Deprecated
-  @VisibleForTesting
-  public void append(HRegionInfo info, TableName tableName, WALEdit edits,
-      final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
-
-  /**
-   * For notification post append to the writer.  Used by metrics system at least.
-   * @param entry
-   * @param elapsedTime
-   * @return Size of this append.
-   */
-  long postAppend(final Entry entry, final long elapsedTime);
-
-  /**
-   * For notification post writer sync.  Used by metrics system at least.
-   * @param timeInMillis How long the filesystem sync took in milliseconds.
-   * @param handlerSyncs How many sync handler calls were released by this call to filesystem
-   * sync.
-   */
-  void postSync(final long timeInMillis, final int handlerSyncs);
-
-  /**
-   * Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and
-   * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes BUT on return
-   * this edit must have its region edit/sequence id assigned else it messes up our unification
-   * of mvcc and sequenceid.
-   * @param info
-   * @param tableName
-   * @param edits
-   * @param clusterIds
-   * @param now
-   * @param htd
-   * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
-   * source of its incrementing edits sequence id.  Inside in this call we will increment it and
-   * attach the sequence to the edit we apply the WAL.
-   * @param isInMemstore Always true except for case where we are writing a compaction completion
-   * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
-   * -- it is not an edit for memstore.
-   * @param nonceGroup
-   * @param nonce
-   * @return Returns a 'transaction id'.  Do not use. This is an internal implementation detail and
-   * cannot be respected in all implementations; i.e. the append/sync machine may or may not be
-   * able to sync an explicit edit only (the current default implementation syncs up to the time
-   * of the sync call syncing whatever is behind the sync).
-   * @throws IOException
-   * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, List)}
-   * instead because you can get back the region edit/sequenceid; it is set into the passed in
-   * <code>key</code>.
-   */
-  @Deprecated
-  long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
-      boolean isInMemstore, long nonceGroup, long nonce) throws IOException;
-
-  /**
-   * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction
-   * completes BUT on return this edit must have its region edit/sequence id assigned
-   * else it messes up our unification of mvcc and sequenceid.  On return <code>key</code> will
-   * have the region edit/sequence id filled in.
-   * @param info
-   * @param key Modified by this call; we add to it this edits region edit/sequence id.
-   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
-   * sequence id that is after all currently appended edits.
-   * @param htd
-   * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
-   * source of its incrementing edits sequence id.  Inside in this call we will increment it and
-   * attach the sequence to the edit we apply the WAL.
-   * @param inMemstore Always true except for case where we are writing a compaction completion
-   * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
-   * -- it is not an edit for memstore.
-   * @param memstoreCells list of Cells added into memstore
-   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
-   * in it.
-   * @throws IOException
-   */
-  long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
-      AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreCells) throws IOException;
-
-  // TODO: Do we need all these versions of sync?
-  void hsync() throws IOException;
-
-  void hflush() throws IOException;
-
-  /**
-   * Sync what we have in the WAL.
-   * @throws IOException
-   */
-  void sync() throws IOException;
-
-  /**
-   * Sync the WAL if the txId was not already sync'd.
-   * @param txid Transaction id to sync to.
-   * @throws IOException
-   */
-  void sync(long txid) throws IOException;
-
-  /**
-   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
-   * in order to be able to do cleanup. This method tells WAL that some region is about
-   * to flush memstore.
-   *
-   * <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
-   * region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor,
-   * AtomicLong)} as new oldest seqnum.
-   * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
-   * the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
-   *
-   * @return true if the flush can proceed, false in case wal is closing (ususally, when server is
-   * closing) and flush couldn't be started.
-   */
-  boolean startCacheFlush(final byte[] encodedRegionName);
-
-  /**
-   * Complete the cache flush.
-   * @param encodedRegionName Encoded region name.
-   */
-  void completeCacheFlush(final byte[] encodedRegionName);
-
-  /**
-   * Abort a cache flush. Call if the flush fails. Note that the only recovery
-   * for an aborted flush currently is a restart of the regionserver so the
-   * snapshot content dropped by the failure gets restored to the memstore.v
-   * @param encodedRegionName Encoded region name.
-   */
-  void abortCacheFlush(byte[] encodedRegionName);
-
-  /**
-   * @return Coprocessor host.
-   */
-  WALCoprocessorHost getCoprocessorHost();
-
-  /**
-   * Get LowReplication-Roller status
-   *
-   * @return lowReplicationRollEnabled
-   */
-  // TODO: This is implementation detail?
-  boolean isLowReplicationRollEnabled();
-
-  /** Gets the earliest sequence number in the memstore for this particular region.
-   * This can serve as best-effort "recent" WAL number for this region.
-   * @param encodedRegionName The region to get the number for.
-   * @return The number if present, HConstants.NO_SEQNUM if absent.
-   */
-  long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
deleted file mode 100644
index a54091d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.io.InterruptedIOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-@InterfaceAudience.Private
-public class HLogFactory {
-    private static final Log LOG = LogFactory.getLog(HLogFactory.class);
-
-    public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
-        final Configuration conf) throws IOException {
-      return new FSHLog(fs, root, logName, conf);
-    }
-    
-    public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
-        final String oldLogName, final Configuration conf) throws IOException {
-      return new FSHLog(fs, root, logName, oldLogName, conf, null, true, null, false);
-}
-    
-    public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
-        final Configuration conf, final List<WALActionsListener> listeners,
-        final String prefix) throws IOException {
-      return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-        true, prefix, false);
-    }
-
-    public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
-        final Configuration conf, final List<WALActionsListener> listeners,
-        final String prefix) throws IOException {
-      return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-        false, prefix, true);
-    }
-
-    /*
-     * WAL Reader
-     */
-    private static Class<? extends Reader> logReaderClass;
-
-    static void resetLogReaderClass() {
-      logReaderClass = null;
-    }
-
-    public static HLog.Reader createReader(final FileSystem fs,
-        final Path path, Configuration conf) throws IOException {
-      return createReader(fs, path, conf, null);
-    }
-
-    /**
-     * Create a reader for the WAL. If you are reading from a file that's being written to
-     * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
-     * then just seek back to the last known good position.
-     * @return A WAL reader.  Close when done with it.
-     * @throws IOException
-     */
-    public static HLog.Reader createReader(final FileSystem fs, final Path path,
-        Configuration conf, CancelableProgressable reporter) throws IOException {
-      return createReader(fs, path, conf, reporter, true);
-    }
-
-    public static HLog.Reader createReader(final FileSystem fs, final Path path,
-      Configuration conf, CancelableProgressable reporter, boolean allowCustom)
-        throws IOException {
-      if (allowCustom && (logReaderClass == null)) {
-        logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
-          ProtobufLogReader.class, Reader.class);
-      }
-      Class<? extends Reader> lrClass = allowCustom ? logReaderClass : ProtobufLogReader.class;
-
-      try {
-        // A hlog file could be under recovery, so it may take several
-        // tries to get it open. Instead of claiming it is corrupted, retry
-        // to open it up to 5 minutes by default.
-        long startWaiting = EnvironmentEdgeManager.currentTime();
-        long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
-        int nbAttempt = 0;
-        while (true) {
-          try {
-            if (lrClass != ProtobufLogReader.class) {
-              // User is overriding the WAL reader, let them.
-              HLog.Reader reader = lrClass.newInstance();
-              reader.init(fs, path, conf, null);
-              return reader;
-            } else {
-              FSDataInputStream stream = fs.open(path);
-              // Note that zero-length file will fail to read PB magic, and attempt to create
-              // a non-PB reader and fail the same way existing code expects it to. If we get
-              // rid of the old reader entirely, we need to handle 0-size files differently from
-              // merely non-PB files.
-              byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
-              boolean isPbWal = (stream.read(magic) == magic.length)
-                  && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
-              HLog.Reader reader =
-                  isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
-              reader.init(fs, path, conf, stream);
-              return reader;
-            }
-          } catch (IOException e) {
-            String msg = e.getMessage();
-            if (msg != null && (msg.contains("Cannot obtain block length")
-                || msg.contains("Could not obtain the last block")
-                || msg.matches("Blocklist for [^ ]* has changed.*"))) {
-              if (++nbAttempt == 1) {
-                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
-              }
-              if (reporter != null && !reporter.progress()) {
-                throw new InterruptedIOException("Operation is cancelled");
-              }
-              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
-                LOG.error("Can't open after " + nbAttempt + " attempts and "
-                  + (EnvironmentEdgeManager.currentTime() - startWaiting)
-                  + "ms " + " for " + path);
-              } else {
-                try {
-                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
-                  continue; // retry
-                } catch (InterruptedException ie) {
-                  InterruptedIOException iioe = new InterruptedIOException();
-                  iioe.initCause(ie);
-                  throw iioe;
-                }
-              }
-            }
-            throw e;
-          }
-        }
-      } catch (IOException ie) {
-        throw ie;
-      } catch (Exception e) {
-        throw new IOException("Cannot get log reader", e);
-      }
-    }
-
-    /*
-     * WAL writer
-     */
-    private static Class<? extends Writer> logWriterClass;
-
-    static void resetLogWriterClass() {
-      logWriterClass = null;
-    }
-
-    /**
-     * Create a writer for the WAL.
-     * @return A WAL writer.  Close when done with it.
-     * @throws IOException
-     */
-    public static HLog.Writer createWALWriter(final FileSystem fs,
-        final Path path, Configuration conf) throws IOException {
-      return createWriter(fs, path, conf, false);
-    }
-
-    public static HLog.Writer createRecoveredEditsWriter(final FileSystem fs,
-        final Path path, Configuration conf) throws IOException {
-      return createWriter(fs, path, conf, true);
-    }
-
-    private static HLog.Writer createWriter(final FileSystem fs,
-        final Path path, Configuration conf, boolean overwritable)
-    throws IOException {
-      try {
-        if (logWriterClass == null) {
-          logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
-              ProtobufLogWriter.class, Writer.class);
-        }
-        HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
-        writer.init(fs, path, conf, overwritable);
-        return writer;
-      } catch (Exception e) {
-        throw new IOException("cannot get log writer", e);
-      }
-    }
-}


Mime
View raw message