hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/3] HBASE-11072 Abstract WAL splitting from ZK (Sergey Soldatov)
Date Fri, 05 Sep 2014 04:35:36 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 c0d81e9ad -> 66220e492


http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index cd401bd..3c3d2a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -22,111 +22,69 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.SplitLogCounters;
-import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.master.SplitLogManager;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
- * This worker is spawned in every regionserver (should we also spawn one in
- * the master?). The Worker waits for log splitting tasks to be put up by the
- * {@link SplitLogManager} running in the master and races with other workers
- * in other serves to acquire those tasks. The coordination is done via
- * zookeeper. All the action takes place at /hbase/splitlog znode.
+ * This worker is spawned in every regionserver, including master. The Worker waits for log
+ * splitting tasks to be put up by the {@link SplitLogManager} running in the master and races with
+ * other workers in other serves to acquire those tasks. The coordination is done via coordination
+ * engine.
  * <p>
- * If a worker has successfully moved the task from state UNASSIGNED to
- * OWNED then it owns the task. It keeps heart beating the manager by
- * periodically moving the task from UNASSIGNED to OWNED state. On success it
- * moves the task to TASK_DONE. On unrecoverable error it moves task state to
- * ERR. If it cannot continue but wants the master to retry the task then it
- * moves the task state to RESIGNED.
+ * If a worker has successfully moved the task from state UNASSIGNED to OWNED then it owns the task.
+ * It keeps heart beating the manager by periodically moving the task from UNASSIGNED to OWNED
+ * state. On success it moves the task to TASK_DONE. On unrecoverable error it moves task state to
+ * ERR. If it cannot continue but wants the master to retry the task then it moves the task state to
+ * RESIGNED.
  * <p>
- * The manager can take a task away from a worker by moving the task from
- * OWNED to UNASSIGNED. In the absence of a global lock there is a
- * unavoidable race here - a worker might have just finished its task when it
- * is stripped of its ownership. Here we rely on the idempotency of the log
+ * The manager can take a task away from a worker by moving the task from OWNED to UNASSIGNED. In
+ * the absence of a global lock there is a unavoidable race here - a worker might have just finished
+ * its task when it is stripped of its ownership. Here we rely on the idempotency of the log
  * splitting task for correctness
  */
 @InterfaceAudience.Private
