hbase-commits mailing list archives

From bus...@apache.org
Subject [13/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.
Date Tue, 02 Dec 2014 17:20:52 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1098de8..6e2ef2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -41,8 +41,9 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
 import com.google.common.collect.Lists;
@@ -94,8 +96,8 @@ public class ReplicationSource extends Thread
   private long replicationQueueSizeCapacity;
   // Max number of entries in entriesArray
   private int replicationQueueNbCapacity;
-  // Our reader for the current log
-  private HLog.Reader reader;
+  // Our reader for the current log. open/close handled by repLogReader
+  private WAL.Reader reader;
   // Last position in the log that we sent to ZooKeeper
   private long lastLoggedPosition = -1;
   // Path of the current log
@@ -122,7 +124,7 @@ public class ReplicationSource extends Thread
   // Metrics for this source
   private MetricsSource metrics;
   // Handle on the log reader helper
-  private ReplicationHLogReaderManager repLogReader;
+  private ReplicationWALReaderManager repLogReader;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
   // ReplicationEndpoint which will handle the actual replication
@@ -176,7 +178,7 @@ public class ReplicationSource extends Thread
     this.manager = manager;
     this.fs = fs;
     this.metrics = metrics;
-    this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
+    this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
     this.clusterId = clusterId;
 
     this.peerClusterZnode = peerClusterZnode;
@@ -342,7 +344,7 @@ public class ReplicationSource extends Thread
 
       boolean gotIOE = false;
       currentNbOperations = 0;
-      List<HLog.Entry> entries = new ArrayList<HLog.Entry>(1);
+      List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
       currentSize = 0;
       try {
         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
@@ -420,7 +422,7 @@ public class ReplicationSource extends Thread
    * @throws IOException
    */
   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
-      List<HLog.Entry> entries) throws IOException{
+      List<WAL.Entry> entries) throws IOException {
     long seenEntries = 0;
     if (LOG.isTraceEnabled()) {
       LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -428,7 +430,7 @@ public class ReplicationSource extends Thread
     }
     this.repLogReader.seek();
     long positionBeforeRead = this.repLogReader.getPosition();
-    HLog.Entry entry =
+    WAL.Entry entry =
         this.repLogReader.readNextAndSetPosition();
     while (entry != null) {
       this.metrics.incrLogEditsRead();
@@ -440,7 +442,7 @@ public class ReplicationSource extends Thread
         // Remove all KVs that should not be replicated
         entry = walEntryFilter.filter(entry);
         WALEdit edit = null;
-        HLogKey logKey = null;
+        WALKey logKey = null;
         if (entry != null) {
           edit = entry.getEdit();
           logKey = entry.getKey();
@@ -521,12 +523,13 @@ public class ReplicationSource extends Thread
           // to look at)
           List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
           LOG.info("NB dead servers : " + deadRegionServers.size());
+          final Path rootDir = FSUtils.getRootDir(this.conf);
           for (String curDeadServerName : deadRegionServers) {
-            Path deadRsDirectory =
-                new Path(manager.getLogDir().getParent(), curDeadServerName);
+            final Path deadRsDirectory = new Path(rootDir,
+                DefaultWALProvider.getWALDirectoryName(curDeadServerName));
             Path[] locs = new Path[] {
                 new Path(deadRsDirectory, currentPath.getName()),
-                new Path(deadRsDirectory.suffix(HLog.SPLITTING_EXT),
+                new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
                                           currentPath.getName()),
             };
             for (Path possibleLogLocation : locs) {
@@ -536,6 +539,7 @@ public class ReplicationSource extends Thread
                 LOG.info("Log " + this.currentPath + " still exists at " +
                     possibleLogLocation);
                 // Breaking here will make us sleep since reader is null
+                // TODO why don't we need to set currentPath and call openReader here?
                 return true;
               }
             }
@@ -543,6 +547,8 @@ public class ReplicationSource extends Thread
           // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
           // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
           if (stopper instanceof ReplicationSyncUp.DummyServer) {
+            // N.B. the ReplicationSyncUp tool sets manager.getLogDir() to the root of the wal
+            //      area rather than to the wal area for a particular region server.
             FileStatus[] rss = fs.listStatus(manager.getLogDir());
             for (FileStatus rs : rss) {
               Path p = rs.getPath();
@@ -551,7 +557,7 @@ public class ReplicationSource extends Thread
                 p = new Path(p, log.getPath().getName());
                 if (p.getName().equals(currentPath.getName())) {
                   currentPath = p;
-                  LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
+                  LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
                   // Open the log at the new location
                   this.openReader(sleepMultiplier);
                   return true;
@@ -591,7 +597,7 @@ public class ReplicationSource extends Thread
       if (ioe.getCause() instanceof NullPointerException) {
         // Workaround for race condition in HDFS-4380
         // which throws a NPE if we open a file before any data node has the most recent block
-        // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
+        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
         LOG.warn("Got NPE opening reader, will retry.");
       } else if (sleepMultiplier == this.maxRetriesMultiplier) {
         // TODO Need a better way to determine if a file is really gone but
@@ -656,7 +662,7 @@ public class ReplicationSource extends Thread
    * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
    * written to when this method was called
    */
-  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries) {
+  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
     int sleepMultiplier = 1;
     if (entries.isEmpty()) {
       LOG.warn("Was given 0 edits to ship");

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ad1b088..4908ebc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -63,7 +63,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * sources. There are two classes of sources:
  * <li> Normal sources are persistent and one per peer cluster</li>
  * <li> Old sources are recovered from a failed region server and our
- * only goal is to finish replicating the HLog queue it had up in ZK</li>
+ * only goal is to finish replicating the WAL queue it had up in ZK</li>
  *
  * When a region server dies, this class uses a watcher to get notified and it
  * tries to grab a lock in order to transfer all the queues in a local
@@ -88,16 +88,16 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All about stopping
   private final Server server;
   // All logs we are currently tracking
-  private final Map<String, SortedSet<String>> hlogsById;
+  private final Map<String, SortedSet<String>> walsById;
   // Logs for recovered sources we are currently tracking
-  private final Map<String, SortedSet<String>> hlogsByIdRecoveredQueues;
+  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
   private Path latestPath;
-  // Path to the hlogs directories
+  // Path to the wals directories
   private final Path logDir;
-  // Path to the hlog archive
+  // Path to the wal archive
   private final Path oldLogDir;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
@@ -115,7 +115,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param conf the configuration to use
    * @param server the server for this region server
    * @param fs the file system to use
-   * @param logDir the directory that contains all hlog directories of live RSs
+   * @param logDir the directory that contains all wal directories of live RSs
    * @param oldLogDir the directory where old logs are archived
    * @param clusterId
    */
@@ -130,8 +130,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.hlogsById = new HashMap<String, SortedSet<String>>();
-    this.hlogsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
+    this.walsById = new HashMap<String, SortedSet<String>>();
+    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -159,7 +159,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   /**
    * Provide the id of the peer and a log key and this method will figure which
-   * hlog it belongs to and will log, for this region server, the current
+   * wal it belongs to and will log, for this region server, the current
    * position. It will also clean old logs from the queue.
    * @param log Path to the log currently being replicated from
    * replication status in zookeeper. It will also delete older entries.
@@ -187,32 +187,32 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
     if (queueRecovered) {
-      SortedSet<String> hlogs = hlogsByIdRecoveredQueues.get(id);
-      if (hlogs != null && !hlogs.first().equals(key)) {
-        cleanOldLogs(hlogs, key, id);
+      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
+      if (wals != null && !wals.first().equals(key)) {
+        cleanOldLogs(wals, key, id);
       }
     } else {
-      synchronized (this.hlogsById) {
-        SortedSet<String> hlogs = hlogsById.get(id);
-        if (!hlogs.first().equals(key)) {
-          cleanOldLogs(hlogs, key, id);
+      synchronized (this.walsById) {
+        SortedSet<String> wals = walsById.get(id);
+        if (!wals.first().equals(key)) {
+          cleanOldLogs(wals, key, id);
         }
       }
     }
  }
   
-  private void cleanOldLogs(SortedSet<String> hlogs, String key, String id) {
-    SortedSet<String> hlogSet = hlogs.headSet(key);
-    LOG.debug("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
-    for (String hlog : hlogSet) {
-      this.replicationQueues.removeLog(id, hlog);
+  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
+    SortedSet<String> walSet = wals.headSet(key);
+    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+    for (String wal : walSet) {
+      this.replicationQueues.removeLog(id, wal);
     }
-    hlogSet.clear();
+    walSet.clear();
   }
 
   /**
    * Adds a normal source per registered peer cluster and tries to process all
-   * old region server hlog queues
+   * old region server wal queues
    */
   protected void init() throws IOException, ReplicationException {
     for (String id : this.replicationPeers.getPeerIds()) {
@@ -248,13 +248,13 @@ public class ReplicationSourceManager implements ReplicationListener {
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
-    synchronized (this.hlogsById) {
+    synchronized (this.walsById) {
       this.sources.add(src);
-      this.hlogsById.put(id, new TreeSet<String>());
-      // Add the latest hlog to that source's queue
+      this.walsById.put(id, new TreeSet<String>());
+      // Add the latest wal to that source's queue
       if (this.latestPath != null) {
         String name = this.latestPath.getName();
-        this.hlogsById.get(id).add(name);
+        this.walsById.get(id).add(name);
         try {
           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
         } catch (ReplicationException e) {
@@ -272,8 +272,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Delete a complete queue of hlogs associated with a peer cluster
-   * @param peerId Id of the peer cluster queue of hlogs to delete
+   * Delete a complete queue of wals associated with a peer cluster
+   * @param peerId Id of the peer cluster queue of wals to delete
    */
   public void deleteSource(String peerId, boolean closeConnection) {
     this.replicationQueues.removeQueue(peerId);
@@ -296,19 +296,19 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Get a copy of the hlogs of the first source on this rs
-   * @return a sorted set of hlog names
+   * Get a copy of the wals of the first source on this rs
+   * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getHLogs() {
-    return Collections.unmodifiableMap(hlogsById);
+  protected Map<String, SortedSet<String>> getWALs() {
+    return Collections.unmodifiableMap(walsById);
   }
   
   /**
-   * Get a copy of the hlogs of the recovered sources on this rs
-   * @return a sorted set of hlog names
+   * Get a copy of the wals of the recovered sources on this rs
+   * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getHlogsByIdRecoveredQueues() {
-    return Collections.unmodifiableMap(hlogsByIdRecoveredQueues);
+  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
+    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
   /**
@@ -328,7 +328,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   void preLogRoll(Path newLog) throws IOException {
-    synchronized (this.hlogsById) {
+    synchronized (this.walsById) {
       String name = newLog.getName();
       for (ReplicationSourceInterface source : this.sources) {
         try {
@@ -338,13 +338,13 @@ public class ReplicationSourceManager implements ReplicationListener {
               + source.getPeerClusterZnode() + ", filename=" + name, e);
         }
       }
-      for (SortedSet<String> hlogs : this.hlogsById.values()) {
+      for (SortedSet<String> wals : this.walsById.values()) {
         if (this.sources.isEmpty()) {
-          // If there's no slaves, don't need to keep the old hlogs since
+          // If there are no slaves, we don't need to keep the old wals since
           // we only consider the last one when a new slave comes in
-          hlogs.clear();
+          wals.clear();
         }
-        hlogs.add(name);
+        wals.add(name);
       }
     }
 
@@ -452,7 +452,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
     this.oldsources.remove(src);
     deleteSource(src.getPeerClusterZnode(), false);
-    this.hlogsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
+    this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
   }
 
   /**
@@ -569,7 +569,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
         // We either didn't get the lock or the failed region server didn't have any outstanding
-        // HLogs to replicate, so we are done.
+        // WALs to replicate, so we are done.
         return;
       }
 
@@ -600,12 +600,12 @@ public class ReplicationSourceManager implements ReplicationListener {
             break;
           }
           oldsources.add(src);
-          SortedSet<String> hlogsSet = entry.getValue();
-          for (String hlog : hlogsSet) {
-            src.enqueueLog(new Path(oldLogDir, hlog));
+          SortedSet<String> walsSet = entry.getValue();
+          for (String wal : walsSet) {
+            src.enqueueLog(new Path(oldLogDir, wal));
           }
           src.startup();
-          hlogsByIdRecoveredQueues.put(peerId, hlogsSet);
+          walsByIdRecoveredQueues.put(peerId, walsSet);
         } catch (IOException e) {
           // TODO manage it
           LOG.error("Failed creating a source", e);
@@ -615,16 +615,16 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Get the directory where hlogs are archived
-   * @return the directory where hlogs are archived
+   * Get the directory where wals are archived
+   * @return the directory where wals are archived
    */
   public Path getOldLogDir() {
     return this.oldLogDir;
   }
 
   /**
-   * Get the directory where hlogs are stored by their RSs
-   * @return the directory where hlogs are stored by their RSs
+   * Get the directory where wals are stored by their RSs
+   * @return the directory where wals are stored by their RSs
    */
   public Path getLogDir() {
     return this.logDir;
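
The renamed cleanOldLogs above leans on SortedSet.headSet being a live view of the backing set; a self-contained JDK sketch of that idiom, with hypothetical wal names:

import java.util.SortedSet;
import java.util.TreeSet;

public class HeadSetTrimSketch {
  public static void main(String[] args) {
    SortedSet<String> wals = new TreeSet<String>();
    wals.add("wal.1");
    wals.add("wal.2");
    wals.add("wal.3");
    // headSet(key) excludes the key: everything strictly older than "wal.3"
    // has been fully replicated and can be dropped.
    SortedSet<String> walSet = wals.headSet("wal.3");
    walSet.clear();  // the view is backed by wals, so wal.1 and wal.2 go away
    System.out.println(wals);  // prints [wal.3]
  }
}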

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
new file mode 100644
index 0000000..b63f66b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class around WAL to help manage the implementation details
+ * such as compression.
+ */
+@InterfaceAudience.Private
+public class ReplicationWALReaderManager {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
+  private final FileSystem fs;
+  private final Configuration conf;
+  private long position = 0;
+  private Reader reader;
+  private Path lastPath;
+
+  /**
+   * Creates the helper but doesn't open any file.
+   * Use setPosition after construction if some content needs to be skipped.
+   * @param fs
+   * @param conf
+   */
+  public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
+    this.fs = fs;
+    this.conf = conf;
+  }
+
+  /**
+   * Opens the file at the current position
+   * @param path
+   * @return a WAL reader.
+   * @throws IOException
+   */
+  public Reader openReader(Path path) throws IOException {
+    // Detect if this is a new file; if so, get a new reader, else
+    // reset the current reader so that we see the new data
+    if (this.reader == null || !this.lastPath.equals(path)) {
+      this.closeReader();
+      this.reader = WALFactory.createReader(this.fs, path, this.conf);
+      this.lastPath = path;
+    } else {
+      try {
+        this.reader.reset();
+      } catch (NullPointerException npe) {
+        throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+      }
+    }
+    return this.reader;
+  }
+
+  /**
+   * Get the next entry and record the position reached after reading it
+   * @return a new entry or null
+   * @throws IOException
+   */
+  public Entry readNextAndSetPosition() throws IOException {
+    Entry entry = this.reader.next();
+    // Store the position so that in the future the reader can start
+    // reading from here. If the above call to next() throws an
+    // exception, the position won't be changed and retry will happen
+    // from the last known good position
+    this.position = this.reader.getPosition();
+    // We need to set the CompressionContext to null else it will be compressed when sent to the sink
+    if (entry != null) {
+      entry.setCompressionContext(null);
+    }
+    return entry;
+  }
+
+  /**
+   * Advance the reader to the current position
+   * @throws IOException
+   */
+  public void seek() throws IOException {
+    if (this.position != 0) {
+      this.reader.seek(this.position);
+    }
+  }
+
+  /**
+   * Get the position that we stopped reading at
+   * @return current position, cannot be negative
+   */
+  public long getPosition() {
+    return this.position;
+  }
+
+  public void setPosition(long pos) {
+    this.position = pos;
+  }
+
+  /**
+   * Close the current reader
+   * @throws IOException
+   */
+  public void closeReader() throws IOException {
+    if (this.reader != null) {
+      this.reader.close();
+      this.reader = null;
+    }
+  }
+
+  /**
+   * Tell the helper to reset internal state
+   */
+  void finishCurrentFile() {
+    this.position = 0;
+    try {
+      this.closeReader();
+    } catch (IOException e) {
+      LOG.warn("Unable to close reader", e);
+    }
+  }
+
+}
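
For orientation, a minimal read-loop sketch against this helper, modeled on its use in ReplicationSource above. fs, conf, and path are caller-supplied; the loop body is a stand-in for real shipping logic, and the class is assumed to sit in the same package as the helper.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;

public class WALReadLoopSketch {
  static void readAll(FileSystem fs, Configuration conf, Path path) throws IOException {
    ReplicationWALReaderManager repLogReader = new ReplicationWALReaderManager(fs, conf);
    repLogReader.openReader(path);  // new reader, or reset() of an existing one
    repLogReader.seek();            // no-op until a position has been recorded
    WAL.Entry entry;
    while ((entry = repLogReader.readNextAndSetPosition()) != null) {
      // Ship the entry; getPosition() is the retry-safe resume offset.
      long resumeAt = repLogReader.getPosition();
    }
    repLogReader.closeReader();
  }
}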

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 0a78385..7ab77a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -30,7 +30,7 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HbaseObjectWritableFor96Migration.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HbaseObjectWritableFor96Migration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HbaseObjectWritableFor96Migration.java
index ac751a4..2d7d9c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HbaseObjectWritableFor96Migration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/HbaseObjectWritableFor96Migration.java
@@ -87,8 +87,10 @@ import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ProtoUtil;
 import org.apache.hadoop.io.MapWritable;
@@ -228,8 +230,8 @@ class HbaseObjectWritableFor96Migration implements Writable, WritableWithSize, C
 
     addToMap(Delete [].class, code++);
 
-    addToMap(HLog.Entry.class, code++);
-    addToMap(HLog.Entry[].class, code++);
+    addToMap(Entry.class, code++);
+    addToMap(Entry[].class, code++);
     addToMap(HLogKey.class, code++);
 
     addToMap(List.class, code++);
@@ -539,6 +541,26 @@ class HbaseObjectWritableFor96Migration implements Writable, WritableWithSize, C
       byte [] scanBytes = ProtobufUtil.toScan(scan).toByteArray();
       out.writeInt(scanBytes.length);
       out.write(scanBytes);
+    } else if (Entry.class.isAssignableFrom(declClass)) {
+      // Entry is no longer Writable, maintain compatible serialization.
+      // Writables write their exact runtime class
+      Class <?> c = instanceObj.getClass();
+      Integer code = CLASS_TO_CODE.get(c);
+      if (code == null) {
+        out.writeByte(NOT_ENCODED);
+        Text.writeString(out, c.getName());
+      } else {
+        writeClassCode(out, c);
+      }
+      final Entry entry = (Entry)instanceObj;
+      // We only support legacy HLogKey
+      WALKey key = entry.getKey();
+      if (!(key instanceof HLogKey)) {
+        throw new IOException("Can't write Entry '" + instanceObj + "' due to key class '" +
+            key.getClass() + "'");
+      }
+      ((HLogKey)key).write(out);
+      entry.getEdit().write(out);
     } else {
       throw new IOException("Can't write: "+instanceObj+" as "+declClass);
     }
@@ -673,6 +695,9 @@ class HbaseObjectWritableFor96Migration implements Writable, WritableWithSize, C
       int b = (byte)WritableUtils.readVInt(in);
       if (b == NOT_ENCODED) {
         String className = Text.readString(in);
+        if ("org.apache.hadoop.hbase.regionserver.wal.HLog$Entry".equals(className)) {
+          className = Entry.class.getName();
+        }
         try {
           instanceClass = getClassByName(conf, className);
         } catch (ClassNotFoundException e) {
@@ -695,6 +720,13 @@ class HbaseObjectWritableFor96Migration implements Writable, WritableWithSize, C
           declaredClass = ((NullInstance)instance).declaredClass;
           instance = null;
         }
+      } else if (Entry.class.isAssignableFrom(instanceClass)) {
+        // Entry stopped being Writable; maintain serialization support.
+        final HLogKey key = new HLogKey();
+        final WALEdit edit = new WALEdit();
+        key.readFields(in);
+        edit.readFields(in);
+        instance = new Entry(key, edit);
       } else {
         int length = in.readInt();
         byte[] objectBytes = new byte[length];
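
The two compatibility branches above reduce to "write the HLogKey, then the WALEdit; read both back and wrap them in an Entry". A hedged round-trip sketch using only the calls visible in this diff:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;

public class EntryCompatSketch {
  static Entry roundTrip(Entry original) throws IOException {
    WALKey key = original.getKey();
    if (!(key instanceof HLogKey)) {
      throw new IOException("legacy format only supports HLogKey, got " + key.getClass());
    }
    // Write path, as above: key bytes followed by edit bytes.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    ((HLogKey) key).write(out);
    original.getEdit().write(out);
    // Read path, as above: readFields into fresh instances, then wrap.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    HLogKey readKey = new HLogKey();
    WALEdit readEdit = new WALEdit();
    readKey.readFields(in);
    readEdit.readFields(in);
    return new Entry(readKey, readEdit);
  }
}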

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
index 3374dc4..6519fc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 import com.google.common.util.concurrent.ListenableFuture;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index bccc609..a2fd75f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.io.FileLink;
 import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
@@ -79,7 +79,7 @@ import org.apache.hadoop.util.ToolRunner;
  * Export the specified snapshot to a given FileSystem.
  *
  * The .snapshot/name folder is copied to the destination cluster
- * and then all the hfiles/hlogs are copied using a Map-Reduce Job in the .archive/ location.
+ * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
  * When everything is done, the second cluster can restore the snapshot.
  */
 @InterfaceAudience.Public
@@ -402,7 +402,7 @@ public class ExportSnapshot extends Configured implements Tool {
           case WAL:
             String serverName = fileInfo.getWalServer();
             String logName = fileInfo.getWalName();
-            link = new HLogLink(inputRoot, serverName, logName);
+            link = new WALLink(inputRoot, serverName, logName);
             break;
           default:
             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
@@ -425,7 +425,7 @@ public class ExportSnapshot extends Configured implements Tool {
             link = new HFileLink(inputRoot, inputArchive, inputPath);
             break;
           case WAL:
-            link = new HLogLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
+            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
             break;
           default:
             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
@@ -477,7 +477,7 @@ public class ExportSnapshot extends Configured implements Tool {
   // ==========================================================================
 
   /**
-   * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
+   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
    * @return list of files referenced by the snapshot (pair of path and size)
    */
   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
@@ -525,7 +525,7 @@ public class ExportSnapshot extends Configured implements Tool {
             .setWalName(logfile)
             .build();
 
-          long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
+          long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
         }
     });
@@ -781,7 +781,7 @@ public class ExportSnapshot extends Configured implements Tool {
   }
 
   /**
-   * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
+   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
    * @return 0 on success, and != 0 upon failure.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
index d38a94f..77b17d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
  * <ol>
  * <li> Table Descriptor
  * <li> Snapshot creation time, type, format version, ...
- * <li> List of hfiles and hlogs
+ * <li> List of hfiles and wals
  * <li> Stats about hfiles and logs sizes, percentage of shared with the source table, ...
  * </ol>
  */
@@ -243,7 +243,7 @@ public final class SnapshotInfo extends Configured implements Tool {
      * @return the log information
      */
     FileInfo addLogFile(final String server, final String logfile) throws IOException {
-      HLogLink logLink = new HLogLink(conf, server, logfile);
+      WALLink logLink = new WALLink(conf, server, logfile);
       long size = -1;
       try {
         size = logLink.getFileStatus(fs).getLen();
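
Both snapshot tools now resolve WAL files through WALLink, whether the file is still live or already archived. A minimal sketch mirroring the lookup above (server and logfile are caller-supplied):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.io.WALLink;

public class WALLinkSketch {
  static long walSize(Configuration conf, FileSystem fs,
      String server, String logfile) throws IOException {
    // WALLink locates the file across the live and archived WAL directories.
    WALLink logLink = new WALLink(conf, server, logfile);
    return logLink.getFileStatus(fs).getLen();
  }
}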

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
index 2c3913a..9297ea0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.FSVisitor;
 
@@ -73,7 +73,7 @@ public final class SnapshotReferenceUtil {
    * @return path to the log home directory for the archive files.
    */
   public static Path getLogsDir(Path snapshotDir, String serverName) {
-    return new Path(snapshotDir, HLogUtil.getHLogDirectoryName(serverName));
+    return new Path(snapshotDir, DefaultWALProvider.getWALDirectoryName(serverName));
   }
 
   /**
@@ -364,9 +364,9 @@ public final class SnapshotReferenceUtil {
    * @param fs {@link FileSystem}
    * @param snapshotDir {@link Path} to the Snapshot directory
    * @throws IOException if an error occurred while scanning the directory
-   * @return the names of hlogs in the specified snaphot
+   * @return the names of wals in the specified snapshot
    */
-  public static Set<String> getHLogNames(final FileSystem fs, final Path snapshotDir)
+  public static Set<String> getWALNames(final FileSystem fs, final Path snapshotDir)
       throws IOException {
     final Set<String> names = new HashSet<String>();
     visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
index f904565..8bdac15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
@@ -262,7 +262,7 @@ public class FSHDFSUtils extends FSUtils {
     } catch (IOException e) {
       if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
         // This exception comes out instead of FNFE, fix it
-        throw new FileNotFoundException("The given HLog wasn't found at " + p);
+        throw new FileNotFoundException("The given WAL wasn't found at " + p);
       } else if (e instanceof FileNotFoundException) {
         throw (FileNotFoundException)e;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
index 5e14db6..f0cc0c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 
 /**
  * Utility methods for interacting with the hbase.root file system.
@@ -179,7 +179,7 @@ public final class FSVisitor {
    */
   public static void visitRegionRecoveredEdits(final FileSystem fs, final Path regionDir,
       final FSVisitor.RecoveredEditsVisitor visitor) throws IOException {
-    NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regionDir);
+    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir);
     if (files == null || files.size() == 0) return;
 
     for (Path source: files) {
@@ -213,16 +213,16 @@ public final class FSVisitor {
     for (FileStatus serverLogs: logServerDirs) {
       String serverName = serverLogs.getPath().getName();
 
-      FileStatus[] hlogs = FSUtils.listStatus(fs, serverLogs.getPath());
-      if (hlogs == null) {
+      FileStatus[] wals = FSUtils.listStatus(fs, serverLogs.getPath());
+      if (wals == null) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("No hfiles found for server: " + serverName + ", skipping.");
+          LOG.trace("No wals found for server: " + serverName + ", skipping.");
         }
         continue;
       }
 
-      for (FileStatus hlogRef: hlogs) {
-        visitor.logFile(serverName, hlogRef.getPath().getName());
+      for (FileStatus walRef: wals) {
+        visitor.logFile(serverName, walRef.getPath().getName());
       }
     }
   }
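
A minimal sketch of the LogFileVisitor callback these loops feed, matching the call shape used in SnapshotReferenceUtil above (fs and dir are caller-supplied):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSVisitor;

public class LogFileVisitorSketch {
  static void listWals(FileSystem fs, Path dir) throws IOException {
    FSVisitor.visitLogFiles(fs, dir, new FSVisitor.LogFileVisitor() {
      @Override
      public void logFile(String server, String logfile) throws IOException {
        // Invoked once per WAL file under each server's log directory.
        System.out.println(server + "/" + logfile);
      }
    });
  }
}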

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 90f5bdf..d0c749a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -102,7 +102,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.Block
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
@@ -707,7 +707,7 @@ public class HBaseFsck extends Configured {
   /**
    * Orphaned regions are regions without a .regioninfo file in them.  We "adopt"
    * these orphans by creating a new region, and moving the column families,
-   * recovered edits, HLogs, into the new region dir.  We determine the region
+   * recovered edits, WALs, into the new region dir.  We determine the region
    * startkey and endkeys by looking at all of the hfiles inside the column
    * families to identify the min and max keys. The resulting region will
    * likely violate table integrity but will be dealt with by merging
@@ -3556,7 +3556,7 @@ public class HBaseFsck extends Configured {
             // This is special case if a region is left after split
             he.hdfsOnlyEdits = true;
             FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
-            Path ePath = HLogUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
+            Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath());
             for (FileStatus subDir : subDirs) {
               String sdName = subDir.getPath().getName();
               if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
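
WALSplitter now owns the recovered-edits helpers that used to live on HLogUtil; a two-call sketch (regionDir is caller-supplied, and getSplitEditFilesSorted may return null, as the FSVisitor code above guards for):

import java.io.IOException;
import java.util.NavigableSet;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALSplitter;

public class RecoveredEditsSketch {
  static void show(FileSystem fs, Path regionDir) throws IOException {
    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir);
    int count = (files == null) ? 0 : files.size();
    System.out.println(editsDir + " holds " + count + " recovered-edits files");
  }
}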

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
index 64809e5..2e0f53d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WALFactory;
 
 /**
  * A non-instantiable class that has a static method capable of compacting
@@ -143,7 +142,7 @@ class HMerge {
     protected final FileSystem fs;
     protected final Path rootDir;
     protected final HTableDescriptor htd;
-    protected final HLog hlog;
+    protected final WALFactory walFactory;
     private final long maxFilesize;
 
 
@@ -159,7 +158,9 @@ class HMerge {
       this.htd = FSTableDescriptors.getTableDescriptorFromFs(this.fs, tabledir);
       String logname = "merge_" + System.currentTimeMillis() + HConstants.HREGION_LOGDIR_NAME;
 
-      this.hlog = HLogFactory.createHLog(fs, tabledir, logname, conf);
+      final Configuration walConf = new Configuration(conf);
+      FSUtils.setRootDir(walConf, tabledir);
+      this.walFactory = new WALFactory(walConf, null, logname);
     }
 
     void process() throws IOException {
@@ -173,8 +174,7 @@ class HMerge {
         }
       } finally {
         try {
-          hlog.closeAndDelete();
-
+          walFactory.close();
         } catch(IOException e) {
           LOG.error(e);
         }
@@ -193,10 +193,12 @@ class HMerge {
       long nextSize = 0;
       for (int i = 0; i < info.length - 1; i++) {
         if (currentRegion == null) {
-          currentRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i], this.htd, hlog);
+          currentRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i], this.htd,
+              walFactory.getWAL(info[i].getEncodedNameAsBytes()));
           currentSize = currentRegion.getLargestHStoreSize();
         }
-        nextRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i + 1], this.htd, hlog);
+        nextRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i + 1], this.htd,
+            walFactory.getWAL(info[i+1].getEncodedNameAsBytes()));
         nextSize = nextRegion.getLargestHStoreSize();
 
         if ((currentSize + nextSize) <= (maxFilesize / 2)) {
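
HMerge, Merge, and MetaUtils now share one pattern: copy the Configuration, point its root at the directory that should hold the WALs, and build a WALFactory that hands out per-region WALs. A hedged distillation (tableDir and logName are caller-supplied):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALFactory;

public class WalFactorySketch {
  static WALFactory walFactoryFor(Configuration conf, Path tableDir,
      String logName) throws IOException {
    // Copy the conf so the WAL root can diverge from the cluster root.
    final Configuration walConf = new Configuration(conf);
    FSUtils.setRootDir(walConf, tableDir);
    // Callers fetch WALs via getWAL(encodedRegionName) and call close()
    // when done, which is what replaces the old hlog.closeAndDelete().
    return new WALFactory(walConf, null, logName);
  }
}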

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
index 14a94c0..3c3ad66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -180,10 +180,9 @@ public class Merge extends Configured implements Tool {
           Bytes.toStringBinary(meta.getRegionName()));
     }
     HRegion merged = null;
-    HLog log = utils.getLog();
-    HRegion r1 = HRegion.openHRegion(info1, htd, log, getConf());
+    HRegion r1 = HRegion.openHRegion(info1, htd, utils.getLog(info1), getConf());
     try {
-      HRegion r2 = HRegion.openHRegion(info2, htd, log, getConf());
+      HRegion r2 = HRegion.openHRegion(info2, htd, utils.getLog(info2), getConf());
       try {
         merged = HRegion.merge(r1, r2);
       } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
index 4ff2a94..7f9e019 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MetaUtils.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
 
 /**
  * Contains utility methods for manipulating HBase meta tables.
@@ -49,7 +49,7 @@ public class MetaUtils {
   private static final Log LOG = LogFactory.getLog(MetaUtils.class);
   private final Configuration conf;
   private FileSystem fs;
-  private HLog log;
+  private WALFactory walFactory;
   private HRegion metaRegion;
   private Map<byte [], HRegion> metaRegions = Collections.synchronizedSortedMap(
     new TreeMap<byte [], HRegion>(Bytes.BYTES_COMPARATOR));
@@ -81,17 +81,19 @@ public class MetaUtils {
   }
 
   /**
-   * @return the HLog
+   * @return the WAL associated with the given region
    * @throws IOException e
    */
-  public synchronized HLog getLog() throws IOException {
-    if (this.log == null) {
+  public synchronized WAL getLog(HRegionInfo info) throws IOException {
+    if (this.walFactory == null) {
       String logName = 
           HConstants.HREGION_LOGDIR_NAME + "_" + System.currentTimeMillis();
-      this.log = HLogFactory.createHLog(this.fs, this.fs.getHomeDirectory(),
-                                        logName, this.conf);
+      final Configuration walConf = new Configuration(this.conf);
+      FSUtils.setRootDir(walConf, fs.getHomeDirectory());
+      this.walFactory = new WALFactory(walConf, null, logName);
     }
-    return this.log;
+    final byte[] region = info.getEncodedNameAsBytes();
+    return info.isMetaRegion() ? walFactory.getMetaWAL(region) : walFactory.getWAL(region);
   }
 
   /**
@@ -106,11 +108,11 @@ public class MetaUtils {
   }
 
   /**
-   * Closes catalog regions if open. Also closes and deletes the HLog. You
+   * Closes catalog regions if open. Also closes and deletes the WAL. You
    * must call this method if you want to persist changes made during a
    * MetaUtils edit session.
    */
-  public void shutdown() {
+  public synchronized void shutdown() {
     if (this.metaRegion != null) {
       try {
         this.metaRegion.close();
@@ -131,14 +133,11 @@ public class MetaUtils {
       metaRegions.clear();
     }
     try {
-      if (this.log != null) {
-        this.log.rollWriter();
-        this.log.closeAndDelete();
+      if (this.walFactory != null) {
+        this.walFactory.close();
       }
     } catch (IOException e) {
-      LOG.error("closing HLog", e);
-    } finally {
-      this.log = null;
+      LOG.error("closing WAL", e);
     }
   }
 
@@ -147,7 +146,7 @@ public class MetaUtils {
       return this.metaRegion;
     }
     this.metaRegion = HRegion.openHRegion(HRegionInfo.FIRST_META_REGIONINFO,
-      HTableDescriptor.META_TABLEDESC, getLog(),
+      HTableDescriptor.META_TABLEDESC, getLog(HRegionInfo.FIRST_META_REGIONINFO),
       this.conf);
     this.metaRegion.compactStores();
     return this.metaRegion;
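
The meta/non-meta dispatch in getLog above is the only per-region decision left once a WALFactory exists; a one-method sketch (factory and info are caller-supplied):

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;

public class MetaWalDispatchSketch {
  static WAL walFor(WALFactory factory, HRegionInfo info) throws IOException {
    final byte[] region = info.getEncodedNameAsBytes();
    // hbase:meta edits go to a dedicated meta WAL provider.
    return info.isMetaRegion() ? factory.getMetaWAL(region) : factory.getWAL(region);
  }
}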

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
index deeebbd..c926d54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
@@ -107,7 +107,7 @@ import com.google.common.collect.Sets;
  * profiling is much easier with manual splits. It is hard to trace the logs to
  * understand region level problems if it keeps splitting and getting renamed.
  * <li>Data offlining bugs + unknown number of split regions == oh crap! If an
- * HLog or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
+ * WAL or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
  * you notice it a day or so later, you can be assured that the regions
  * specified in these files are the same as the current regions and you have
  * less headaches trying to restore/replay your data.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
new file mode 100644
index 0000000..b710059
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -0,0 +1,369 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+
+/**
+ * A WAL Provider that returns a single thread safe WAL that writes to HDFS.
+ * By default, this implementation picks a directory in HDFS based on a combination of
+ * <ul>
+ *   <li>the HBase root directory
+ *   <li>HConstants.HREGION_LOGDIR_NAME
+ *   <li>the given factory's factoryId (usually identifying the regionserver by host:port)
+ * </ul>
+ * It also uses the providerId to differentiate among files.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DefaultWALProvider implements WALProvider {
+  private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class);
+
+  // Only public so classes back in regionserver.wal can access
+  public interface Reader extends WAL.Reader {
+    /**
+     * @param fs File system.
+     * @param path Path.
+     * @param c Configuration.
+     * @param s Input stream that may have been pre-opened by the caller; may be null.
+     */
+    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
+  }
+
+  // Only public so classes back in regionserver.wal can access
+  public interface Writer extends WALProvider.Writer {
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
+  }
+
+  private FSHLog log = null;
+
+  /**
+   * @param factory factory that made us, identity used for FS layout. may not be null
+   * @param conf may not be null
+   * @param listeners may be null
+   * @param providerId differentiates between providers from one factory, used for FS layout. may be
+   *                   null
+   */
+  @Override
+  public void init(final WALFactory factory, final Configuration conf,
+      final List<WALActionsListener> listeners, String providerId) throws IOException {
+    if (null != log) {
+      throw new IllegalStateException("WALProvider.init should only be called once.");
+    }
+    if (null == providerId) {
+      providerId = DEFAULT_PROVIDER_ID;
+    }
+    final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
+    log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
+        getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
+        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+  }
+
+  @Override
+  public WAL getWAL(final byte[] identifier) throws IOException {
+    return log;
+  }
+
+  @Override
+  public void close() throws IOException {
+    log.close();
+  }
+
+  @Override
+  public void shutdown() throws IOException {
+    log.shutdown();
+  }
+
+  // Should be package-private; kept public for use in FSHLog.
+  public static final String WAL_FILE_NAME_DELIMITER = ".";
+  /** The hbase:meta region's WAL filename extension */
+  @VisibleForTesting
+  public static final String META_WAL_PROVIDER_ID = ".meta";
+  static final String DEFAULT_PROVIDER_ID = "default";
+
+  // Implementation details that currently leak in tests or elsewhere follow
+  /** File extension used while splitting a WAL into regions (HBASE-2312) */
+  public static final String SPLITTING_EXT = "-splitting";
+
+  /**
+   * Iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
+   * count the number of files (rolled and active). If either is not, count 0
+   * for that provider.
+   * @param walFactory may not be null.
+   */
+  public static long getNumLogFiles(WALFactory walFactory) {
+    long result = 0;
+    if (walFactory.provider instanceof DefaultWALProvider) {
+      result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getNumLogFiles();
+    }
+    WALProvider meta = walFactory.metaProvider.get();
+    if (meta instanceof DefaultWALProvider) {
+      result += ((FSHLog)((DefaultWALProvider)meta).log).getNumLogFiles();
+    }
+    return result;
+  }
+
+  /**
+   * Iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
+   * count the size of files (rolled and active). If either is not, count 0
+   * for that provider.
+   * @param walFactory may not be null.
+   */
+  public static long getLogFileSize(WALFactory walFactory) {
+    long result = 0;
+    if (walFactory.provider instanceof DefaultWALProvider) {
+      result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getLogFileSize();
+    }
+    WALProvider meta = walFactory.metaProvider.get();
+    if (meta instanceof DefaultWALProvider) {
+      result += ((FSHLog)((DefaultWALProvider)meta).log).getLogFileSize();
+    }
+    return result;
+  }
+
+  /**
+   * returns the number of rolled WAL files.
+   */
+  @VisibleForTesting
+  public static int getNumRolledLogFiles(WAL wal) {
+    return ((FSHLog)wal).getNumRolledLogFiles();
+  }
+
+  /**
+   * return the current filename from the current wal.
+   */
+  @VisibleForTesting
+  public static Path getCurrentFileName(final WAL wal) {
+    return ((FSHLog)wal).getCurrentFileName();
+  }
+
+  /**
+   * request a log roll, but don't actually do it.
+   */
+  @VisibleForTesting
+  static void requestLogRoll(final WAL wal) {
+    ((FSHLog)wal).requestLogRoll();
+  }
+
+  /**
+   * Returns the file creation timestamp embedded in the file name.
+   * For name format see {@link #validateWALFilename(String)}
+   * public until remaining tests move to o.a.h.h.wal
+   * @param wal must not be null
+   * @return the file number that is part of the WAL file name
+   */
+  @VisibleForTesting
+  public static long extractFileNumFromWAL(final WAL wal) {
+    final Path walName = ((FSHLog)wal).getCurrentFileName();
+    if (walName == null) {
+      throw new IllegalArgumentException("The WAL path couldn't be null");
+    }
+    final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
+    return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2:1)]);
+  }
+
+  /**
+   * Pattern used to validate a WAL file name;
+   * see {@link #validateWALFilename(String)} for a description.
+   */
+  private static final Pattern pattern = Pattern.compile(".*\\.\\d*("+META_WAL_PROVIDER_ID+")*");
+
+  /**
+   * A WAL file name is of the format:
+   * &lt;wal-name&gt;{@link #WAL_FILE_NAME_DELIMITER}&lt;file-creation-timestamp&gt;[.meta].
+   *
+   * The wal-name portion is usually made up of a server-name and a provider-id.
+   *
+   * @param filename name of the file to validate
+   * @return <tt>true</tt> if the filename matches a WAL, <tt>false</tt>
+   *         otherwise
+   */
+  public static boolean validateWALFilename(String filename) {
+    return pattern.matcher(filename).matches();
+  }
+
+  /**
+   * Construct the directory name for all WALs on a given server.
+   *
+   * @param serverName
+   *          Server name formatted as described in {@link ServerName}
+   * @return the relative WAL directory name, e.g.
+   *         <code>.logs/1.example.org,60030,12345</code> if
+   *         <code>serverName</code> passed is
+   *         <code>1.example.org,60030,12345</code>
+   */
+  public static String getWALDirectoryName(final String serverName) {
+    StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
+    dirName.append("/");
+    dirName.append(serverName);
+    return dirName.toString();
+  }
+
+  /**
+   * Pulls a ServerName out of a Path generated according to our layout rules.
+   *
+   * In the below layouts, this method ignores the format of the logfile component.
+   *
+   * Current format:
+   *
+   * [base directory for hbase]/hbase/.logs/ServerName/logfile
+   *      or
+   * [base directory for hbase]/hbase/.logs/ServerName-splitting/logfile
+   *
+   * Expected to work for individual log files and server-specific directories.
+   *
+   * @return null if it's not a log file. Returns the ServerName of the region
+   *         server that created this log file otherwise.
+   */
+  public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path)
+      throws IOException {
+    if (path == null
+        || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
+      return null;
+    }
+
+    if (conf == null) {
+      throw new IllegalArgumentException("parameter conf must be set");
+    }
+
+    final String rootDir = conf.get(HConstants.HBASE_DIR);
+    if (rootDir == null || rootDir.isEmpty()) {
+      throw new IllegalArgumentException(HConstants.HBASE_DIR
+          + " key not found in conf.");
+    }
+
+    final StringBuilder startPathSB = new StringBuilder(rootDir);
+    if (!rootDir.endsWith("/"))
+      startPathSB.append('/');
+    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
+    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
+      startPathSB.append('/');
+    final String startPath = startPathSB.toString();
+
+    String fullPath;
+    try {
+      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
+    } catch (IllegalArgumentException e) {
+      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
+      return null;
+    }
+
+    if (!fullPath.startsWith(startPath)) {
+      return null;
+    }
+
+    final String serverNameAndFile = fullPath.substring(startPath.length());
+
+    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
+      // Either it's a file (not a directory) or it's not a ServerName format
+      return null;
+    }
+
+    Path p = new Path(path);
+    return getServerNameFromWALDirectoryName(p);
+  }
+
+  /**
+   * This function returns region server name from a log file name which is in one of the following
+   * formats:
+   * <ul>
+   *   <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;-splitting/...
+   *   <li>hdfs://&lt;name node&gt;/hbase/.logs/&lt;server name&gt;/...
+   * </ul>
+   * @param logFile path of the WAL file, or of the server's WAL directory itself
+   * @return null if the passed in logFile isn't a valid WAL file path
+   */
+  public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
+    String logDirName = logFile.getParent().getName();
+    // We were passed the directory and not a file in it.
+    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
+      logDirName = logFile.getName();
+    }
+    ServerName serverName = null;
+    if (logDirName.endsWith(SPLITTING_EXT)) {
+      logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length());
+    }
+    try {
+      serverName = ServerName.parseServerName(logDirName);
+    } catch (IllegalArgumentException ex) {
+      serverName = null;
+      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
+    }
+    if (serverName != null && serverName.getStartcode() < 0) {
+      LOG.warn("Invalid log file path=" + logFile);
+      serverName = null;
+    }
+    return serverName;
+  }
+
+  public static boolean isMetaFile(Path p) {
+    return isMetaFile(p.getName());
+  }
+
+  public static boolean isMetaFile(String p) {
+    if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * public because of FSHLog. Should be package-private
+   */
+  public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path,
+      final boolean overwritable)
+      throws IOException {
+    // Configuration already does caching for the Class lookup.
+    Class<? extends Writer> logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
+        ProtobufLogWriter.class, Writer.class);
+    try {
+      Writer writer = logWriterClass.newInstance();
+      writer.init(fs, path, conf, overwritable);
+      return writer;
+    } catch (Exception e) {
+      LOG.debug("Error instantiating log writer.", e);
+      throw new IOException("cannot get log writer", e);
+    }
+  }
+
+}

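As an aside for reviewers tracing the new naming helpers: below is a minimal, self-contained sketch of how the public methods above compose. The class name, host, file name, and paths are invented for illustration (real WAL names URL-encode the server name), and the .logs layout follows the javadoc in the patch.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;

public class WalNamingSketch {
  public static void main(String[] args) {
    // Name layout: <factoryId>.<providerId>.<file-creation-timestamp>[.meta]
    String name = "host1.example.org%2C60020%2C12345.default.1417539123456";
    System.out.println(DefaultWALProvider.validateWALFilename(name));      // true
    System.out.println(DefaultWALProvider.isMetaFile(name + ".meta"));     // true

    // Per-server WAL directory, relative to the HBase root directory.
    System.out.println(DefaultWALProvider.getWALDirectoryName(
        "host1.example.org,60020,12345"));  // .logs/host1.example.org,60020,12345

    // The directory component of a WAL path carries the ServerName.
    Path wal = new Path("/hbase/.logs/host1.example.org,60020,12345/" + name);
    ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(wal);
    System.out.println(sn);  // host1.example.org,60020,12345
  }
}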
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
new file mode 100644
index 0000000..ac23916
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * No-op implementation of {@link WALProvider} used when the WAL is disabled.
+ *
+ * Should only be used when severe data loss is acceptable.
+ *
+ */
+@InterfaceAudience.Private
+class DisabledWALProvider implements WALProvider {
+
+  private static final Log LOG = LogFactory.getLog(DisabledWALProvider.class);
+
+  WAL disabled;
+
+  @Override
+  public void init(final WALFactory factory, final Configuration conf,
+      final List<WALActionsListener> listeners, final String providerId) throws IOException {
+    if (null != disabled) {
+      throw new IllegalStateException("WALProvider.init should only be called once.");
+    }
+    disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null);
+  }
+
+  @Override
+  public WAL getWAL(final byte[] identifier) throws IOException {
+    return disabled;
+  }
+
+  @Override
+  public void close() throws IOException {
+    disabled.close();
+  }
+
+  @Override
+  public void shutdown() throws IOException {
+    disabled.shutdown();
+  }
+
+  private static class DisabledWAL implements WAL {
+    protected final List<WALActionsListener> listeners =
+        new CopyOnWriteArrayList<WALActionsListener>();
+    protected final Path path;
+    protected final WALCoprocessorHost coprocessorHost;
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public DisabledWAL(final Path path, final Configuration conf,
+        final List<WALActionsListener> listeners) {
+      this.coprocessorHost = new WALCoprocessorHost(this, conf);
+      this.path = path;
+      if (null != listeners) {
+        for(WALActionsListener listener : listeners) {
+          registerWALActionsListener(listener);
+        }
+      }
+    }
+
+    @Override
+    public void registerWALActionsListener(final WALActionsListener listener) {
+      listeners.add(listener);
+    }
+
+    @Override
+    public boolean unregisterWALActionsListener(final WALActionsListener listener) {
+      return listeners.remove(listener);
+    }
+
+    @Override
+    public byte[][] rollWriter() {
+      if (!listeners.isEmpty()) {
+        for (WALActionsListener listener : listeners) {
+          listener.logRollRequested();
+        }
+        for (WALActionsListener listener : listeners) {
+          try {
+            listener.preLogRoll(path, path);
+          } catch (IOException exception) {
+            LOG.debug("Ignoring exception from listener.", exception);
+          }
+        }
+        for (WALActionsListener listener : listeners) {
+          try {
+            listener.postLogRoll(path, path);
+          } catch (IOException exception) {
+            LOG.debug("Ignoring exception from listener.", exception);
+          }
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public byte[][] rollWriter(boolean force) {
+      return rollWriter();
+    }
+
+    @Override
+    public void shutdown() {
+      if (closed.compareAndSet(false, true)) {
+        if (!this.listeners.isEmpty()) {
+          for (WALActionsListener listener : this.listeners) {
+            listener.logCloseRequested();
+          }
+        }
+      }
+    }
+
+    @Override
+    public void close() {
+      shutdown();
+    }
+
+    @Override
+    public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
+        AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
+      if (!this.listeners.isEmpty()) {
+        final long start = System.nanoTime();
+        long len = 0;
+        for (Cell cell : edits.getCells()) {
+          len += CellUtil.estimatedSerializedSizeOf(cell);
+        }
+        final long elapsed = (System.nanoTime() - start) / 1000000L;
+        for (WALActionsListener listener : this.listeners) {
+          listener.postAppend(len, elapsed);
+        }
+      }
+      return -1;
+    }
+
+    @Override
+    public void sync() {
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener listener : this.listeners) {
+          listener.postSync(0L, 0);
+        }
+      }
+    }
+
+    @Override
+    public void sync(long txid) {
+      sync();
+    }
+
+    @Override
+    public boolean startCacheFlush(final byte[] encodedRegionName) {
+      return !(closed.get());
+    }
+
+    @Override
+    public void completeCacheFlush(final byte[] encodedRegionName) {
+    }
+
+    @Override
+    public void abortCacheFlush(byte[] encodedRegionName) {
+    }
+
+    @Override
+    public WALCoprocessorHost getCoprocessorHost() {
+      return coprocessorHost;
+    }
+
+    @Override
+    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+      return HConstants.NO_SEQNUM;
+    }
+
+    @Override
+    public String toString() {
+      return "WAL disabled.";
+    }
+  }
+}

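The listener handling in DisabledWAL above is worth calling out: callbacks iterate a CopyOnWriteArrayList and per-listener IOExceptions are deliberately swallowed, so one failing listener cannot abort a (no-op) roll. Here is that pattern distilled into a standalone sketch; all names in it are invented, not taken from the patch.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerNotifySketch {
  interface RollListener {
    void preRoll() throws IOException;
  }

  private final List<RollListener> listeners = new CopyOnWriteArrayList<RollListener>();

  void register(RollListener listener) {
    listeners.add(listener);
  }

  void roll() {
    // Safe to iterate while other threads register/unregister concurrently.
    for (RollListener listener : listeners) {
      try {
        listener.preRoll();
      } catch (IOException exception) {
        // Mirror DisabledWAL: log and continue rather than abort the roll.
        System.err.println("Ignoring exception from listener: " + exception);
      }
    }
  }

  public static void main(String[] args) {
    ListenerNotifySketch sketch = new ListenerNotifySketch();
    sketch.register(new RollListener() {
      @Override
      public void preRoll() {
        System.out.println("pre-roll notified");
      }
    });
    sketch.roll();
  }
}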
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
new file mode 100644
index 0000000..787a34b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+// imports we use from yet-to-be-moved regionsever.wal
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * A Write Ahead Log (WAL) provides services for reading and writing WAL edits. This interface
+ * provides APIs for WAL users (such as the RegionServer) to use the WAL (append, sync, etc.).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface WAL {
+
+  /**
+   * Registers WALActionsListener
+   */
+  void registerWALActionsListener(final WALActionsListener listener);
+
+  /**
+   * Unregisters WALActionsListener
+   */
+  boolean unregisterWALActionsListener(final WALActionsListener listener);
+
+  /**
+   * Roll the log writer. That is, start writing log messages to a new file.
+   *
+   * <p>
+   * The implementation is synchronized in order to make sure there's one rollWriter
+   * running at any given time.
+   *
+   * @return If lots of logs, flush the returned regions so next time through we
+   *         can clean logs. Returns null if nothing to flush. Names are actual
+   *         region names as returned by {@link HRegionInfo#getEncodedName()}
+   */
+  byte[][] rollWriter() throws FailedLogCloseException, IOException;
+
+  /**
+   * Roll the log writer. That is, start writing log messages to a new file.
+   *
+   * <p>
+   * The implementation is synchronized in order to make sure there's one rollWriter
+   * running at any given time.
+   *
+   * @param force
+   *          If true, force creation of a new writer even if no entries have
+   *          been written to the current writer
+   * @return If lots of logs, flush the returned regions so next time through we
+   *         can clean logs. Returns null if nothing to flush. Names are actual
+   *         region names as returned by {@link HRegionInfo#getEncodedName()}
+   */
+  byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException;
+
+  /**
+   * Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
+   * Extant edits are left in place in backing storage to be replayed later.
+   */
+  void shutdown() throws IOException;
+
+  /**
+   * Caller no longer needs any edits from this WAL. Implementers are free to reclaim
+   * underlying resources after this call; i.e. filesystem based WALs can archive or
+   * delete files.
+   */
+  void close() throws IOException;
+
+  /**
+   * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction
+   * completes BUT on return this edit must have its region edit/sequence id assigned
+   * else it messes up our unification of mvcc and sequenceid.  On return <code>key</code> will
+   * have the region edit/sequence id filled in.
+   * @param info
+   * @param key Modified by this call; we add to it this edits region edit/sequence id.
+   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
+   * sequence id that is after all currently appended edits.
+   * @param htd used to give scope for replication TODO refactor out in favor of table name and info
+   * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
+   * source of its incrementing edits sequence id.  Inside this call we will increment it and
+   * attach the sequence to the edit we apply to the WAL.
+   * @param inMemstore Always true except for case where we are writing a compaction completion
+   * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
+   * -- it is not an edit for memstore.
+   * @param memstoreKVs list of KVs added into memstore
+   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
+   * in it.
+   */
+  long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
+      AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs)
+  throws IOException;
+
+  /**
+   * Sync what we have in the WAL.
+   * @throws IOException
+   */
+  void sync() throws IOException;
+
+  /**
+   * Sync the WAL if the txId was not already sync'd.
+   * @param txid Transaction id to sync to.
+   * @throws IOException
+   */
+  void sync(long txid) throws IOException;
+
+  /**
+   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
+   * in order to be able to do cleanup. This method tells WAL that some region is about
+   * to flush memstore.
+   *
+   * <p>We stash the oldest seqNum for the region, and let the next edit inserted in this
+   * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit,
+   * AtomicLong, boolean, List)} as new oldest seqnum.
+   * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
+   * the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
+   *
+   * @return true if the flush can proceed, false in case the wal is closing (usually, when the server is
+   * closing) and flush couldn't be started.
+   */
+  boolean startCacheFlush(final byte[] encodedRegionName);
+
+  /**
+   * Complete the cache flush.
+   * @param encodedRegionName Encoded region name.
+   */
+  void completeCacheFlush(final byte[] encodedRegionName);
+
+  /**
+   * Abort a cache flush. Call if the flush fails. Note that the only recovery
+   * for an aborted flush currently is a restart of the regionserver so the
+   * snapshot content dropped by the failure gets restored to the memstore.
+   * @param encodedRegionName Encoded region name.
+   */
+  void abortCacheFlush(byte[] encodedRegionName);
+
+  /**
+   * @return Coprocessor host.
+   */
+  WALCoprocessorHost getCoprocessorHost();
+
+
+  /** Gets the earliest sequence number in the memstore for this particular region.
+   * This can serve as best-effort "recent" WAL number for this region.
+   * @param encodedRegionName The region to get the number for.
+   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   */
+  long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
+
+  /**
+   * Human readable identifying information about the state of this WAL.
+   * Implementors are encouraged to include information appropriate for debugging.
+   * Consumers are advised not to rely on the details of the returned String; it does
+   * not have a defined structure.
+   */
+  @Override
+  String toString();
+
+  /**
+   * When outside clients need to consume persisted WALs, they rely on a provided
+   * Reader.
+   */
+  interface Reader extends Closeable {
+    Entry next() throws IOException;
+    Entry next(Entry reuse) throws IOException;
+    void seek(long pos) throws IOException;
+    long getPosition() throws IOException;
+    void reset() throws IOException;
+  }
+
+  /**
+   * Utility class that lets us keep track of the edit together with its key.
+   */
+  class Entry {
+    private WALEdit edit;
+    private WALKey key;
+
+    public Entry() {
+      edit = new WALEdit();
+      // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+      key = new HLogKey();
+    }
+
+    /**
+     * Constructor taking both the key and the edit.
+     *
+     * @param edit log's edit
+     * @param key log's key
+     */
+    public Entry(WALKey key, WALEdit edit) {
+      super();
+      this.key = key;
+      this.edit = edit;
+    }
+
+    /**
+     * Gets the edit
+     *
+     * @return edit
+     */
+    public WALEdit getEdit() {
+      return edit;
+    }
+
+    /**
+     * Gets the key
+     *
+     * @return key
+     */
+    public WALKey getKey() {
+      return key;
+    }
+
+    /**
+     * Set compression context for this entry.
+     *
+     * @param compressionContext
+     *          Compression context
+     */
+    public void setCompressionContext(CompressionContext compressionContext) {
+      edit.setCompressionContext(compressionContext);
+      key.setCompressionContext(compressionContext);
+    }
+
+    @Override
+    public String toString() {
+      return this.key + "=" + this.edit;
+    }
+
+  }
+
+}

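One consequence of the append contract spelled out above: append alone is not durable, and callers must hand the returned transaction id to sync(long). A hedged sketch of that calling pattern follows; parameter construction is omitted and the class and helper names are invented.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

public class AppendThenSyncSketch {
  /**
   * Appends edits and blocks until they are durable. On return, key has had
   * its region edit/sequence id filled in by the WAL, per the javadoc above.
   */
  static void writeDurably(WAL wal, HTableDescriptor htd, HRegionInfo info, WALKey key,
      WALEdit edits, AtomicLong sequenceId, List<Cell> memstoreKVs) throws IOException {
    long txid = wal.append(htd, info, key, edits, sequenceId, true, memstoreKVs);
    // sync(txid) is a no-op if a later sync already covered this transaction id.
    wal.sync(txid);
  }
}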

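And for outside consumers, the Reader interface above supports a simple drain loop. How a Reader is obtained (the WALFactory referenced elsewhere in this patch hands them out) is out of scope here, so this sketch, with invented names, takes one as a parameter.

import java.io.IOException;
import org.apache.hadoop.hbase.wal.WAL;

public class WalReadSketch {
  /** Prints every remaining entry, then closes the reader. */
  static void dump(WAL.Reader reader) throws IOException {
    try {
      WAL.Entry entry;
      while ((entry = reader.next()) != null) {
        // Entry.toString() renders as "<key>=<edit>".
        System.out.println(reader.getPosition() + ": " + entry);
      }
    } finally {
      reader.close();
    }
  }
}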