hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject [14/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.
Date Tue, 02 Dec 2014 17:20:53 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
deleted file mode 100644
index d2bbd27..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-import com.google.protobuf.TextFormat;
-
-@InterfaceAudience.Private
-public class HLogUtil {
-  static final Log LOG = LogFactory.getLog(HLogUtil.class);
-
-  /**
-   * Pattern used to validate a HLog file name
-   */
-  private static final Pattern pattern =
-      Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
-
-  /**
-   * @param filename
-   *          name of the file to validate
-   * @return <tt>true</tt> if the filename matches an HLog, <tt>false</tt>
-   *         otherwise
-   */
-  public static boolean validateHLogFilename(String filename) {
-    return pattern.matcher(filename).matches();
-  }
-
-  /**
-   * Construct the HLog directory name
-   *
-   * @param serverName
-   *          Server name formatted as described in {@link ServerName}
-   * @return the relative HLog directory name, e.g.
-   *         <code>.logs/1.example.org,60030,12345</code> if
-   *         <code>serverName</code> passed is
-   *         <code>1.example.org,60030,12345</code>
-   */
-  public static String getHLogDirectoryName(final String serverName) {
-    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
-    dirName.append("/");
-    dirName.append(serverName);
-    return dirName.toString();
-  }
-
-  /**
-   * @param regiondir
-   *          This regions directory in the filesystem.
-   * @return The directory that holds recovered edits files for the region
-   *         <code>regiondir</code>
-   */
-  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
-    return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
-  }
-  
-  /**
-   * Move aside a bad edits file.
-   *
-   * @param fs
-   * @param edits
-   *          Edits file to move aside.
-   * @return The name of the moved aside file.
-   * @throws IOException
-   */
-  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
-      throws IOException {
-    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
-        + System.currentTimeMillis());
-    if (!fs.rename(edits, moveAsideName)) {
-      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
-    }
-    return moveAsideName;
-  }
-
-  /**
-   * @param path
-   *          - the path to analyze. Expected format, if it's in hlog directory:
-   *          / [base directory for hbase] / hbase / .logs / ServerName /
-   *          logfile
-   * @return null if it's not a log file. Returns the ServerName of the region
-   *         server that created this log file otherwise.
-   */
-  public static ServerName getServerNameFromHLogDirectoryName(
-      Configuration conf, String path) throws IOException {
-    if (path == null
-        || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
-      return null;
-    }
-
-    if (conf == null) {
-      throw new IllegalArgumentException("parameter conf must be set");
-    }
-
-    final String rootDir = conf.get(HConstants.HBASE_DIR);
-    if (rootDir == null || rootDir.isEmpty()) {
-      throw new IllegalArgumentException(HConstants.HBASE_DIR
-          + " key not found in conf.");
-    }
-
-    final StringBuilder startPathSB = new StringBuilder(rootDir);
-    if (!rootDir.endsWith("/"))
-      startPathSB.append('/');
-    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
-    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
-      startPathSB.append('/');
-    final String startPath = startPathSB.toString();
-
-    String fullPath;
-    try {
-      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
-    } catch (IllegalArgumentException e) {
-      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
-      return null;
-    }
-
-    if (!fullPath.startsWith(startPath)) {
-      return null;
-    }
-
-    final String serverNameAndFile = fullPath.substring(startPath.length());
-
-    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
-      // Either it's a file (not a directory) or it's not a ServerName format
-      return null;
-    }
-
-    Path p = new Path(path);
-    return getServerNameFromHLogDirectoryName(p);
-  }
-
-  /**
-   * This function returns region server name from a log file name which is in either format:
-   * hdfs://<name node>/hbase/.logs/<server name>-splitting/... or hdfs://<name
-   * node>/hbase/.logs/<server name>/...
-   * @param logFile
-   * @return null if the passed in logFile isn't a valid HLog file path
-   */
-  public static ServerName getServerNameFromHLogDirectoryName(Path logFile) {
-    Path logDir = logFile.getParent();
-    String logDirName = logDir.getName();
-    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
-      logDir = logFile;
-      logDirName = logDir.getName();
-    }
-    ServerName serverName = null;
-    if (logDirName.endsWith(HLog.SPLITTING_EXT)) {
-      logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length());
-    }
-    try {
-      serverName = ServerName.parseServerName(logDirName);
-    } catch (IllegalArgumentException ex) {
-      serverName = null;
-      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
-    }
-    if (serverName != null && serverName.getStartcode() < 0) {
-      LOG.warn("Invalid log file path=" + logFile);
-      return null;
-    }
-    return serverName;
-  }
-
-  /**
-   * Returns sorted set of edit files made by wal-log splitter, excluding files
-   * with '.temp' suffix.
-   *
-   * @param fs
-   * @param regiondir
-   * @return Files in passed <code>regiondir</code> as a sorted set.
-   * @throws IOException
-   */
-  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
-      final Path regiondir) throws IOException {
-    NavigableSet<Path> filesSorted = new TreeSet<Path>();
-    Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
-    if (!fs.exists(editsdir))
-      return filesSorted;
-    FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
-      @Override
-      public boolean accept(Path p) {
-        boolean result = false;
-        try {
-          // Return files and only files that match the editfile names pattern.
-          // There can be other files in this directory other than edit files.
-          // In particular, on error, we'll move aside the bad edit file giving
-          // it a timestamp suffix. See moveAsideBadEditsFile.
-          Matcher m = HLog.EDITFILES_NAME_PATTERN.matcher(p.getName());
-          result = fs.isFile(p) && m.matches();
-          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
-          // because it means splithlog thread is writting this file.
-          if (p.getName().endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
-            result = false;
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed isFile check on " + p);
-        }
-        return result;
-      }
-    });
-    if (files == null)
-      return filesSorted;
-    for (FileStatus status : files) {
-      filesSorted.add(status.getPath());
-    }
-    return filesSorted;
-  }
-
-  public static boolean isMetaFile(Path p) {
-    return isMetaFile(p.getName());
-  }
-
-  public static boolean isMetaFile(String p) {
-    if (p != null && p.endsWith(HLog.META_HLOG_FILE_EXTN)) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Write the marker that a compaction has succeeded and is about to be committed.
-   * This provides info to the HMaster to allow it to recover the compaction if
-   * this regionserver dies in the middle (This part is not yet implemented). It also prevents
-   * the compaction from finishing if this regionserver has already lost its lease on the log.
-   * @param sequenceId Used by HLog to get sequence Id for the waledit.
-   */
-  public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
-      final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
-    TableName tn = TableName.valueOf(c.getTableName().toByteArray());
-    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
-    log.sync();
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
-    }
-  }
-
-  /**
-   * Write a flush marker indicating a start / abort or a complete of a region flush
-   */
-  public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
-      final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
-    TableName tn = TableName.valueOf(f.getTableName().toByteArray());
-    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null);
-    if (sync) log.sync(trx);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
-    }
-    return trx;
-  }
-
-  /**
-   * Write a region open marker indicating that the region is opened
-   */
-  public static long writeRegionEventMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
-      final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
-    TableName tn = TableName.valueOf(r.getTableName().toByteArray());
-    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    long trx = log.appendNoSync(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
-      sequenceId, false, null);
-    log.sync(trx);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
-    }
-    return trx;
-  }
-  
-  /**
-   * Create a file with name as region open sequence id
-   * 
-   * @param fs
-   * @param regiondir
-   * @param newSeqId
-   * @param saftyBumper
-   * @return long new sequence Id value
-   * @throws IOException
-   */
-  public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir,
-      long newSeqId, long saftyBumper) throws IOException {
-
-    Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
-    long maxSeqId = 0;
-    FileStatus[] files = null;
-    if (fs.exists(editsdir)) {
-      files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
-        @Override
-        public boolean accept(Path p) {
-          if (p.getName().endsWith(HLog.SEQUENCE_ID_FILE_SUFFIX)) {
-            return true;
-          }
-          return false;
-        }
-      });
-      if (files != null) {
-        for (FileStatus status : files) {
-          String fileName = status.getPath().getName();
-          try {
-            Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
-                    - HLog.SEQUENCE_ID_FILE_SUFFIX.length()));
-            maxSeqId = Math.max(tmpSeqId, maxSeqId);
-          } catch (NumberFormatException ex) {
-            LOG.warn("Invalid SeqId File Name=" + fileName);
-          }
-        }
-      }
-    }
-    if (maxSeqId > newSeqId) {
-      newSeqId = maxSeqId;
-    }
-    newSeqId += saftyBumper; // bump up SeqId
-    
-    // write a new seqId file
-    Path newSeqIdFile = new Path(editsdir, newSeqId + HLog.SEQUENCE_ID_FILE_SUFFIX);
-    if (!fs.createNewFile(newSeqIdFile)) {
-      throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
-    }
-    // remove old ones
-    if(files != null) {
-      for (FileStatus status : files) {
-        fs.delete(status.getPath(), false);
-      }
-    }
-    return newSeqId;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index cf4b7a6..ad549f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.util.StringUtils;
  * single function call and turn it into multiple manipulations of the hadoop metrics system.
  */
 @InterfaceAudience.Private