-public class SplitLogWorker extends ZooKeeperListener implements Runnable {
-  public static final int DEFAULT_MAX_SPLITTERS = 2;
+public class SplitLogWorker implements Runnable {
 
   private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
-  private static final int checkInterval = 5000; // 5 seconds
-  private static final int FAILED_TO_OWN_TASK = -1;
 
   Thread worker;
-  private final ServerName serverName;
-  private final TaskExecutor splitTaskExecutor;
   // thread pool which executes recovery work
-  private final ExecutorService executorService;
-
-  private final Object taskReadyLock = new Object();
-  volatile int taskReadySeq = 0;
-  private volatile String currentTask = null;
-  private int currentVersion;
-  private volatile boolean exitWorker;
-  private final Object grabTaskLock = new Object();
-  private boolean workerInGrabTask = false;
-  private final int report_period;
-  private RegionServerServices server = null;
-  private Configuration conf = null;
-  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
-  private int maxConcurrentTasks = 0;
-
-  public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
+  private SplitLogWorkerCoordination coordination;
+  private Configuration conf;
+  private RegionServerServices server;
+  public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
       TaskExecutor splitTaskExecutor) {
-    super(watcher);
     this.server = server;
-    this.serverName = server.getServerName();
-    this.splitTaskExecutor = splitTaskExecutor;
-    report_period = conf.getInt("hbase.splitlog.report.period",
-      conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
     this.conf = conf;
-    this.executorService = this.server.getExecutorService();
-    this.maxConcurrentTasks =
-        conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
+    this.coordination =
+        ((BaseCoordinatedStateManager) hserver.getCoordinatedStateManager())
+            .getSplitLogWorkerCoordination();
+    this.server = server;
+    coordination.init(server, conf, splitTaskExecutor, this);
   }
 
-  public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
+  public SplitLogWorker(final Server hserver, final Configuration conf,
       final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
-    this(watcher, conf, server, new TaskExecutor() {
+    this(server, conf, server, new TaskExecutor() {
       @Override
       public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
         Path rootdir;
@@ -143,7 +101,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         // encountered a bad non-retry-able persistent error.
         try {
           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
-            fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) {
+            fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -160,9 +118,6 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
           } else if (cause instanceof InterruptedException) {
             LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
             return Status.RESIGNED;
-          } else if(cause instanceof KeeperException) {
-            LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
-            return Status.RESIGNED;
           }
           LOG.warn("log splitting of " + filename + " failed, returning error", e);
           return Status.ERR;
@@ -175,32 +130,22 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
   @Override
   public void run() {
     try {
-      LOG.info("SplitLogWorker " + this.serverName + " starting");
-      this.watcher.registerListener(this);
+      LOG.info("SplitLogWorker " + server.getServerName() + " starting");
+      coordination.registerListener();
       // pre-initialize a new connection for splitlogworker configuration
       HConnectionManager.getConnection(conf);
 
-      // wait for master to create the splitLogZnode
-      int res = -1;
-      while (res == -1 && !exitWorker) {
-        try {
-          res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
-        } catch (KeeperException e) {
-          // ignore
-          LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
-        }
-        if (res == -1) {
-          LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
-          Thread.sleep(1000);
-        }
+      // wait for Coordination Engine is ready
+      boolean res = false;
+      while (!res && !coordination.isStop()) {
+        res = coordination.isReady();
       }
-
-      if (!exitWorker) {
-          taskLoop();
+      if (!coordination.isStop()) {
+        coordination.taskLoop();
       }
     } catch (Throwable t) {
       if (ExceptionUtil.isInterrupt(t)) {
-        LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" :
+        LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" :
             " (ERROR: exitWorker is not set, exiting anyway)"));
       } else {
         // only a logical error can cause here. Printing it out
@@ -208,394 +153,24 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
         LOG.error("unexpected error ", t);
       }
     } finally {
-      LOG.info("SplitLogWorker " + this.serverName + " exiting");
-    }
-  }
-
-  /**
-   * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task
-   * one at a time. This policy puts an upper-limit on the number of
-   * simultaneous log splitting that could be happening in a cluster.
-   * <p>
-   * Synchronization using {@link #taskReadyLock} ensures that it will
-   * try to grab every task that has been put up
-   */
-  private void taskLoop() throws InterruptedException {
-    while (!exitWorker) {
-      int seq_start = taskReadySeq;
-      List<String> paths = getTaskList();
-      if (paths == null) {
-        LOG.warn("Could not get tasks, did someone remove " +
-            this.watcher.splitLogZNode + " ... worker thread exiting.");
-        return;
-      }
-      // pick meta wal firstly
-      int offset = (int) (Math.random() * paths.size());
-      for(int i = 0; i < paths.size(); i ++){
-        if(HLogUtil.isMetaFile(paths.get(i))) {
-          offset = i;
-          break;
-        }
-      }
-      int numTasks = paths.size();
-      for (int i = 0; i < numTasks; i++) {
-        int idx = (i + offset) % paths.size();
-        // don't call ZKSplitLog.getNodeName() because that will lead to
-        // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
-        } else {
-          LOG.debug("Current region server " + this.serverName + " has "
-              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
-          break;
-        }
-        if (exitWorker) {
-          return;
-        }
-      }
-      SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
-      synchronized (taskReadyLock) {
-        while (seq_start == taskReadySeq) {
-          taskReadyLock.wait(checkInterval);
-          if (this.server != null) {
-            // check to see if we have stale recovering regions in our internal memory state
-            Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
-            if (!recoveringRegions.isEmpty()) {
-              // Make a local copy to prevent ConcurrentModificationException when other threads
-              // modify recoveringRegions
-              List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
-              for (String region : tmpCopy) {
-                String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
-                try {
-                  if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
-                    HRegion r = recoveringRegions.remove(region);
-                    if (r != null) {
-                      r.setRecovering(false);
-                    }
-                    LOG.debug("Mark recovering region:" + region + " up.");
-                  } else {
-                    // current check is a defensive(or redundant) mechanism to prevent us from
-                    // having stale recovering regions in our internal RS memory state while
-                    // zookeeper(source of truth) says differently. We stop at the first good one
-                    // because we should not have a single instance such as this in normal case so
-                    // check the first one is good enough.
-                    break;
-                  }
-                } catch (KeeperException e) {
-                  // ignore zookeeper error
-                  LOG.debug("Got a zookeeper when trying to open a recovering region", e);
-                  break;
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * try to grab a 'lock' on the task zk node to own and execute the task.
-   * <p>
-   * @param path zk node for the task
-   */
-  private void grabTask(String path) {
-    Stat stat = new Stat();
-    byte[] data;
-    synchronized (grabTaskLock) {
-      currentTask = path;
-      workerInGrabTask = true;
-      if (Thread.interrupted()) {
-        return;
-      }
-    }
-    try {
-      try {
-        if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
-          SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
-          return;
-        }
-      } catch (KeeperException e) {
-        LOG.warn("Failed to get data for znode " + path, e);
-        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
-        return;
-      }
-      SplitLogTask slt;
-      try {
-        slt = SplitLogTask.parseFrom(data);
-      } catch (DeserializationException e) {
-        LOG.warn("Failed parse data for znode " + path, e);
-        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
-        return;
-      }
-      if (!slt.isUnassigned()) {
-        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
-        return;
-      }
-
-      currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(),
-        stat.getVersion());
-      if (currentVersion < 0) {
-        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
-        return;
-      }
-
-      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
-        HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
-          SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
-        return;
-      }
-
-      LOG.info("worker " + serverName + " acquired task " + path);
-      SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
-      getDataSetWatchAsync();
-
-      submitTask(path, slt.getMode(), currentVersion, this.report_period);
-
-      // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
-      try {
-        int sleepTime = RandomUtils.nextInt(500) + 500;
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while yielding for other region servers", e);
-        Thread.currentThread().interrupt();
-      }
-    } finally {
-      synchronized (grabTaskLock) {
-        workerInGrabTask = false;
-        // clear the interrupt from stopTask() otherwise the next task will
-        // suffer
-        Thread.interrupted();
-      }
-    }
-  }
-
-
-  /**
-   * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
-   * <p>
-   * This method is also used to periodically heartbeat the task progress by transitioning the node
-   * from OWNED to OWNED.
-   * <p>
-   * @param isFirstTime
-   * @param zkw
-   * @param server
-   * @param task
-   * @param taskZKVersion
-   * @return non-negative integer value when task can be owned by current region server otherwise -1
-   */
-  protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
-      ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
-    int latestZKVersion = FAILED_TO_OWN_TASK;
-    try {
-      SplitLogTask slt = new SplitLogTask.Owned(server, mode);
-      Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
-      if (stat == null) {
-        LOG.warn("zk.setData() returned null for path " + task);
-        SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
-        return FAILED_TO_OWN_TASK;
-      }
-      latestZKVersion = stat.getVersion();
-      SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
-      return latestZKVersion;
-    } catch (KeeperException e) {
-      if (!isFirstTime) {
-        if (e.code().equals(KeeperException.Code.NONODE)) {
-          LOG.warn("NONODE failed to assert ownership for " + task, e);
-        } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
-          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
-        } else {
-          LOG.warn("failed to assert ownership for " + task, e);
-        }
-      }
-    } catch (InterruptedException e1) {
-      LOG.warn("Interrupted while trying to assert ownership of " +
-          task + " " + StringUtils.stringifyException(e1));
-      Thread.currentThread().interrupt();
-    }
-    SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
-    return FAILED_TO_OWN_TASK;
-  }
-
-  /**
-   * This function calculates how many splitters it could create based on expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
-   * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks current total number of available tasks
-   */
-  private int calculateAvailableSplitters(int numTasks) {
-    // at lease one RS(itself) available
-    int availableRSs = 1;
-    try {
-      List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
-      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
-    } catch (KeeperException e) {
-      // do nothing
-      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
+      coordination.removeListener();
+      LOG.info("SplitLogWorker " + server.getServerName() + " exiting");
     }
-
-    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
-    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
-    // calculate how many more splitters we could spawn
-    return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
   }
-
-  /**
-   * Submit a log split task to executor service
-   * @param curTask
-   * @param curTaskZKVersion
-   */
-  void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
-      final int reportPeriod) {
-    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
-
-    CancelableProgressable reporter = new CancelableProgressable() {
-      private long last_report_at = 0;
-
-      @Override
-      public boolean progress() {
-        long t = EnvironmentEdgeManager.currentTime();
-        if ((t - last_report_at) > reportPeriod) {
-          last_report_at = t;
-          int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode,
-            zkVersion.intValue());
-          if (latestZKVersion < 0) {
-            LOG.warn("Failed to heartbeat the task" + curTask);
-            return false;
-          }
-          zkVersion.setValue(latestZKVersion);
-        }
-        return true;
-      }
-    };
-
-    HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter,
-      this.tasksInProgress, this.splitTaskExecutor, mode);
-    this.executorService.submit(hsh);
-  }
-
-  void getDataSetWatchAsync() {
-    this.watcher.getRecoverableZooKeeper().getZooKeeper().
-      getData(currentTask, this.watcher,
-      new GetDataAsyncCallback(), null);
-    SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
-  }
-
-  void getDataSetWatchSuccess(String path, byte[] data) {
-    SplitLogTask slt;
-    try {
-      slt = SplitLogTask.parseFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed parse", e);
-      return;
-    }
-    synchronized (grabTaskLock) {
-      if (workerInGrabTask) {
-        // currentTask can change but that's ok
-        String taskpath = currentTask;
-        if (taskpath != null && taskpath.equals(path)) {
-          // have to compare data. cannot compare version because then there
-          // will be race with attemptToOwnTask()
-          // cannot just check whether the node has been transitioned to
-          // UNASSIGNED because by the time this worker sets the data watch
-          // the node might have made two transitions - from owned by this
-          // worker to unassigned to owned by another worker
-          if (! slt.isOwned(this.serverName) &&
-              ! slt.isDone(this.serverName) &&
-              ! slt.isErr(this.serverName) &&
-              ! slt.isResigned(this.serverName)) {
-            LOG.info("task " + taskpath + " preempted from " +
-                serverName + ", current task state and owner=" + slt.toString());
-            stopTask();
-          }
-        }
-      }
-    }
-  }
-
-  void getDataSetWatchFailure(String path) {
-    synchronized (grabTaskLock) {
-      if (workerInGrabTask) {
-        // currentTask can change but that's ok
-        String taskpath = currentTask;
-        if (taskpath != null && taskpath.equals(path)) {
-          LOG.info("retrying data watch on " + path);
-          SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
-          getDataSetWatchAsync();
-        } else {
-          // no point setting a watch on the task which this worker is not
-          // working upon anymore
-        }
-      }
-    }
-  }
-
-  @Override
-  public void nodeDataChanged(String path) {
-    // there will be a self generated dataChanged event every time attemptToOwnTask()
-    // heartbeats the task znode by upping its version
-    synchronized (grabTaskLock) {
-      if (workerInGrabTask) {
-        // currentTask can change
-        String taskpath = currentTask;
-        if (taskpath!= null && taskpath.equals(path)) {
-          getDataSetWatchAsync();
-        }
-      }
-    }
-  }
-
-
-  private List<String> getTaskList() throws InterruptedException {
-    List<String> childrenPaths = null;
-    long sleepTime = 1000;
-    // It will be in loop till it gets the list of children or
-    // it will come out if worker thread exited.
-    while (!exitWorker) {
-      try {
-        childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
-            this.watcher.splitLogZNode);
-        if (childrenPaths != null) {
-          return childrenPaths;
-        }
-      } catch (KeeperException e) {
-        LOG.warn("Could not get children of znode "
-            + this.watcher.splitLogZNode, e);
-      }
-        LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
-            + " after sleep for " + sleepTime + "ms!");
-        Thread.sleep(sleepTime);
-    }
-    return childrenPaths;
-  }
-
-  @Override
-  public void nodeChildrenChanged(String path) {
-    if(path.equals(watcher.splitLogZNode)) {
-      LOG.debug("tasks arrived or departed");
-      synchronized (taskReadyLock) {
-        taskReadySeq++;
-        taskReadyLock.notify();
-      }
-    }
-  }
-
   /**
    * If the worker is doing a task i.e. splitting a log file then stop the task.
    * It doesn't exit the worker thread.
    */
-  void stopTask() {
+  public void stopTask() {
     LOG.info("Sending interrupt to stop the worker thread");
     worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
   }
 
-
   /**
    * start the SplitLogWorker thread
    */
   public void start() {
-    worker = new Thread(null, this, "SplitLogWorker-" + serverName);
-    exitWorker = false;
+    worker = new Thread(null, this, "SplitLogWorker-" + server.getServerName());
     worker.start();
   }
 
@@ -603,30 +178,11 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    * stop the SplitLogWorker thread
    */
   public void stop() {
-    exitWorker = true;
+    coordination.stopProcessingTasks();
     stopTask();
   }
 
   /**
-   * Asynchronous handler for zk get-data-set-watch on node results.
-   */
-  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
-    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
-
-    @Override
-    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-      SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
-      if (rc != 0) {
-        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
-        getDataSetWatchFailure(path);
-        return;
-      }
-      data = watcher.getRecoverableZooKeeper().removeMetaData(data);
-      getDataSetWatchSuccess(path, data);
-    }
-  }
-
-  /**
    * Objects implementing this interface actually do the task that has been
    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
    * guarantee that two workers will not be executing the same task therefore it
@@ -642,4 +198,13 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
     }
     Status exec(String name, RecoveryMode mode, CancelableProgressable p);
   }
+
+  /**
+   * Returns the number of tasks processed by coordination.
+   * This method is used by tests only
+   */
+  @VisibleForTesting
+  public int getTaskReadySeq() {
+    return coordination.getTaskReadySeq();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
index 9bfdeed..06d21d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -30,17 +28,13 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Handles log splitting a wal
@@ -49,28 +43,24 @@ import org.apache.zookeeper.KeeperException;
 public class HLogSplitterHandler extends EventHandler {
   private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class);
   private final ServerName serverName;
-  private final String curTask;
-  private final String wal;
-  private final ZooKeeperWatcher zkw;
   private final CancelableProgressable reporter;
   private final AtomicInteger inProgressTasks;
-  private final MutableInt curTaskZKVersion;
   private final TaskExecutor splitTaskExecutor;
   private final RecoveryMode mode;
+  private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails;
+  private final SplitLogWorkerCoordination coordination;
 
-  public HLogSplitterHandler(final Server server, String curTask,
-      final MutableInt curTaskZKVersion,
-      CancelableProgressable reporter,
+
+  public HLogSplitterHandler(final Server server, SplitLogWorkerCoordination coordination,
+      SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter,
       AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
 	  super(server, EventType.RS_LOG_REPLAY);
-    this.curTask = curTask;
-    this.wal = ZKSplitLog.getFileName(curTask);
+    this.splitTaskDetails = splitDetails;
+    this.coordination = coordination;
     this.reporter = reporter;
     this.inProgressTasks = inProgressTasks;
     this.inProgressTasks.incrementAndGet();
     this.serverName = server.getServerName();
-    this.zkw = server.getZooKeeper();
-    this.curTaskZKVersion = curTaskZKVersion;
     this.splitTaskExecutor = splitTaskExecutor;
     this.mode = mode;
   }
@@ -79,20 +69,20 @@ public class HLogSplitterHandler extends EventHandler {
   public void process() throws IOException {
     long startTime = System.currentTimeMillis();
     try {
-      Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
+      Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
       switch (status) {
       case DONE:
-        endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
-          SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
+        coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
+          SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
         break;
       case PREEMPTED:
         SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
-        LOG.warn("task execution prempted " + wal);
+        LOG.warn("task execution prempted " + splitTaskDetails.getWALFile());
         break;
       case ERR:
         if (server != null && !server.isStopped()) {
-          endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
-            SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
+          coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
+            SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
           break;
         }
         // if the RS is exiting then there is probably a tons of stuff
@@ -100,45 +90,17 @@ public class HLogSplitterHandler extends EventHandler {
         //$FALL-THROUGH$
       case RESIGNED:
         if (server != null && server.isStopped()) {
-          LOG.info("task execution interrupted because worker is exiting " + curTask);
+          LOG.info("task execution interrupted because worker is exiting "
+              + splitTaskDetails.toString());
         }
-        endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
-          SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
+        coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
+          SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
         break;
       }
     } finally {
-      LOG.info("worker " + serverName + " done with task " + curTask + " in "
+      LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in "
           + (System.currentTimeMillis() - startTime) + "ms");
       this.inProgressTasks.decrementAndGet();
     }
   }
-  
-  /**
-   * endTask() can fail and the only way to recover out of it is for the
-   * {@link SplitLogManager} to timeout the task node.
-   * @param slt
-   * @param ctr
-   */
-  public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task,
-      int taskZKVersion) {
-    try {
-      if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) {
-        LOG.info("successfully transitioned task " + task + " to final state " + slt);
-        ctr.incrementAndGet();
-        return;
-      }
-      LOG.warn("failed to transistion task " + task + " to end state " + slt
-          + " because of version mismatch ");
-    } catch (KeeperException.BadVersionException bve) {
-      LOG.warn("transisition task " + task + " to " + slt
-          + " failed because of version mismatch", bve);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.fatal(
-        "logic error - end task " + task + " " + slt
-          + " failed because task doesn't exist", e);
-    } catch (KeeperException e) {
-      LOG.warn("failed to end task, " + task + " " + slt, e);
-    }
-    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 16b491f..9304ac8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -78,9 +78,10 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -110,7 +111,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MultipleIOException;
 
 import com.google.common.base.Preconditions;
@@ -138,8 +138,7 @@ public class HLogSplitter {
 
   private Set<TableName> disablingOrDisabledTables =
       new HashSet<TableName>();
-  private ZooKeeperWatcher watcher;
-  private CoordinatedStateManager csm;
+  private BaseCoordinatedStateManager csm;
 
   // If an exception is thrown by one of the other threads, it will be
   // stored here.
@@ -173,7 +172,7 @@ public class HLogSplitter {
   private final int minBatchSize;
 
   HLogSplitter(Configuration conf, Path rootDir,
-      FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
+      FileSystem fs, LastSequenceId idChecker,
       CoordinatedStateManager csm, RecoveryMode mode) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
@@ -182,8 +181,7 @@ public class HLogSplitter {
     this.rootDir = rootDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
-    this.watcher = zkw;
-    this.csm = csm;
+    this.csm = (BaseCoordinatedStateManager)csm;
 
     entryBuffers = new EntryBuffers(
         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
@@ -195,7 +193,7 @@ public class HLogSplitter {
     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
 
     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    if (zkw != null && csm != null && this.distributedLogReplay) {
+    if (csm != null && this.distributedLogReplay) {
       outputSink = new LogReplayOutputSink(numWriterThreads);
     } else {
       if (this.distributedLogReplay) {
@@ -219,15 +217,14 @@ public class HLogSplitter {
    * @param conf
    * @param reporter
    * @param idChecker
-   * @param zkw ZooKeeperWatcher if it's null, we will back to the old-style log splitting where we
-   *          dump out recoved.edits files for regions to replay on.
+   * @param cp coordination state manager
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
+      CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, cp, mode);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -241,8 +238,8 @@ public class HLogSplitter {
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
-          RecoveryMode.LOG_SPLITTING);
+        HLogSplitter s =
+            new HLogSplitter(conf, rootDir, fs, null, null, RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -295,7 +292,7 @@ public class HLogSplitter {
         LOG.warn("Nothing to split in log file " + logPath);
         return true;
       }
-      if(watcher != null && csm != null) {
+      if(csm != null) {
         try {
           TableStateManager tsm = csm.getTableStateManager();
           disablingOrDisabledTables = tsm.getTablesInStates(
@@ -320,7 +317,8 @@ public class HLogSplitter {
         if (lastFlushedSequenceId == null) {
           if (this.distributedLogReplay) {
             RegionStoreSequenceIds ids =
-                SplitLogManager.getRegionFlushedSequenceId(this.watcher, failedServerName, key);
+                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
+                  key);
             if (ids != null) {
               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
             }
@@ -358,7 +356,8 @@ public class HLogSplitter {
       throw iie;
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+      csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
+        logfile.getPath().getName(), fs);
       isCorrupted = true;
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
@@ -1368,8 +1367,9 @@ public class HLogSplitter {
 
     public LogReplayOutputSink(int numWriters) {
       super(numWriters);
-      this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout",
-        SplitLogManager.DEFAULT_TIMEOUT);
+      this.waitRegionOnlineTimeOut =
+          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
+            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
       this.logRecoveredEditsOutputSink.setReporter(reporter);
     }
@@ -1590,8 +1590,8 @@ public class HLogSplitter {
         // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
         // update the value for the region
         RegionStoreSequenceIds ids =
-            SplitLogManager.getRegionFlushedSequenceId(watcher, failedServerName, loc
-                .getRegionInfo().getEncodedName());
+            csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
+              loc.getRegionInfo().getEncodedName());
         if (ids != null) {
           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index 943b944..ac6042f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hbase.zookeeper;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,8 +30,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Common methods and attributes used by {@link SplitLogManager} and {@link SplitLogWorker}
@@ -120,4 +125,100 @@ public class ZKSplitLog {
     return isCorrupt;
   }
 
+  /*
+   * Following methods come from SplitLogManager
+   */
+
+  /**
+   * check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if exists
+   * and set watcher as well.
+   * @param zkw
+   * @param regionEncodedName region encode name
+   * @return true when /hbase/recovering-regions/<current region encoded name> exists
+   * @throws KeeperException
+   */
+  public static boolean
+      isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
+          throws KeeperException {
+    boolean result = false;
+    String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
+
+    byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
+    if (node != null) {
+      result = true;
+    }
+    return result;
+  }
+
+  /**
+   * @param bytes - Content of a failed region server or recovering region znode.
+   * @return long - The last flushed sequence Id for the region server
+   */
+  public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
+    long lastRecordedFlushedSequenceId = -1l;
+    try {
+      lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
+    } catch (DeserializationException e) {
+      lastRecordedFlushedSequenceId = -1l;
+      LOG.warn("Can't parse last flushed sequence Id", e);
+    }
+    return lastRecordedFlushedSequenceId;
+  }
+
+  public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
+    try {
+      if (regions == null) {
+        // remove all children under /home/recovering-regions
+        LOG.debug("Garbage collecting all recovering region znodes");
+        ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
+      } else {
+        for (String curRegion : regions) {
+          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
+          ZKUtil.deleteNodeRecursively(watcher, nodePath);
+        }
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
+    }
+  }
+
+  /**
+   * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
+   * @param zkw
+   * @param serverName
+   * @param encodedRegionName
+   * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName<code>
+   * @throws IOException
+   */
+
+  public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
+      String serverName, String encodedRegionName) throws IOException {
+    // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
+    // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
+    // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
+    // sequence Id name space (sequence Id only valid for a particular RS instance), changes
+    // when different newly assigned RS flushes the region.
+    // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
+    // last flushed sequence Id for each failed RS instance.
+    RegionStoreSequenceIds result = null;
+    String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
+    nodePath = ZKUtil.joinZNode(nodePath, serverName);
+    try {
+      byte[] data;
+      try {
+        data = ZKUtil.getData(zkw, nodePath);
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException();
+      }
+      if (data != null) {
+        result = ZKUtil.parseRegionStoreSequenceIds(data);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
+          + serverName + "; region=" + encodedRegionName, e);
+    } catch (DeserializationException e) {
+      LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 1735382..f5dbaf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -77,6 +77,9 @@ import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
@@ -657,8 +660,8 @@ public class TestDistributedLogSplitting {
       break;
     }
 
-    slm.markRegionsRecoveringInZK(firstFailedServer, regionSet);
-    slm.markRegionsRecoveringInZK(secondFailedServer, regionSet);
+    slm.markRegionsRecovering(firstFailedServer, regionSet);
+    slm.markRegionsRecovering(secondFailedServer, regionSet);
 
     List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
       ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
@@ -886,7 +889,7 @@ public class TestDistributedLogSplitting {
       break;
     }
 
-    slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet);
+    slm.markRegionsRecovering(hrs.getServerName(), regionSet);
     // move region in order for the region opened in recovering state
     final HRegionInfo hri = region;
     final HRegionServer tmpRS = dstRS;
@@ -1070,7 +1073,10 @@ public class TestDistributedLogSplitting {
       out.write(0);
       out.write(Bytes.toBytes("corrupted bytes"));
       out.close();
-      slm.ignoreZKDeleteForTesting = true;
+      ZKSplitLogManagerCoordination coordination =
+          (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
+              .getCoordinatedStateManager()).getSplitLogManagerCoordination();
+      coordination.setIgnoreDeleteForTesting(true);
       executor = Executors.newSingleThreadExecutor();
       Runnable runnable = new Runnable() {
        @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index c88d2c7..25de362 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -19,11 +19,8 @@
 package org.apache.hadoop.hbase.master;
 
 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
@@ -49,22 +46,26 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -85,13 +86,14 @@ public class TestSplitLogManager {
   private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
   private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
   private final ServerManager sm = Mockito.mock(ServerManager.class);
-  private final MasterServices master =  Mockito.mock(MasterServices.class);
+  private final MasterServices master = Mockito.mock(MasterServices.class);
 
   static {
     Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
   }
 
   private ZooKeeperWatcher zkw;
+  private DummyServer ds;
   private static boolean stopped = false;
   private SplitLogManager slm;
   private Configuration conf;
@@ -100,6 +102,68 @@ public class TestSplitLogManager {
 
   private static HBaseTestingUtility TEST_UTIL;
 
+  class DummyServer implements Server {
+    private ZooKeeperWatcher zkw;
+    private Configuration conf;
+    private CoordinatedStateManager cm;
+
+    public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
+      this.zkw = zkw;
+      this.conf = conf;
+      cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
+      cm.initialize(this);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return null;
+    }
+
+    @Override
+    public CoordinatedStateManager getCoordinatedStateManager() {
+      return cm;
+    }
+
+    @Override
+    public HConnection getShortCircuitConnection() {
+      return null;
+    }
+
+    @Override
+    public MetaTableLocator getMetaTableLocator() {
+      return null;
+    }
+
+  }
+
   static Stoppable stopper = new Stoppable() {
     @Override
     public void stop(String why) {
@@ -110,7 +174,6 @@ public class TestSplitLogManager {
     public boolean isStopped() {
       return stopped;
     }
-
   };
 
   @Before
@@ -119,7 +182,10 @@ public class TestSplitLogManager {
     TEST_UTIL.startMiniZKCluster();
     conf = TEST_UTIL.getConfiguration();
     // Use a different ZK wrapper instance for each tests.
-    zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
+    zkw =
+        new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
+    ds = new DummyServer(zkw, conf);
+
     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
     assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
@@ -132,18 +198,20 @@ public class TestSplitLogManager {
     resetCounters();
 
     // By default, we let the test manage the error as before, so the server
-    //  does not appear as dead from the master point of view, only from the split log pov.
+    // does not appear as dead from the master point of view, only from the split log pov.
     Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
     Mockito.when(master.getServerManager()).thenReturn(sm);
 
     to = 6000;
-    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
+
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
     to = to + 4 * 100;
-    
-    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
-        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+
+    this.mode =
+        (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
+            : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -173,17 +241,17 @@ public class TestSplitLogManager {
       throws Exception {
 
     TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-            return (e.eval() != oldval);
-        }
+      @Override
+      public boolean evaluate() throws Exception {
+        return (e.eval() != oldval);
+      }
     });
 
     assertEquals(newval, e.eval());
   }
 
-  private String submitTaskAndWait(TaskBatch batch, String name)
-  throws KeeperException, InterruptedException {
+  private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
+      InterruptedException {
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
     NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
     zkw.registerListener(listener);
@@ -208,7 +276,7 @@ public class TestSplitLogManager {
   public void testTaskCreation() throws Exception {
 
     LOG.info("TestTaskCreation - test the creation of a task in zk");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -228,7 +296,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -254,7 +322,7 @@ public class TestSplitLogManager {
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -275,9 +343,8 @@ public class TestSplitLogManager {
   @Test
   public void testMultipleResubmits() throws Exception {
     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
-
     conf.setInt("hbase.splitlog.max.resubmit", 2);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -309,7 +376,7 @@ public class TestSplitLogManager {
   public void testRescanCleanup() throws Exception {
     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -338,7 +405,7 @@ public class TestSplitLogManager {
   public void testTaskDone() throws Exception {
     LOG.info("TestTaskDone - cleanup task node once in DONE state");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
@@ -358,7 +425,7 @@ public class TestSplitLogManager {
     LOG.info("TestTaskErr - cleanup task node once in ERR state");
 
     conf.setInt("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -373,14 +440,14 @@ public class TestSplitLogManager {
     }
     waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
     assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
-    conf.setInt("hbase.splitlog.max.resubmit", SplitLogManager.DEFAULT_MAX_RESUBMIT);
+    conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
   }
 
   @Test
   public void testTaskResigned() throws Exception {
     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     assertEquals(tot_mgr_resubmit.get(), 0);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -414,7 +481,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
 
     // submit another task which will stay in unassigned mode
@@ -443,7 +510,7 @@ public class TestSplitLogManager {
     LOG.info("testDeadWorker");
 
     conf.setLong("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -468,7 +535,7 @@ public class TestSplitLogManager {
 
   @Test
   public void testWorkerCrash() throws Exception {
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -493,7 +560,7 @@ public class TestSplitLogManager {
   @Test
   public void testEmptyLogDir() throws Exception {
     LOG.info("testEmptyLogDir");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
         UUID.randomUUID().toString());
@@ -505,7 +572,7 @@ public class TestSplitLogManager {
   @Test (timeout = 60000)
   public void testLogFilesAreArchived() throws Exception {
     LOG.info("testLogFilesAreArchived");
-    final SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
+    final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
     conf.set(HConstants.HBASE_DIR, dir.toString());
@@ -554,15 +621,15 @@ public class TestSplitLogManager {
           HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
     ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
-    slm.removeStaleRecoveringRegionsFromZK(null);
+    slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
+    slm.removeStaleRecoveringRegions(null);
 
     List<String> recoveringRegions =
         zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
 
     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
   }
-  
+
   @Test(timeout=60000)
   public void testGetPreviousRecoveryMode() throws Exception {
     LOG.info("testGetPreviousRecoveryMode");
@@ -575,12 +642,12 @@ public class TestSplitLogManager {
         ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
-    assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
-    
+    slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER);
+    assertTrue(slm.isLogSplitting());
+
     zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
     slm.setRecoveryMode(false);
-    assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
+    assertTrue(slm.isLogReplaying());
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index dcb1e88..5caa544 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -19,8 +19,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -30,19 +31,23 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -65,11 +70,74 @@ public class TestSplitLogWorker {
   }
   private final static HBaseTestingUtility TEST_UTIL =
     new HBaseTestingUtility();
+  private DummyServer ds;
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
   private ExecutorService executorService;
   private RecoveryMode mode;
 
+  class DummyServer implements Server {
+    private ZooKeeperWatcher zkw;
+    private Configuration conf;
+    private CoordinatedStateManager cm;
+
+    public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
+      this.zkw = zkw;
+      this.conf = conf;
+      cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
+      cm.initialize(this);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return null;
+    }
+
+    @Override
+    public CoordinatedStateManager getCoordinatedStateManager() {
+      return cm;
+    }
+
+    @Override
+    public HConnection getShortCircuitConnection() {
+      return null;
+    }
+
+    @Override
+    public MetaTableLocator getMetaTableLocator() {
+      return null;
+    }
+
+  }
+
   private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
       throws Exception {
     assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
@@ -106,19 +174,22 @@ public class TestSplitLogWorker {
     Configuration conf = TEST_UTIL.getConfiguration();
     zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
         "split-log-worker-tests", null);
+    ds = new DummyServer(zkw, conf);
     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
-    assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
+    assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1)));
     LOG.debug(zkw.baseZNode + " created");
     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
-    assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
+    assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1)));
+
     LOG.debug(zkw.splitLogZNode + " created");
     ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
-    assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
+    assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1)));
+
     SplitLogCounters.resetCounters();
     executorService = new ExecutorService("TestSplitLogWorker");
     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
-    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
@@ -157,12 +228,12 @@ public class TestSplitLogWorker {
     final ServerName RS = ServerName.valueOf("rs,1,1");
     RegionServerServices mockedRS = getRegionServer(RS);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
-      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), 
+      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
         Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
     SplitLogWorker slw =
-        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
@@ -170,7 +241,7 @@ public class TestSplitLogWorker {
       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
       assertTrue(slt.isOwned(RS));
     } finally {
-      stopSplitLogWorker(slw);
+     stopSplitLogWorker(slw);
     }
   }
 
@@ -193,14 +264,14 @@ public class TestSplitLogWorker {
     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
-      new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
+      new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     RegionServerServices mockedRS1 = getRegionServer(SVR1);
     RegionServerServices mockedRS2 = getRegionServer(SVR2);
     SplitLogWorker slw1 =
-        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
     SplitLogWorker slw2 =
-        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
     slw1.start();
     slw2.start();
     try {
@@ -227,7 +298,7 @@ public class TestSplitLogWorker {
     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
     RegionServerServices mockedRS = getRegionServer(SRV);
     SplitLogWorker slw =
-        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -236,11 +307,11 @@ public class TestSplitLogWorker {
 
       // this time create a task node after starting the splitLogWorker
       zkw.getRecoverableZooKeeper().create(PATH,
-        new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
+        new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
-      assertEquals(1, slw.taskReadySeq);
+      assertEquals(1, slw.getTaskReadySeq());
       byte [] bytes = ZKUtil.getData(zkw, PATH);
       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
       assertTrue(slt.isOwned(SRV));
@@ -260,14 +331,14 @@ public class TestSplitLogWorker {
     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
     RegionServerServices mockedRS = getRegionServer(SRV);
     SplitLogWorker slw =
-        new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
       Thread.sleep(100);
       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
 
-      SplitLogTask unassignedManager = 
+      SplitLogTask unassignedManager =
         new SplitLogTask.Unassigned(MANAGER, this.mode);
       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -287,7 +358,7 @@ public class TestSplitLogWorker {
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
 
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
-      assertEquals(2, slw.taskReadySeq);
+      assertEquals(2, slw.getTaskReadySeq());
       byte [] bytes = ZKUtil.getData(zkw, PATH2);
       slt = SplitLogTask.parseFrom(bytes);
       assertTrue(slt.isOwned(SRV));
@@ -302,7 +373,7 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("svr,1,1");
     RegionServerServices mockedRS = getRegionServer(SRV);
-    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
     slw.start();
     Thread.yield(); // let the worker start
     Thread.sleep(100);
@@ -358,14 +429,13 @@ public class TestSplitLogWorker {
     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
     RegionServerServices mockedRS = getRegionServer(RS);
-
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
-    SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
     slw.start();
     try {
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
@@ -408,7 +478,7 @@ public class TestSplitLogWorker {
           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
-    SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
+    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
     slw.start();
     try {
       int acquiredTasks = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index cdf71f6..74badb5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -18,7 +18,12 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.NavigableSet;
@@ -27,7 +32,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
@@ -35,8 +44,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.mockito.Mockito.mock;
-
 /**
  * Simple testing of a few HLog methods.
  */
@@ -45,7 +52,7 @@ public class TestHLogMethods {
   private static final byte[] TEST_REGION = Bytes.toBytes("test_region");;
   private static final TableName TEST_TABLE =
       TableName.valueOf("test_table");
-  
+
   private final HBaseTestingUtility util = new HBaseTestingUtility();
 
   /**
@@ -108,27 +115,27 @@ public class TestHLogMethods {
     reb.appendEntry(createTestLogEntry(1));
     assertTrue(reb.heapSize() > 0);
   }
-  
+
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
-    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     HLogSplitter splitter = new HLogSplitter(
-      conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
+      conf, mock(Path.class), mock(FileSystem.class), null, null, mode);
 
     EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
     for (int i = 0; i < 1000; i++) {
       HLog.Entry entry = createTestLogEntry(i);
       sink.appendEntry(entry);
     }
-    
+
     assertTrue(sink.totalBuffered > 0);
     long amountInChunk = sink.totalBuffered;
     // Get a chunk
     RegionEntryBuffer chunk = sink.getChunkToWrite();
     assertEquals(chunk.heapSize(), amountInChunk);
-    
+
     // Make sure it got marked that a thread is "working on this"
     assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
 
@@ -136,26 +143,26 @@ public class TestHLogMethods {
     for (int i = 0; i < 500; i++) {
       HLog.Entry entry = createTestLogEntry(i);
       sink.appendEntry(entry);
-    }    
+    }
     // Asking for another chunk shouldn't work since the first one
     // is still writing
     assertNull(sink.getChunkToWrite());
-    
+
     // If we say we're done writing the first chunk, then we should be able
     // to get the second
     sink.doneWriting(chunk);
-    
+
     RegionEntryBuffer chunk2 = sink.getChunkToWrite();
     assertNotNull(chunk2);
     assertNotSame(chunk, chunk2);
     long amountInChunk2 = sink.totalBuffered;
     // The second chunk had fewer rows than the first
     assertTrue(amountInChunk2 < amountInChunk);
-    
+
     sink.doneWriting(chunk2);
     assertEquals(0, sink.totalBuffered);
   }
-  
+
   private HLog.Entry createTestLogEntry(int i) {
     long seq = i;
     long now = i * 1000;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
index a133dea..1c70fb5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java
@@ -138,7 +138,7 @@ public class TestHLogReaderOnSecureHLog {
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     Path rootdir = FSUtils.getRootDir(conf);
     try {
-      HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
+      HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
       s.splitLogFile(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");
@@ -181,7 +181,7 @@ public class TestHLogReaderOnSecureHLog {
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     Path rootdir = FSUtils.getRootDir(conf);
     try {
-      HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, null, mode);
+      HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode);
       s.splitLogFile(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index dc39415..cffc9cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -809,7 +809,7 @@ public class TestHLogSplit {
       logfiles != null && logfiles.length > 0);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, fs, null, null, null, this.mode) {
+        conf, HBASEDIR, fs, null, null, this.mode) {
       protected HLog.Writer createWriter(FileSystem fs,
           Path logfile, Configuration conf) throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@@ -942,7 +942,7 @@ public class TestHLogSplit {
     try {
       conf.setInt("hbase.splitlog.report.period", 1000);
       boolean ret = HLogSplitter.splitLogFile(
-        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode);
+        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode);
       assertFalse("Log splitting should failed", ret);
       assertTrue(count.get() > 0);
     } catch (IOException e) {
@@ -1001,7 +1001,7 @@ public class TestHLogSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, HBASEDIR, fs, null, null, null, this.mode) {
+        localConf, HBASEDIR, fs, null, null, this.mode) {
 
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1286,7 +1286,7 @@ public class TestHLogSplit {
       logfiles != null && logfiles.length > 0);
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, fs, null, null, null, this.mode) {
+        conf, HBASEDIR, fs, null, null, this.mode) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66220e49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 71ee595..653dab6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -885,7 +885,7 @@ public class TestWALReplay {
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
-      this.fs, this.conf, null, null, null, null, mode);
+      this.fs, this.conf, null, null, null, mode);
     FileStatus[] listStatus1 = this.fs.listStatus(
         new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
             new Path(hri.getEncodedName(), "recovered.edits")));


Mime
View raw message