hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject hbase git commit: HBASE-14247 Separate the old WALs into different regionserver directories
Date Tue, 17 Oct 2017 11:41:43 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 8b8f7a017 -> 7f1cd12e8


HBASE-14247 Separate the old WALs into different regionserver directories


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7f1cd12e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7f1cd12e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7f1cd12e

Branch: refs/heads/branch-2
Commit: 7f1cd12e8c557d23a9b93c704433435bda8094d3
Parents: 8b8f7a0
Author: Guanghao Zhang <zghao@apache.org>
Authored: Thu May 18 17:34:00 2017 +0800
Committer: Guanghao Zhang <zghao@apache.org>
Committed: Tue Oct 17 19:39:21 2017 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueueInfo.java | 10 +++---
 .../hbase/master/cleaner/CleanerChore.java      |  5 +++
 .../master/cleaner/FileCleanerDelegate.java     |  8 ++++-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 12 ++++---
 .../master/ReplicationLogCleaner.java           | 38 +++++++++++---------
 .../regionserver/DumpReplicationQueues.java     |  5 +--
 .../RecoveredReplicationSource.java             | 21 +++++++----
 .../regionserver/ReplicationSource.java         | 20 +++++++----
 .../ReplicationSourceInterface.java             | 14 ++++++--
 .../ReplicationSourceWALReader.java             |  6 ++--
 .../regionserver/WALEntryStream.java            | 29 ++++++++++++---
 .../hadoop/hbase/wal/AbstractFSWALProvider.java | 30 +++++++++++++++-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    |  5 +--
 .../apache/hadoop/hbase/wal/FSHLogProvider.java |  5 +--
 .../hbase/master/cleaner/TestLogsCleaner.java   |  2 ++
 .../replication/ReplicationSourceDummy.java     | 10 ++++--
 .../hbase/replication/TestReplicationBase.java  | 16 +++++++++
 .../TestReplicationKillMasterRS.java            |  3 ++
 .../replication/TestReplicationKillSlaveRS.java |  3 ++
 .../TestReplicationSourceManager.java           |  2 +-
 .../TestReplicationSourceManagerZkImpl.java     |  9 ++---
 .../regionserver/TestWALEntryStream.java        | 26 +++++++-------
 22 files changed, 202 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index c38c50f..68b7ebe 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -42,7 +42,7 @@ public class ReplicationQueueInfo {
   private final String peerClusterZnode;
   private boolean queueRecovered;
   // List of all the dead region servers that had this queue (if recovered)
-  private List<String> deadRegionServers = new ArrayList<>();
+  private List<ServerName> deadRegionServers = new ArrayList<>();
 
   /**
    * The passed znode will be either the id of the peer cluster or
@@ -66,7 +66,7 @@ public class ReplicationQueueInfo {
    * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
    */
   private static void
-      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
+      extractDeadServersFromZNodeString(String deadServerListStr, List<ServerName> result) {
 
     if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
 
@@ -85,7 +85,7 @@ public class ReplicationQueueInfo {
           if (i > startIndex) {
             String serverName = deadServerListStr.substring(startIndex, i);
             if(ServerName.isFullServerName(serverName)){
-              result.add(serverName);
+              result.add(ServerName.valueOf(serverName));
             } else {
               LOG.error("Found invalid server name:" + serverName);
             }
@@ -103,7 +103,7 @@ public class ReplicationQueueInfo {
     if(startIndex < len - 1){
       String serverName = deadServerListStr.substring(startIndex, len);
       if(ServerName.isFullServerName(serverName)){
-        result.add(serverName);
+        result.add(ServerName.valueOf(serverName));
       } else {
         LOG.error("Found invalid server name at the end:" + serverName);
       }
@@ -112,7 +112,7 @@ public class ReplicationQueueInfo {
     LOG.debug("Found dead servers:" + result);
   }
 
-  public List<String> getDeadRegionServers() {
+  public List<ServerName> getDeadRegionServers() {
     return Collections.unmodifiableList(this.deadRegionServers);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index eac0642..b8ca1ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -141,7 +141,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     }
   }
 
+  private void preRunCleaner() {
+    cleanersChain.forEach(FileCleanerDelegate::preClean);
+  }
+
   public Boolean runCleaner() {
+    preRunCleaner();
     try {
       FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
       checkAndDeleteEntries(files);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index fdeb404..9c611f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -44,4 +44,10 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
    * this method is used to pass some instance into subclass
    * */
   void init(Map<String, Object> params);
-}
+
+  /**
+   * Used to do some initialization work before each periodic clean
+   */
+  default void preClean() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 649e450..cc9601b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -58,7 +59,6 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
@@ -797,7 +797,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
 
     final Path baseDir = FSUtils.getWALRootDir(conf);
-    final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
+      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
+      archiveDir = new Path(archiveDir, p.getName());
+    }
     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
   }
 
@@ -1141,10 +1145,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     System.err.println("Arguments:");
     System.err.println(" --dump  Dump textual representation of passed one or more files");
     System.err.println("         For example: "
-        + "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
+        + "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
     System.err.println(" --split Split the passed directory of WAL logs");
     System.err.println(
-      "         For example: " + "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
+      "         For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 5d6e1ef..3dcb332 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -26,22 +26,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -54,23 +51,31 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private ZooKeeperWatcher zkw;
   private ReplicationQueuesClient replicationQueues;
   private boolean stopped = false;
-
+  private Set<String> wals;
+  private long readZKTimestamp = 0;
 
   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-   // all members of this class are null if replication is disabled,
-   // so we cannot filter the files
-    if (this.getConf() == null) {
-      return files;
-    }
-
-    final Set<String> wals;
+  public void preClean() {
+    readZKTimestamp = EnvironmentEdgeManager.currentTime();
     try {
       // The concurrently created new WALs may not be included in the return list,
       // but they won't be deleted because they're not in the checking set.
       wals = replicationQueues.getAllWALs();
     } catch (KeeperException e) {
       LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      wals = null;
+    }
+  }
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if replication is disabled,
+    // so we cannot filter the files
+    if (this.getConf() == null) {
+      return files;
+    }
+
+    if (wals == null) {
       return Collections.emptyList();
     }
     return Iterables.filter(files, new Predicate<FileStatus>() {
@@ -85,8 +90,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
             LOG.debug("Didn't find this log in ZK, deleting: " + wal);
           }
         }
-       return !logInReplicationQueue;
-      }});
+        return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
+      }
+    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 45b40c5..9d38026 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AtomicLongMap;
 
 /**
@@ -356,7 +356,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
     StringBuilder sb = new StringBuilder();
 
-    List<String> deadServers ;
+    List<ServerName> deadServers;
 
     sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
     sb.append("    Queue znode: " + queueId + "\n");
@@ -385,6 +385,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
     return sb.toString();
   }
+
   /**
    *  return total size in bytes from a list of WALs
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 248a52a..cabf85a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -51,10 +52,10 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
+    super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
       clusterId, replicationEndpoint, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
@@ -98,7 +99,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
       }
       // Path changed - try to find the right path.
       hasPathChanged = true;
-      if (stopper instanceof ReplicationSyncUp.DummyServer) {
+      if (server instanceof ReplicationSyncUp.DummyServer) {
         // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
         // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
         Path newPath = getReplSyncUpPath(path);
@@ -107,12 +108,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
       } else {
         // See if Path exists in the dead RS folder (there could be a chain of failures
         // to look at)
-        List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
         LOG.info("NB dead servers : " + deadRegionServers.size());
         final Path walDir = FSUtils.getWALRootDir(conf);
-        for (String curDeadServerName : deadRegionServers) {
+        for (ServerName curDeadServerName : deadRegionServers) {
           final Path deadRsDirectory =
-              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
+              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
+                  .getServerName()));
           Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
               deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
           for (Path possibleLogLocation : locs) {
@@ -189,4 +191,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
   public String getPeerId() {
     return this.actualPeerId;
   }
+
+  @Override
+  public ServerName getServerWALsBelongTo() {
+    return this.replicationQueueInfo.getDeadRegionServers().get(0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d16a68f..ea6c6d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -59,7 +61,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 
@@ -94,7 +95,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   // The manager of all sources to which we ping back our progress
   protected ReplicationSourceManager manager;
   // Should we stop everything?
-  protected Stoppable stopper;
+  protected Server server;
   // How long should we sleep for each retry
   private long sleepForRetries;
   protected FileSystem fs;
@@ -139,7 +140,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * @param conf configuration to use
    * @param fs file system to use
    * @param manager replication manager to ping to
-   * @param stopper     the atomic boolean to use to stop the regionserver
+   * @param server the server for this region server
    * @param peerClusterZnode the name of our znode
    * @param clusterId unique UUID for the cluster
    * @param replicationEndpoint the replication endpoint implementation
@@ -148,10 +149,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    */
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
-    this.stopper = stopper;
+    this.server = server;
     this.conf = HBaseConfiguration.create(conf);
     this.waitOnEndpointSeconds =
       this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
@@ -330,7 +331,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       public void uncaughtException(final Thread t, final Throwable e) {
         RSRpcServices.exitIfOOME(e);
         LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
-        stopper.stop("Unexpected exception in " + t.getName());
+        server.stop("Unexpected exception in " + t.getName());
       }
     };
   }
@@ -500,7 +501,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
 
   @Override
   public boolean isSourceActive() {
-    return !this.stopper.isStopped() && this.sourceRunning;
+    return !this.server.isStopped() && this.sourceRunning;
   }
 
   /**
@@ -564,4 +565,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   public WALFileLengthProvider getWALFileLengthProvider() {
     return walFileLengthProvider;
   }
+
+  @Override
+  public ServerName getServerWALsBelongTo() {
+    return server.getServerName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 066f799..b6cf54d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,7 +26,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -48,13 +49,13 @@ public interface ReplicationSourceInterface {
    * @param manager the manager to use
    * @param replicationQueues
    * @param replicationPeers
-   * @param stopper the stopper object for this region server
+   * @param server the server for this region server
    * @param peerClusterZnode
    * @param clusterId
    * @throws IOException
    */
   void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 
@@ -163,4 +164,11 @@ public interface ReplicationSourceInterface {
    * @param batchSize entries size pushed
    */
   void postShipEdits(List<Entry> entries, int batchSize);
+
+  /**
+   * The queue of WALs belongs to only one region server. This will return the server name which all
+   * WALs belong to.
+   * @return the server name which all WALs belong to
+   */
+  ServerName getServerWALsBelongTo();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bb993c6..bbcaaa4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -127,8 +127,10 @@ public class ReplicationSourceWALReader extends Thread {
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
-          source.getWALFileLengthProvider(), source.getSourceMetrics())) {
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, fs, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!checkQuota()) {
             continue;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 3be4ca4..6277d24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -63,6 +65,8 @@ class WALEntryStream implements Closeable {
   private final FileSystem fs;
   private final Configuration conf;
   private final WALFileLengthProvider walFileLengthProvider;
+  // which region server the WALs belong to
+  private final ServerName serverName;
   private final MetricsSource metrics;
 
   /**
@@ -71,17 +75,19 @@ class WALEntryStream implements Closeable {
    * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
    * @param conf {@link Configuration} to use to create {@link Reader} for this stream
    * @param startPosition the position in the first WAL to start reading at
+   * @param serverName the server name which all WALs belong to
    * @param metrics replication metrics
    * @throws IOException
    */
   public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-      throws IOException {
+      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+      MetricsSource metrics) throws IOException {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
     this.currentPosition = startPosition;
     this.walFileLengthProvider = walFileLengthProvider;
+    this.serverName = serverName;
     this.metrics = metrics;
   }
 
@@ -296,15 +302,27 @@ class WALEntryStream implements Closeable {
 
   private Path getArchivedLog(Path path) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
+
+    // Try to find the log in the old dir
     Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     Path archivedLogLocation = new Path(oldLogDir, path.getName());
     if (fs.exists(archivedLogLocation)) {
       LOG.info("Log " + path + " was moved to " + archivedLogLocation);
       return archivedLogLocation;
-    } else {
-      LOG.error("Couldn't locate log: " + path);
-      return path;
     }
+
+    // Try to find the log in the separate old log dir
+    oldLogDir =
+        new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+    archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    }
+
+    LOG.error("Couldn't locate log: " + path);
+    return path;
   }
 
   private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
@@ -316,6 +334,7 @@ class WALEntryStream implements Closeable {
       throw fnfe;
     }
   }
+
   private void openReader(Path path) throws IOException {
     try {
       // Detect if this is a new file, if so get a new reader else

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1a81b17..aba13c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -59,6 +58,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
 
   private static final Log LOG = LogFactory.getLog(AbstractFSWALProvider.class);
 
+  /** Separate old log into different dir by regionserver name **/
+  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
+  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
+
   // Only public so classes back in regionserver.wal can access
   public interface Reader extends WAL.Reader {
     /**
@@ -273,6 +276,23 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   /**
+   * Construct the directory name for all old WALs on a given server. The default old WALs dir
+   * looks like: <code>hbase/oldWALs</code>. If you set hbase.separate.oldlogdir.by.regionserver
+   * to true, it looks like <code>hbase/oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
+   * @param conf configuration from which hbase.separate.oldlogdir.by.regionserver is read
+   * @param serverName Server name formatted as described in {@link ServerName}
+   * @return the relative WAL archive (old WALs) directory name
+   */
+  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
+    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
+      dirName.append(Path.SEPARATOR);
+      dirName.append(serverName);
+    }
+    return dirName.toString();
+  }
+
+  /**
    * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
    * this method ignores the format of the logfile component. Current format: [base directory for
    * hbase]/hbase/.logs/ServerName/logfile or [base directory for
@@ -387,6 +407,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
+      ServerName serverName = getServerNameFromWALDirectoryName(path);
+      if (serverName == null) {
+        LOG.error("Couldn't locate log: " + path);
+        return path;
+      }
+      oldLogDir = new Path(oldLogDir, serverName.getServerName());
+    }
     Path archivedLogLocation = new Path(oldLogDir, path.getName());
     final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 921b08f..8880ca5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -61,8 +61,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
-        getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
+        getWALDirectoryName(factory.factoryId),
+        getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
         eventLoopGroup.next(), channelClass);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index c8a285f..459485c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -76,8 +76,9 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
   @Override
   protected FSHLog createWAL() throws IOException {
     return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
-        getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+        getWALDirectoryName(factory.factoryId),
+        getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index f7ee37d..547f72e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -225,6 +225,7 @@ public class TestLogsCleaner {
         "testZooKeeperAbort-faulty", null)) {
       faultyZK.init();
       cleaner.setConf(conf, faultyZK);
+      cleaner.preClean();
       // should keep all files due to a ConnectionLossException getting the queues znodes
       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
       assertFalse(toDelete.iterator().hasNext());
@@ -235,6 +236,7 @@ public class TestLogsCleaner {
     cleaner = new ReplicationLogCleaner();
     try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null)) {
       cleaner.setConf(conf, zkw);
+      cleaner.preClean();
       Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
       Iterator<FileStatus> iter = filesToDelete.iterator();
       assertTrue(iter.hasNext());

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index bfe17b5..a12cebd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -25,7 +25,8 @@ import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
@@ -47,7 +48,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+      ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
       UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.manager = manager;
@@ -142,4 +143,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public WALFileLengthProvider getWALFileLengthProvider() {
     return walFileLengthProvider;
   }
+
+  @Override
+  public ServerName getServerWALsBelongTo() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 206b500..58b97b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
@@ -36,10 +38,13 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * This class is only a base for other integration-level replication tests.
@@ -82,6 +87,14 @@ public class TestReplicationBase {
   protected static final byte[] row = Bytes.toBytes("row");
   protected static final byte[] noRepfamName = Bytes.toBytes("norep");
 
+  @Parameter
+  public static boolean seperateOldWALs;
+
+  @Parameters
+  public static List<Boolean> params() {
+    return Arrays.asList(false, true);
+  }
+
   /**
    * @throws java.lang.Exception
    */
@@ -106,6 +119,9 @@ public class TestReplicationBase {
     conf1.setFloat("replication.source.ratio", 1.0f);
     conf1.setBoolean("replication.source.eof.autorecovery", true);
 
+    // Parameter config
+    conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs);
+
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
index 51a39a6..5487c04 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Runs the TestReplicationKillRS test and selects the RS to kill in the master cluster
  * Do not add other tests in this class.
  */
+@RunWith(Parameterized.class)
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationKillMasterRS extends TestReplicationKillRS {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
index 07e18b2..6a824d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Runs the TestReplicationKillRS test and selects the RS to kill in the slave cluster
  * Do not add other tests in this class.
  */
+@RunWith(Parameterized.class)
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationKillSlaveRS extends TestReplicationKillRS {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 3934e05..5712146 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -645,7 +645,7 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+        ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
         UUID clusterId, ReplicationEndpoint replicationEndpoint,
         WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 945d9f4..b47a8d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -105,11 +106,11 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
     String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
     rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
-    List<String> result = replicationQueueInfo.getDeadRegionServers();
+    List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
     // verify
-    assertTrue(result.contains(server.getServerName().getServerName()));
-    assertTrue(result.contains(s1.getServerName().getServerName()));
-    assertTrue(result.contains(s2.getServerName().getServerName()));
+    assertTrue(result.contains(server.getServerName()));
+    assertTrue(result.contains(s1.getServerName()));
+    assertTrue(result.contains(s2.getServerName()));
 
     server.stop("");
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7f1cd12e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index d65054c..28ee101 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -148,7 +148,7 @@ public class TestWALEntryStream {
           log.rollWriter();
 
           try (WALEntryStream entryStream =
-              new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+              new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
             int i = 0;
             while (entryStream.hasNext()) {
               assertNotNull(entryStream.next());
@@ -175,7 +175,7 @@ public class TestWALEntryStream {
     appendToLog();
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.next();
@@ -193,7 +193,7 @@ public class TestWALEntryStream {
     appendToLog();
 
     try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
-        log, new MetricsSource("1"))) {
+        log, null, new MetricsSource("1"))) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -207,7 +207,7 @@ public class TestWALEntryStream {
     appendToLog();
 
     try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
-        log, new MetricsSource("1"))) {
+        log, null, new MetricsSource("1"))) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -232,7 +232,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -257,7 +257,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -280,7 +280,7 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
@@ -288,7 +288,7 @@ public class TestWALEntryStream {
     }
     // next stream should picks up where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
@@ -306,13 +306,13 @@ public class TestWALEntryStream {
     appendEntriesToLog(3);
     // read only one element
     try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
-        log, new MetricsSource("1"))) {
+        log, null, new MetricsSource("1"))) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -323,7 +323,7 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -334,7 +334,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -440,7 +440,7 @@ public class TestWALEntryStream {
     long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
     try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
-        p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
+        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
       // can not get log 2


Mime
View raw message