-public class MetricsWAL {
+public class MetricsWAL extends WALActionsListener.Base {
   static final Log LOG = LogFactory.getLog(MetricsWAL.class);
 
   private final MetricsWALSource source;
@@ -40,19 +40,20 @@ public class MetricsWAL {
     source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
   }
 
-  public void finishSync(long time) {
-    source.incrementSyncTime(time);
+  @Override
+  public void postSync(final long timeInNanos, final int handlerSyncs) {
+    source.incrementSyncTime(timeInNanos/1000000l);
   }
 
-  public void finishAppend(long time, long size) {
-
+  @Override
+  public void postAppend(final long size, final long time) {
     source.incrementAppendCount();
     source.incrementAppendTime(time);
     source.incrementAppendSize(size);
 
     if (time > 1000) {
       source.incrementSlowAppendCount();
-      LOG.warn(String.format("%s took %d ms appending an edit to hlog; len~=%s",
+      LOG.warn(String.format("%s took %d ms appending an edit to wal; len~=%s",
           Thread.currentThread().getName(),
           time,
           StringUtils.humanReadableInt(size)));

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 39f1d9f..285f69b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -31,6 +31,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -58,17 +61,32 @@ import com.google.protobuf.InvalidProtocolBufferException;
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
 public class ProtobufLogReader extends ReaderBase {
   private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
-  static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
-  static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
+  // public for WALFactory until we move everything to o.a.h.h.wal
+  @InterfaceAudience.Private
+  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
+  // public for TestWALSplit
+  @InterfaceAudience.Private
+  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
+  /**
+   * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
+   * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
+   */
+  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
+  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
+
   protected FSDataInputStream inputStream;
   protected Codec.Decoder cellDecoder;
   protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
   protected boolean hasCompression = false;
   protected boolean hasTagCompression = false;
   // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
-  // in the hlog, the inputstream's position is equal to walEditsStopOffset.
+  // in the wal, the inputstream's position is equal to walEditsStopOffset.
   private long walEditsStopOffset;
   private boolean trailerPresent;
+  protected WALTrailer trailer;
+  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
+  // than this size, it is written/read respectively, with a WARN message in the log.
+  protected int trailerWarnSize;
   private static List<String> writerClsNames = new ArrayList<String>();
   static {
     writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
@@ -121,6 +139,13 @@ public class ProtobufLogReader extends ReaderBase {
   }
 
   @Override
+  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
+      throws IOException {
+    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
+    super.init(fs, path, conf, stream);
+  }
+
+  @Override
   protected String initReader(FSDataInputStream stream) throws IOException {
     return initInternal(stream, true);
   }
@@ -268,7 +293,7 @@ public class ProtobufLogReader extends ReaderBase {
   }
 
   @Override
-  protected boolean readNext(HLog.Entry entry) throws IOException {
+  protected boolean readNext(Entry entry) throws IOException {
     while (true) {
       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
       long originalPosition = this.inputStream.getPos();
@@ -332,7 +357,7 @@ public class ProtobufLogReader extends ReaderBase {
               initCause(realEofEx != null ? realEofEx : ex);
         }
         if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
-          LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+          LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
               + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
               + this.walEditsStopOffset);
           throw new EOFException("Read WALTrailer while reading WALEdits");
@@ -370,11 +395,6 @@ public class ProtobufLogReader extends ReaderBase {
   }
 
   @Override
-  public WALTrailer getWALTrailer() {
-    return trailer;
-  }
-
-  @Override
   protected void seekOnFs(long pos) throws IOException {
     this.inputStream.seek(pos);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index fe2eac9..ca80e4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -34,6 +34,10 @@ import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
 
 /**
  * Writer for protobuf-based WAL.
@@ -77,8 +81,7 @@ public class ProtobufLogWriter extends WriterBase {
     super.init(fs, path, conf, overwritable);
     assert this.output == null;
     boolean doCompress = initializeCompressionContext(conf, path);
-    this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
-      HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
+    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
     int bufferSize = FSUtils.getDefaultBufferSize(fs);
     short replication = (short)conf.getInt(
         "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
@@ -110,7 +113,7 @@ public class ProtobufLogWriter extends WriterBase {
   }
 
   @Override
-  public void append(HLog.Entry entry) throws IOException {
+  public void append(Entry entry) throws IOException {
     entry.setCompressionContext(compressionContext);
     entry.getKey().getBuilder(compressor).
       setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
@@ -134,7 +137,7 @@ public class ProtobufLogWriter extends WriterBase {
     }
   }
 
-  protected WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
+  WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
     return builder.build();
   }
 
@@ -188,8 +191,7 @@ public class ProtobufLogWriter extends WriterBase {
     return this.output;
   }
 
-  @Override
-  public void setWALTrailer(WALTrailer walTrailer) {
+  void setWALTrailer(WALTrailer walTrailer) {
     this.trailer = walTrailer;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 7fe5a81..5f1e904 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -31,21 +31,19 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.FSUtils;
 
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-public abstract class ReaderBase implements HLog.Reader {
+public abstract class ReaderBase implements DefaultWALProvider.Reader {
   private static final Log LOG = LogFactory.getLog(ReaderBase.class);
   protected Configuration conf;
   protected FileSystem fs;
   protected Path path;
   protected long edit = 0;
   protected long fileLength;
-  protected WALTrailer trailer;
-  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
-  // than this size, it is written/read respectively, with a WARN message in the log.
-  protected int trailerWarnSize;
   /**
    * Compression context to use reading.  Can be null if no compression.
    */
@@ -65,8 +63,6 @@ public abstract class ReaderBase implements HLog.Reader {
     this.path = path;
     this.fs = fs;
     this.fileLength = this.fs.getFileStatus(path).getLen();
-    this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
-      HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
     String cellCodecClsName = initReader(stream);
 
     boolean compression = hasCompression();
@@ -87,15 +83,17 @@ public abstract class ReaderBase implements HLog.Reader {
   }
 
   @Override
-  public HLog.Entry next() throws IOException {
+  public Entry next() throws IOException {
     return next(null);
   }
 
   @Override
-  public HLog.Entry next(HLog.Entry reuse) throws IOException {
-    HLog.Entry e = reuse;
+  public Entry next(Entry reuse) throws IOException {
+    Entry e = reuse;
     if (e == null) {
-      e = new HLog.Entry(new HLogKey(), new WALEdit());
+      // we use HLogKey here instead of WALKey directly to support legacy coprocessors,
+      // sequencefile based readers, and HLogInputFormat.
+      e = new Entry(new HLogKey(), new WALEdit());
     }
     if (compressionContext != null) {
       e.setCompressionContext(compressionContext);
@@ -165,15 +163,10 @@ public abstract class ReaderBase implements HLog.Reader {
    * @param e The entry to read into.
    * @return Whether there was anything to read.
    */
-  protected abstract boolean readNext(HLog.Entry e) throws IOException;
+  protected abstract boolean readNext(Entry e) throws IOException;
 
   /**
    * Performs a filesystem-level seek to a certain position in an underlying file.
    */
   protected abstract void seekOnFs(long pos) throws IOException;
-
-  @Override
-  public WALTrailer getWALTrailer() {
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
index 985c0bb..03d1608 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
@@ -52,10 +52,10 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter {
     builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName());
     if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
       // Get an instance of our cipher
-      Cipher cipher = Encryption.getCipher(conf,
-        conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, DEFAULT_CIPHER));
+      final String cipherName = conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, DEFAULT_CIPHER);
+      Cipher cipher = Encryption.getCipher(conf, cipherName);
       if (cipher == null) {
-        throw new RuntimeException("Cipher '" + cipher + "' is not available");
+        throw new RuntimeException("Cipher '" + cipherName + "' is not available");
       }
 
       // Generate an encryption key for this WAL

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index 128274a..11312b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.Text;
@@ -222,10 +222,27 @@ public class SequenceFileLogReader extends ReaderBase {
   }
 
 
+  /**
+   * fill in the passed entry with the next key/value.
+   * Note that because this format deals with our legacy storage, the provided
+   * Entry MUST use an {@link HLogKey} for the key.
+   * @return boolean indicating if the contents of Entry have been filled in.
+   */
   @Override
   protected boolean readNext(Entry e) throws IOException {
     try {
-      boolean hasNext = this.reader.next(e.getKey(), e.getEdit());
+      if (!(e.getKey() instanceof HLogKey)) {
+        final IllegalArgumentException exception = new IllegalArgumentException(
+            "SequenceFileLogReader only works when given entries that have HLogKey for keys. This" +
+            " one had '" + e.getKey().getClass() + "'");
+        LOG.error("We need to use the legacy SequenceFileLogReader to handle a " +
+            " pre-0.96 style WAL, but HBase internals failed to use the deprecated HLogKey class." +
+            " This is a bug; please file an issue or email the developer mailing list. You will " +
+            "need the following exception details when seeking help from the HBase community.",
+            exception);
+        throw exception;
+      }
+      boolean hasNext = this.reader.next((HLogKey)e.getKey(), e.getEdit());
       if (!hasNext) return false;
       // Scopes are probably in WAL edit, move to key
       NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index eddb92d..2194ce9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 
+import org.apache.hadoop.hbase.wal.WALKey;
+
 /**
- * Get notification of {@link FSHLog}/WAL log events. The invocations are inline
+ * Get notification of WAL events. The invocations are inline
  * so make sure your implementation is fast else you'll slow hbase.
  */
 @InterfaceAudience.Private
@@ -35,30 +37,30 @@ public interface WALActionsListener {
   /**
    * The WAL is going to be rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
-   * @param oldPath the path to the old hlog
-   * @param newPath the path to the new hlog
+   * @param oldPath the path to the old wal
+   * @param newPath the path to the new wal
    */
   void preLogRoll(Path oldPath, Path newPath) throws IOException;
 
   /**
    * The WAL has been rolled. The oldPath can be null if this is
    * the first log file from the regionserver.
-   * @param oldPath the path to the old hlog
-   * @param newPath the path to the new hlog
+   * @param oldPath the path to the old wal
+   * @param newPath the path to the new wal
    */
   void postLogRoll(Path oldPath, Path newPath) throws IOException;
 
   /**
    * The WAL is going to be archived.
-   * @param oldPath the path to the old hlog
-   * @param newPath the path to the new hlog
+   * @param oldPath the path to the old wal
+   * @param newPath the path to the new wal
    */
   void preLogArchive(Path oldPath, Path newPath) throws IOException;
 
   /**
    * The WAL has been archived.
-   * @param oldPath the path to the old hlog
-   * @param newPath the path to the new hlog
+   * @param oldPath the path to the old wal
+   * @param newPath the path to the new wal
    */
   void postLogArchive(Path oldPath, Path newPath) throws IOException;
 
@@ -79,7 +81,7 @@ public interface WALActionsListener {
   * @param logEdit
   */
   void visitLogEntryBeforeWrite(
-    HRegionInfo info, HLogKey logKey, WALEdit logEdit
+    HRegionInfo info, WALKey logKey, WALEdit logEdit
   );
 
   /**
@@ -87,11 +89,59 @@ public interface WALActionsListener {
    * @param htd
    * @param logKey
    * @param logEdit
-   * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, HLogKey, WALEdit)}
-   * It only exists to get scope when replicating.  Scope should be in the HLogKey and not need
+   * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}
+   * It only exists to get scope when replicating.  Scope should be in the WALKey and not need
    * us passing in a <code>htd</code>.
    */
   void visitLogEntryBeforeWrite(
-    HTableDescriptor htd, HLogKey logKey, WALEdit logEdit
+    HTableDescriptor htd, WALKey logKey, WALEdit logEdit
   );
+
+  /**
+   * For notification post append to the writer.  Used by metrics system at least.
+   * TODO: Combine this with above.
+   * @param entryLen approx length of cells in this append.
+   * @param elapsedTimeMillis elapsed time in milliseconds.
+   */
+  void postAppend(final long entryLen, final long elapsedTimeMillis);
+
+  /**
+   * For notification post writer sync.  Used by metrics system at least.
+   * @param timeInNanos How long the filesystem sync took in nanoseconds.
+   * @param handlerSyncs How many sync handler calls were released by this call to filesystem
+   * sync.
+   */
+  void postSync(final long timeInNanos, final int handlerSyncs);
+
+  static class Base implements WALActionsListener {
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {}
+
+    @Override
+    public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
+
+    @Override
+    public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
+
+    @Override
+    public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
+
+    @Override
+    public void logRollRequested() {}
+
+    @Override
+    public void logCloseRequested() {}
+
+    @Override
+    public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
+
+    @Override
+    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {}
+
+    @Override
+    public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
+
+    @Override
+    public void postSync(final long timeInNanos, final int handlerSyncs) {}
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index f3927f9..433e5c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -130,7 +130,7 @@ public class WALCellCodec implements Codec {
     byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
   }
 
-  // TODO: it sucks that compression context is in HLog.Entry. It'd be nice if it was here.
+  // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here.
   //       Dictionary could be gotten by enum; initially, based on enum, context would create
   //       an array of dictionaries.
   static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
index 521e5f3..52dcee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java
@@ -28,9 +28,12 @@ import org.apache.hadoop.hbase.coprocessor.*;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+
 /**
  * Implements the coprocessor environment and runtime support for coprocessors
- * loaded within a {@link FSHLog}.
+ * loaded within a {@link WAL}.
  */
 @InterfaceAudience.Private
 public class WALCoprocessorHost
@@ -42,10 +45,13 @@ public class WALCoprocessorHost
   static class WALEnvironment extends CoprocessorHost.Environment
     implements WALCoprocessorEnvironment {
 
-    private FSHLog wal;
+    private final WAL wal;
+
+    final boolean useLegacyPre;
+    final boolean useLegacyPost;
 
     @Override
-    public FSHLog getWAL() {
+    public WAL getWAL() {
       return wal;
     }
 
@@ -56,23 +62,32 @@ public class WALCoprocessorHost
      * @param priority chaining priority
      * @param seq load sequence
      * @param conf configuration
-     * @param hlog HLog
+     * @param wal WAL
      */
     public WALEnvironment(Class<?> implClass, final Coprocessor impl,
         final int priority, final int seq, final Configuration conf,
-        final FSHLog hlog) {
+        final WAL wal) {
       super(impl, priority, seq, conf);
-      this.wal = hlog;
+      this.wal = wal;
+      // Pick which version of the API we'll call.
+      // This way we avoid calling the new version on older WALObservers so
+      // we can maintain binary compatibility.
+      // See notes in javadoc for WALObserver
+      useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class,
+          HRegionInfo.class, WALKey.class, WALEdit.class);
+      useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class,
+          HRegionInfo.class, WALKey.class, WALEdit.class);
     }
   }
 
-  FSHLog wal;
+  private final WAL wal;
+
   /**
    * Constructor
    * @param log the write ahead log
    * @param conf the configuration
    */
-  public WALCoprocessorHost(final FSHLog log, final Configuration conf) {
+  public WALCoprocessorHost(final WAL log, final Configuration conf) {
     // We don't want to require an Abortable passed down through (FS)HLog, so
     // this means that a failure to load of a WAL coprocessor won't abort the
     // server. This isn't ideal, and means that security components that
@@ -100,21 +115,29 @@ public class WALCoprocessorHost
    * @return true if default behavior should be bypassed, false otherwise
    * @throws IOException
    */
-  public boolean preWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+  public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
       throws IOException {
     boolean bypass = false;
     if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
     ObserverContext<WALCoprocessorEnvironment> ctx = null;
     for (WALEnvironment env: coprocessors) {
-      if (env.getInstance() instanceof
-          org.apache.hadoop.hbase.coprocessor.WALObserver) {
+      if (env.getInstance() instanceof WALObserver) {
+        final WALObserver observer = (WALObserver)env.getInstance();
         ctx = ObserverContext.createAndPrepare(env, ctx);
         Thread currentThread = Thread.currentThread();
         ClassLoader cl = currentThread.getContextClassLoader();
         try {
           currentThread.setContextClassLoader(env.getClassLoader());
-          ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
-            preWALWrite(ctx, info, logKey, logEdit);
+          if (env.useLegacyPre) {
+            if (logKey instanceof HLogKey) {
+              observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit);
+            } else {
+              legacyWarning(observer.getClass(),
+                  "There are wal keys present that are not HLogKey.");
+            }
+          } else {
+            observer.preWALWrite(ctx, info, logKey, logEdit);
+          }
         } catch (Throwable e) {
           handleCoprocessorThrowable(env, e);
         } finally {
@@ -135,20 +158,28 @@ public class WALCoprocessorHost
    * @param logEdit
    * @throws IOException
    */
-  public void postWALWrite(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
+  public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
       throws IOException {
     if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
     ObserverContext<WALCoprocessorEnvironment> ctx = null;
     for (WALEnvironment env: coprocessors) {
-      if (env.getInstance() instanceof
-          org.apache.hadoop.hbase.coprocessor.WALObserver) {
+      if (env.getInstance() instanceof WALObserver) {
+        final WALObserver observer = (WALObserver)env.getInstance();
         ctx = ObserverContext.createAndPrepare(env, ctx);
         Thread currentThread = Thread.currentThread();
         ClassLoader cl = currentThread.getContextClassLoader();
         try {
           currentThread.setContextClassLoader(env.getClassLoader());
-          ((org.apache.hadoop.hbase.coprocessor.WALObserver)env.getInstance()).
-            postWALWrite(ctx, info, logKey, logEdit);
+          if (env.useLegacyPost) {
+            if (logKey instanceof HLogKey) {
+              observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit);
+            } else {
+              legacyWarning(observer.getClass(),
+                  "There are wal keys present that are not HLogKey.");
+            }
+          } else {
+            observer.postWALWrite(ctx, info, logKey, logEdit);
+          }
         } catch (Throwable e) {
           handleCoprocessorThrowable(env, e);
         } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 172e478..05cead2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.io.Writable;
  * for serializing/deserializing a set of KeyValue items.
  *
  * Previously, if a transaction contains 3 edits to c1, c2, c3 for a row R,
- * the HLog would have three log entries as follows:
+ * the WAL would have three log entries as follows:
  *
  *    <logseq1-for-edit1>:<KeyValue-for-edit-c1>
  *    <logseq2-for-edit2>:<KeyValue-for-edit-c2>
@@ -73,7 +73,7 @@ import org.apache.hadoop.io.Writable;
  *   <-1, 3, <Keyvalue-for-edit-c1>, <KeyValue-for-edit-c2>, <KeyValue-for-edit-c3>>
  *
  * The -1 marker is just a special way of being backward compatible with
- * an old HLog which would have contained a single <KeyValue>.
+ * an old WAL which would have contained a single <KeyValue>.
  *
  * The deserializer for WALEdit backward compatibly detects if the record
  * is an old style KeyValue or the new style WALEdit.
@@ -168,7 +168,7 @@ public class WALEdit implements Writable, HeapSize {
     int versionOrLength = in.readInt();
     // TODO: Change version when we protobuf.  Also, change way we serialize KV!  Pb it too.
     if (versionOrLength == VERSION_2) {
-      // this is new style HLog entry containing multiple KeyValues.
+      // this is new style WAL entry containing multiple KeyValues.
       int numEdits = in.readInt();
       for (int idx = 0; idx < numEdits; idx++) {
         if (compressionContext != null) {
@@ -189,7 +189,7 @@ public class WALEdit implements Writable, HeapSize {
         }
       }
     } else {
-      // this is an old style HLog entry. The int that we just
+      // this is an old style WAL entry. The int that we just
       // read is actually the length of a single KeyValue
       this.add(KeyValue.create(versionOrLength, in));
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 9c0b8a9..ff5f2f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 import com.google.protobuf.ServiceException;
 
@@ -96,17 +97,17 @@ public class WALEditsReplaySink {
    * @param entries
    * @throws IOException
    */
-  public void replayEntries(List<Pair<HRegionLocation, HLog.Entry>> entries) throws IOException {
+  public void replayEntries(List<Pair<HRegionLocation, Entry>> entries) throws IOException {
     if (entries.size() == 0) {
       return;
     }
 
     int batchSize = entries.size();
-    Map<HRegionInfo, List<HLog.Entry>> entriesByRegion =
-        new HashMap<HRegionInfo, List<HLog.Entry>>();
+    Map<HRegionInfo, List<Entry>> entriesByRegion =
+        new HashMap<HRegionInfo, List<Entry>>();
     HRegionLocation loc = null;
-    HLog.Entry entry = null;
-    List<HLog.Entry> regionEntries = null;
+    Entry entry = null;
+    List<Entry> regionEntries = null;
     // Build the action list.
     for (int i = 0; i < batchSize; i++) {
       loc = entries.get(i).getFirst();
@@ -114,7 +115,7 @@ public class WALEditsReplaySink {
       if (entriesByRegion.containsKey(loc.getRegionInfo())) {
         regionEntries = entriesByRegion.get(loc.getRegionInfo());
       } else {
-        regionEntries = new ArrayList<HLog.Entry>();
+        regionEntries = new ArrayList<Entry>();
         entriesByRegion.put(loc.getRegionInfo(), regionEntries);
       }
       regionEntries.add(entry);
@@ -123,9 +124,9 @@ public class WALEditsReplaySink {
     long startTime = EnvironmentEdgeManager.currentTime();
 
     // replaying edits by region
-    for (Map.Entry<HRegionInfo, List<HLog.Entry>> _entry : entriesByRegion.entrySet()) {
+    for (Map.Entry<HRegionInfo, List<Entry>> _entry : entriesByRegion.entrySet()) {
       HRegionInfo curRegion = _entry.getKey();
-      List<HLog.Entry> allActions = _entry.getValue();
+      List<Entry> allActions = _entry.getValue();
       // send edits in chunks
       int totalActions = allActions.size();
       int replayedActions = 0;
@@ -159,7 +160,7 @@ public class WALEditsReplaySink {
   }
 
   private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-      final List<HLog.Entry> entries) throws IOException {
+      final List<Entry> entries) throws IOException {
     try {
       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
       ReplayServerCallable<ReplicateWALEntryResponse> callable =
@@ -182,11 +183,11 @@ public class WALEditsReplaySink {
    */
   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
     private HRegionInfo regionInfo;
-    private List<HLog.Entry> entries;
+    private List<Entry> entries;
 
     ReplayServerCallable(final HConnection connection, final TableName tableName,
         final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-        final List<HLog.Entry> entries) {
+        final List<Entry> entries) {
       super(connection, tableName, null);
       this.entries = entries;
       this.regionInfo = regionInfo;
@@ -203,11 +204,11 @@ public class WALEditsReplaySink {
       return null;
     }
 
-    private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
+    private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
         throws IOException, ServiceException {
       if (entries.isEmpty()) return;
 
-      HLog.Entry[] entriesArray = new HLog.Entry[entries.size()];
+      Entry[] entriesArray = new Entry[entries.size()];
       entriesArray = entries.toArray(entriesArray);
       AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
 
@@ -228,11 +229,11 @@ public class WALEditsReplaySink {
       // if not due to connection issue, the following code should run fast because it uses
       // cached location
       boolean skip = false;
-      for (HLog.Entry entry : this.entries) {
+      for (Entry entry : this.entries) {
         WALEdit edit = entry.getEdit();
         List<Cell> cells = edit.getCells();
         for (Cell cell : cells) {
-          // filtering HLog meta entries
+          // filtering WAL meta entries
           setLocation(conn.locateRegion(tableName, cell.getRow()));
           skip = true;
           break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
new file mode 100644
index 0000000..5f00643
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Helper methods to ease Region Server integration with the write ahead log.
+ * Note that methods in this class specifically should not require access to anything
+ * other than the API found in {@link WAL}.
+ */
+@InterfaceAudience.Private
+public class WALUtil {
+  static final Log LOG = LogFactory.getLog(WALUtil.class);
+
+  /**
+   * Write the marker that a compaction has succeeded and is about to be committed.
+   * This provides info to the HMaster to allow it to recover the compaction if
+   * this regionserver dies in the middle (This part is not yet implemented). It also prevents
+   * the compaction from finishing if this regionserver has already lost its lease on the log.
+   * @param sequenceId Used by WAL to get sequence Id for the waledit.
+   */
+  public static void writeCompactionMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
+      final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
+    TableName tn = TableName.valueOf(c.getTableName().toByteArray());
+    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+    log.append(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
+    log.sync();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
+    }
+  }
+
+  /**
+   * Write a flush marker indicating the start, abort, or completion of a region flush
+   */
+  public static long writeFlushMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
+      final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
+    TableName tn = TableName.valueOf(f.getTableName().toByteArray());
+    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+    long trx = log.append(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false,
+        null);
+    if (sync) log.sync(trx);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
+    }
+    return trx;
+  }
+
+  /**
+   * Write a region event marker, for example one indicating that the region is opened
+   */
+  public static long writeRegionEventMarker(WAL log, HTableDescriptor htd, HRegionInfo info,
+      final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
+    TableName tn = TableName.valueOf(r.getTableName().toByteArray());
+    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+    WALKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+    long trx = log.append(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
+      sequenceId, false, null);
+    log.sync(trx);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
+    }
+    return trx;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
index cd3aeaf..8188e02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.FSUtils;
 
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
 /**
  * Context used by our wal dictionary compressor. Null if we're not to do our
  * custom dictionary compression.
  */
 @InterfaceAudience.Private
-public abstract class WriterBase implements HLog.Writer {
+public abstract class WriterBase implements DefaultWALProvider.Writer {
 
   protected CompressionContext compressionContext;
   protected Configuration conf;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 281ba63..6a3981a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * A {@link WALEntryFilter} which contains multiple filters and applies them

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 03b66d2..c3ec976 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 
 import com.google.common.util.concurrent.Service;
@@ -128,13 +128,13 @@ public interface ReplicationEndpoint extends Service {
    */
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class ReplicateContext {
-    List<HLog.Entry> entries;
+    List<Entry> entries;
     int size;
     @InterfaceAudience.Private
     public ReplicateContext() {
     }
 
-    public ReplicateContext setEntries(List<HLog.Entry> entries) {
+    public ReplicateContext setEntries(List<Entry> entries) {
       this.entries = entries;
       return this;
     }
@@ -142,7 +142,7 @@ public interface ReplicationEndpoint extends Service {
       this.size = size;
       return this;
     }
-    public List<HLog.Entry> getEntries() {
+    public List<Entry> getEntries() {
       return entries;
     }
     public int getSize() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 5df7b25..166dc37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -24,7 +24,7 @@ import java.util.NavigableMap;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Keeps KVs that are scoped other than local

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
index b683ad6..46b8b81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Skips WAL edits for all System tables including META

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index 0ea267d..b892512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -26,7 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public class TableCfWALEntryFilter implements WALEntryFilter {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 60797c9..b66ddde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.replication;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * A Filter for WAL entries before being sent over to replication. Multiple
@@ -30,12 +30,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
 public interface WALEntryFilter {
 
   /**
-   * Applies the filter, possibly returning a different HLog.Entry instance.
+   * Applies the filter, possibly returning a different Entry instance.
    * If null is returned, the entry will be skipped.
-   * @param entry WAL Entry to filter
-   * @return a (possibly modified) HLog.Entry to use. Returning null or an entry with
+   * @param entry Entry to filter
+   * @return a (possibly modified) Entry to use. Returning null or an entry with
    * no cells will cause the entry to be skipped for replication.
    */
-  public HLog.Entry filter(HLog.Entry entry);
+  public Entry filter(Entry entry);
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 8f099d7..525b7ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -61,17 +61,17 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       return files;
     }
 
-    final Set<String> hlogs = loadHLogsFromQueues();
+    final Set<String> wals = loadWALsFromQueues();
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {
-        String hlog = file.getPath().getName();
-        boolean logInReplicationQueue = hlogs.contains(hlog);
+        String wal = file.getPath().getName();
+        boolean logInReplicationQueue = wals.contains(wal);
         if (LOG.isDebugEnabled()) {
           if (logInReplicationQueue) {
-            LOG.debug("Found log in ZK, keeping: " + hlog);
+            LOG.debug("Found log in ZK, keeping: " + wal);
           } else {
-            LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+            LOG.debug("Didn't find this log in ZK, deleting: " + wal);
           }
         }
        return !logInReplicationQueue;
@@ -79,15 +79,15 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   }
 
   /**
-   * Load all hlogs in all replication queues from ZK
+   * Load all wals in all replication queues from ZK
    */
-  private Set<String> loadHLogsFromQueues() {
+  private Set<String> loadWALsFromQueues() {
     List<String> rss = replicationQueues.getListOfReplicators();
     if (rss == null) {
       LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
       return ImmutableSet.of();
     }
-    Set<String> hlogs = Sets.newHashSet();
+    Set<String> wals = Sets.newHashSet();
     for (String rs: rss) {
       List<String> listOfPeers = replicationQueues.getAllQueues(rs);
       // if rs just died, this will be null
@@ -95,13 +95,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
         continue;
       }
       for (String id : listOfPeers) {
-        List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
-        if (peersHlogs != null) {
-          hlogs.addAll(peersHlogs);
+        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
+        if (peersWals != null) {
+          wals.addAll(peersWals);
         }
       }
     }
-    return hlogs;
+    return wals;
   }
 
   @Override
@@ -109,7 +109,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
     // If replication is disabled, keep all members null
     if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
         HConstants.REPLICATION_ENABLE_DEFAULT)) {
-      LOG.warn("Not configured - allowing all hlogs to be deleted");
+      LOG.warn("Not configured - allowing all wals to be deleted");
       return;
     }
     // Make my own Configuration.  Then I'll have my own connection to zk that

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 0906847..397044d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
@@ -136,7 +136,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
-    List<HLog.Entry> entries = replicateContext.getEntries();
+    List<Entry> entries = replicateContext.getEntries();
     int sleepMultiplier = 1;
     while (this.isRunning()) {
       if (!peersSelected) {
@@ -159,7 +159,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
               " entries of total size " + replicateContext.getSize());
         }
         ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new HLog.Entry[entries.size()]));
+            entries.toArray(new Entry[entries.size()]));
 
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2c413f4..4729644 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -66,7 +66,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
  */
 @InterfaceAudience.Private
-public class Replication implements WALActionsListener, 
+public class Replication extends WALActionsListener.Base implements 
   ReplicationSourceService, ReplicationSinkService {
   private static final Log LOG =
       LogFactory.getLog(Replication.class);
@@ -155,7 +155,7 @@ public class Replication implements WALActionsListener,
   }
 
    /*
-    * Returns an object to listen to new hlog changes
+    * Returns an object to listen to new wal changes
     **/
   public WALActionsListener getWALActionsListener() {
     return this;
@@ -222,13 +222,7 @@ public class Replication implements WALActionsListener,
   }
 
   @Override
-  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-      WALEdit logEdit) {
-    // Not interested
-  }
-
-  @Override
-  public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
                                        WALEdit logEdit) {
     scopeWALEdits(htd, logKey, logEdit);
   }
@@ -240,7 +234,7 @@ public class Replication implements WALActionsListener,
    * @param logKey Key that may get scoped according to its edits
    * @param logEdit Edits used to lookup the scopes
    */
-  public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey,
+  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
                                    WALEdit logEdit) {
     NavigableMap<byte[], Integer> scopes =
         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
@@ -273,16 +267,6 @@ public class Replication implements WALActionsListener,
     getReplicationManager().postLogRoll(newPath);
   }
 
-  @Override
-  public void preLogArchive(Path oldPath, Path newPath) throws IOException {
-    // Not interested
-  }
-
-  @Override
-  public void postLogArchive(Path oldPath, Path newPath) throws IOException {
-    // Not interested
-  }
-
   /**
    * This method modifies the master's configuration in order to inject
    * replication-related features
@@ -299,16 +283,6 @@ public class Replication implements WALActionsListener,
     }
   }
 
-  @Override
-  public void logRollRequested() {
-    // Not interested
-  }
-
-  @Override
-  public void logCloseRequested() {
-    // not interested
-  }
-
   /*
    * Statistics thread. Periodically prints the cache statistics to the log.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
deleted file mode 100644
index ccae169..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-
-import java.io.IOException;
-
-/**
- * Wrapper class around HLog to help manage the implementation details
- * such as compression.
- */
-@InterfaceAudience.Private
-public class ReplicationHLogReaderManager {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
-  private final FileSystem fs;
-  private final Configuration conf;
-  private long position = 0;
-  private HLog.Reader reader;
-  private Path lastPath;
-
-  /**
-   * Creates the helper but doesn't open any file
-   * Use setInitialPosition after using the constructor if some content needs to be skipped
-   * @param fs
-   * @param conf
-   */
-  public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
-    this.fs = fs;
-    this.conf = conf;
-  }
-
-  /**
-   * Opens the file at the current position
-   * @param path
-   * @return an HLog reader.
-   * @throws IOException
-   */
-  public HLog.Reader openReader(Path path) throws IOException {
-    // Detect if this is a new file, if so get a new reader else
-    // reset the current reader so that we see the new data
-    if (this.reader == null || !this.lastPath.equals(path)) {
-      this.closeReader();
-      this.reader = HLogFactory.createReader(this.fs, path, this.conf);
-      this.lastPath = path;
-    } else {
-      try {
-        this.reader.reset();
-      } catch (NullPointerException npe) {
-        throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
-      }
-    }
-    return this.reader;
-  }
-
-  /**
-   * Get the next entry, returned and also added in the array
-   * @return a new entry or null
-   * @throws IOException
-   */
-  public HLog.Entry readNextAndSetPosition() throws IOException {
-    HLog.Entry entry = this.reader.next();
-    // Store the position so that in the future the reader can start
-    // reading from here. If the above call to next() throws an
-    // exception, the position won't be changed and retry will happen
-    // from the last known good position
-    this.position = this.reader.getPosition();
-    // We need to set the CC to null else it will be compressed when sent to the sink
-    if (entry != null) {
-      entry.setCompressionContext(null);
-    }
-    return entry;
-  }
-
-  /**
-   * Advance the reader to the current position
-   * @throws IOException
-   */
-  public void seek() throws IOException {
-    if (this.position != 0) {
-      this.reader.seek(this.position);
-    }
-  }
-
-  /**
-   * Get the position that we stopped reading at
-   * @return current position, cannot be negative
-   */
-  public long getPosition() {
-    return this.position;
-  }
-
-  public void setPosition(long pos) {
-    this.position = pos;
-  }
-
-  /**
-   * Close the current reader
-   * @throws IOException
-   */
-  public void closeReader() throws IOException {
-    if (this.reader != null) {
-      this.reader.close();
-      this.reader = null;
-    }
-  }
-
-  /**
-   * Tell the helper to reset internal state
-   */
-  void finishCurrentFile() {
-    this.position = 0;
-    try {
-      this.closeReader();
-    } catch (IOException e) {
-      LOG.warn("Unable to close reader", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 7ed7bec..9a60131 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
  * <p/>
  * This replication process is currently waiting for the edits to be applied
  * before the method can return. This means that the replication of edits
- * is synchronized (after reading from HLogs in ReplicationSource) and that a
+ * is synchronized (after reading from WALs in ReplicationSource) and that a
  * single region server cannot receive edits from two sources at the same time
  * <p/>
  * This class uses the native HBase client in order to replicate entries.


Mime
View raw message