hbase-commits mailing list archives

From e...@apache.org
Subject [45/50] [abbrv] hbase git commit: HBASE-14030 HBase Backup/Restore Phase 1 (v42)
Date Tue, 22 Mar 2016 02:42:14 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
new file mode 100644
index 0000000..14235ce
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupContext;
+import org.apache.hadoop.hbase.backup.impl.BackupCopyService;
+import org.apache.hadoop.hbase.backup.impl.BackupHandler;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+
+/**
+ * Copier for the backup operation. There are two types of copy: one copies from a snapshot,
+ * by extending ExportSnapshot with copy-progress reporting; the other copies incremental log
+ * files, by extending DistCp with copy-progress reporting.
+ *
+ * For now this is only a wrapper. Other features, such as full progress reporting and
+ * incremental backup, will be implemented in future JIRAs.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceBackupCopyService implements BackupCopyService {
+  private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class);
+
+  private Configuration conf;
+  // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
+
+  // Accumulated progress within the whole backup process for the copy operation
+  private float progressDone = 0.1f;
+  private long bytesCopied = 0;
+  private static final float INIT_PROGRESS = 0.1f;
+
+  // The percentage of the current copy task within the whole task, if multiple copies are
+  // needed. The default value is 100%, which means there is only one copy task for the
+  // whole operation.
+  private float subTaskPercntgInWholeTask = 1f;
+
+  public MapReduceBackupCopyService() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the current copy task percentage within the whole task if multiple copies are needed.
+   * @return the current copy task percentage
+   */
+  public float getSubTaskPercntgInWholeTask() {
+    return subTaskPercntgInWholeTask;
+  }
+
+  /**
+   * Set the current copy task percentage within the whole task if multiple copies are needed.
+   * Must be called before calling
+   * {@link #copy(BackupContext, BackupManager, Configuration, Type, String[])}.
+   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+   */
+  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+  }
+
+  class SnapshotCopy extends ExportSnapshot {
+    private BackupContext backupContext;
+    private TableName table;
+
+    public SnapshotCopy(BackupContext backupContext, TableName table) {
+      super();
+      this.backupContext = backupContext;
+      this.table = table;
+    }
+
+    public TableName getTable() {
+      return this.table;
+    }
+  }
+
+  // Extends DistCp for progress updating to hbase:backup
+  // during backup, using DistCpV2 (MAPREDUCE-2765).
+  // Simply extends it and overrides the execute() method to get the
+  // Job reference for progress updating.
+  // Only the argument "src1, [src2, [...]] dst" is supported;
+  // no other DistCp options.
+  class BackupDistCp extends DistCp {
+
+    private BackupContext backupContext;
+    private BackupManager backupManager;
+
+    public BackupDistCp(Configuration conf, DistCpOptions options, BackupContext backupContext,
+        BackupManager backupManager)
+        throws Exception {
+      super(conf, options);
+      this.backupContext = backupContext;
+      this.backupManager = backupManager;
+    }
+
+    @Override
+    public Job execute() throws Exception {
+
+      // reflection preparation for private methods and fields
+      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+      Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
+      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+      Method methodCreateInputFileListing =
+          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+      methodCreateMetaFolderPath.setAccessible(true);
+      methodCreateJob.setAccessible(true);
+      methodCreateInputFileListing.setAccessible(true);
+      methodCleanup.setAccessible(true);
+
+      fieldInputOptions.setAccessible(true);
+      fieldMetaFolder.setAccessible(true);
+      fieldJobFS.setAccessible(true);
+      fieldSubmitted.setAccessible(true);
+
+      // execute() logic starts here
+      assert fieldInputOptions.get(this) != null;
+      assert getConf() != null;
+
+      Job job = null;
+      try {
+        synchronized (this) {
+          // Don't cleanup while we are setting up.
+          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+          fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf()));
+
+          job = (Job) methodCreateJob.invoke(this);
+        }
+        methodCreateInputFileListing.invoke(this, job);
+
+        // Get the total length of the source files
+        List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+        long totalSrcLgth = 0;
+        for (Path aSrc : srcs) {
+          totalSrcLgth += BackupUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc);
+        }
+
+        // submit the copy job
+        job.submit();
+        fieldSubmitted.set(this, true);
+
+        // After submitting the MR job, set its handle in the backup handler for the
+        // cancel process:
+        // this.backupHandler.copyJob = job;
+
+        // Update the copy progress to hbase:backup every progressReportFreq ms if the
+        // progress value has changed
+        int progressReportFreq =
+            this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
+        float lastProgress = progressDone;
+        while (!job.isComplete()) {
+          float newProgress =
+              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+
+          if (newProgress > lastProgress) {
+
+            BigDecimal progressData =
+                new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+            String newProgressStr = progressData + "%";
+            LOG.info("Progress: " + newProgressStr);
+            BackupHandler.updateProgress(backupContext, backupManager, progressData.intValue(),
+              bytesCopied);
+            LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+              + ".\"");
+            lastProgress = newProgress;
+          }
+          Thread.sleep(progressReportFreq);
+        }
+
+        // Update the progress data after the copy job completes
+        float newProgress =
+            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+        BigDecimal progressData =
+            new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+
+        String newProgressStr = progressData + "%";
+        LOG.info("Progress: " + newProgressStr);
+
+        // accumulate the overall backup progress
+        progressDone = newProgress;
+        bytesCopied += totalSrcLgth;
+
+        BackupHandler.updateProgress(backupContext, backupManager, progressData.intValue(),
+          bytesCopied);
+        LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+          + " - " + bytesCopied + " bytes copied.\"");
+
+      } finally {
+        if (!fieldSubmitted.getBoolean(this)) {
+          methodCleanup.invoke(this);
+        }
+      }
+
+      String jobID = job.getJobID().toString();
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+      LOG.debug("DistCp job-id: " + jobID);
+      return job;
+    }
+
+  }
+
+  /**
+   * Do the backup copy based on the copy type.
+   * @param context The backup context
+   * @param backupManager The backup manager
+   * @param conf The Hadoop configuration
+   * @param copyType The backup copy type
+   * @param options Options for the customized ExportSnapshot or DistCp
+   * @throws IOException exception
+   */
+  @Override
+  public int copy(BackupContext context, BackupManager backupManager, Configuration conf,
+      BackupCopyService.Type copyType, String[] options) throws IOException {
+    int res = 0;
+
+    try {
+      if (copyType == Type.FULL) {
+        SnapshotCopy snapshotCp =
+            new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
+        LOG.debug("Doing SNAPSHOT_COPY");
+        // Make a new instance of conf to be used by the snapshot copy class.
+        snapshotCp.setConf(new Configuration(conf));
+        res = snapshotCp.run(options);
+      } else if (copyType == Type.INCREMENTAL) {
+        LOG.debug("Doing COPY_TYPE_DISTCP");
+        setSubTaskPercntgInWholeTask(1f);
+
+        BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context,
+          backupManager);
+        // Handle a special case where the source is a single file.
+        // In this case, DistCp will not create the target dir. It just takes the
+        // target as a file name and copies the source file to the target (as a file name).
+        // We need to create the target dir before running DistCp.
+        LOG.debug("DistCp options: " + Arrays.toString(options));
+        if (options.length == 2) {
+          Path dest = new Path(options[1]);
+          FileSystem destfs = dest.getFileSystem(conf);
+          if (!destfs.exists(dest)) {
+            destfs.mkdirs(dest);
+          }
+        }
+
+        res = distcp.run(options);
+      }
+      return res;
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+}
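
A note on the BackupDistCp trick above: DistCp keeps createJob(), createInputFileListing(), metaFolder, and jobFS private, so the subclass pries them open via reflection to obtain the live Job handle for progress polling. A minimal, hedged sketch of that pattern in isolation (Base/Derived are illustrative stand-ins, not part of the patch):

    import java.lang.reflect.Method;

    public class ReflectionSketch {
      static class Base {
        private String secret() { return "job-handle"; }
      }

      static class Derived extends Base {
        String peek() throws Exception {
          // Look up the superclass's private method and bypass access checks,
          // the same way BackupDistCp.execute() does for DistCp.createJob().
          Method m = Base.class.getDeclaredMethod("secret");
          m.setAccessible(true);
          return (String) m.invoke(this);
        }
      }

      public static void main(String[] args) throws Exception {
        System.out.println(new Derived().peek()); // prints "job-handle"
      }
    }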

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
new file mode 100644
index 0000000..203c9a3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupUtil;
+import org.apache.hadoop.hbase.backup.impl.IncrementalRestoreService;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceRestoreService implements IncrementalRestoreService {
+  public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class);
+
+  private WALPlayer player;
+
+  public MapReduceRestoreService() {
+    this.player = new WALPlayer();
+  }
+
+  @Override
+  public void run(String logDir, TableName[] tableNames, TableName[] newTableNames) throws IOException {
+    String tableStr = BackupUtil.join(tableNames);
+    String newTableStr = BackupUtil.join(newTableNames);
+
+    // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each
+    // log file
+
+    String[] playerArgs = { logDir, tableStr, newTableStr };
+    LOG.info("Restore incremental backup from directory " + logDir + " from hbase tables "
+        + BackupUtil.join(tableNames) + " to tables "
+        + BackupUtil.join(newTableNames));
+    try {
+      player.run(playerArgs);
+    } catch (Exception e) {
+      throw new IOException("Cannot restore from backup directory " + logDir
+        + " (check Hadoop and HBase logs)", e);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return player.getConf();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.player.setConf(conf);
+  }
+
+}
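
MapReduceRestoreService hands WALPlayer the positional arguments <wal inputdir> <tables> <tableMappings>. A hedged sketch of an equivalent standalone invocation through ToolRunner (the directory and table names are made up; the no-arg WALPlayer constructor is the one this patch adds):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class WalReplaySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String logDir = "hdfs:///backup/incremental/wals"; // assumed input dir
        String tables = "t1,t2";                           // original tables
        String newTables = "t1_restore,t2_restore";        // restore targets
        // Same positional argument layout that MapReduceRestoreService builds.
        int rc = ToolRunner.run(conf, new WALPlayer(),
            new String[] { logDir, tables, newTables });
        System.exit(rc);
      }
    }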

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
new file mode 100644
index 0000000..dae24a6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -0,0 +1,119 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+
+/**
+ * Implementation of a log cleaner that checks if a log is still scheduled for
+ * incremental backup before deleting it when its TTL is over.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupLogCleaner extends BaseLogCleanerDelegate {
+  private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class);
+
+  private boolean stopped = false;
+
+  public BackupLogCleaner() {
+  }
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if backup is disabled,
+    // so we cannot filter the files
+    if (this.getConf() == null) {
+      return files;
+    }
+    
+    List<FileStatus> list = new ArrayList<FileStatus>();
+    // TODO: LogCleaners do not have a way to get the Connection from Master. We should find a
+    // way to pass it down here, so that this connection is not re-created every time.
+    // It is expensive
+    try(Connection connection = ConnectionFactory.createConnection(this.getConf());
+        final BackupSystemTable table = new BackupSystemTable(connection)) {
+
+      // If we do not have recorded backup sessions
+      if (!table.hasBackupSessions()) {
+        return files;
+      }
+      
+      for (FileStatus file : files) {
+        String wal = file.getPath().toString();
+        boolean logInSystemTable = table.checkWALFile(wal);
+        if (logInSystemTable) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found log file in hbase:backup, deleting: " + wal);
+          }
+          // A WAL already recorded in hbase:backup is safe to delete
+          list.add(file);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal);
+        }
+      }
+      return list;
+    } catch (IOException e) {
+      LOG.error("Failed to get hbase:backup table, therefore will keep all files", e);
+      // nothing to delete
+      return new ArrayList<FileStatus>();
+    }
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    // If backup is disabled, keep all members null
+    if (!config.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) {
+      LOG.warn("Backup is disabled - allowing all wals to be deleted");
+      return;
+    }
+    super.setConf(config);
+  }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    this.stopped = true;
+    LOG.info("Stopping BackupLogCleaner");
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+}
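
BackupLogCleaner is a BaseLogCleanerDelegate, so the master loads it from its standard cleaner-plugin list. A sketch of that wiring, assuming the stock "hbase.master.logcleaner.plugins" key (the HMaster hunk later in this patch calls BackupManager.decorateMasterConfiguration(), which presumably performs equivalent wiring automatically):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CleanerWiringSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Append rather than replace, so the default TTL-based cleaner stays active.
        String plugins = conf.get("hbase.master.logcleaner.plugins", "");
        conf.set("hbase.master.logcleaner.plugins",
            plugins + (plugins.isEmpty() ? "" : ",")
                + "org.apache.hadoop.hbase.backup.master.BackupLogCleaner");
        System.out.println(conf.get("hbase.master.logcleaner.plugins"));
      }
    }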

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
new file mode 100644
index 0000000..f96682f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure.Procedure;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.zookeeper.KeeperException;
+
+public class LogRollMasterProcedureManager extends MasterProcedureManager {
+
+  public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
+  public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
+  private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
+
+  private MasterServices master;
+  private ProcedureCoordinator coordinator;
+  private boolean done;
+
+  @Override
+  public void stop(String why) {
+    LOG.info("stop: " + why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false;
+  }
+
+  @Override
+  public void initialize(MasterServices master, MetricsMaster metricsMaster)
+      throws KeeperException, IOException, UnsupportedOperationException {
+    this.master = master;
+    this.done = false;
+
+    // setup the default procedure coordinator
+    String name = master.getServerName().toString();
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
+    BaseCoordinatedStateManager coordManager =
+        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
+        .getCoordinatedStateManager(master.getConfiguration());
+    coordManager.initialize(master);
+
+    ProcedureCoordinatorRpcs comms =
+        coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
+
+    this.coordinator = new ProcedureCoordinator(comms, tpool);
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return ROLLLOG_PROCEDURE_SIGNATURE;
+  }
+
+  @Override
+  public void execProcedure(ProcedureDescription desc) throws IOException {
+    this.done = false;
+    // start the process on the RS
+    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
+    List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
+    List<String> servers = new ArrayList<String>();
+    for (ServerName sn : serverNames) {
+      servers.add(sn.toString());
+    }
+    Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
+    if (proc == null) {
+      String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    try {
+      // wait for the procedure to complete. A timer thread is kicked off that should cancel this
+      // if it takes too long.
+      proc.waitForCompleted();
+      LOG.info("Done waiting - exec procedure for " + desc.getInstance());
+      LOG.info("Distributed roll log procedure is successful!");
+      this.done = true;
+    } catch (InterruptedException e) {
+      ForeignException ee =
+          new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
+      monitor.receive(ee);
+      Thread.currentThread().interrupt();
+    } catch (ForeignException e) {
+      ForeignException ee =
+          new ForeignException("Exception while waiting for roll log procdure to finish", e);
+      monitor.receive(ee);
+    }
+    monitor.rethrowException();
+  }
+
+  @Override
+  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
+    return done;
+  }
+
+}
\ No newline at end of file
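
A client would trigger this master-side procedure through HBase's generic procedure RPC. A hedged sketch, assuming the Admin.execProcedure() entry point with the ROLLLOG_PROCEDURE_SIGNATURE defined above (the instance name here is arbitrary and made up):

    import java.util.HashMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class LogRollTriggerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          // "rolllog-proc" matches ROLLLOG_PROCEDURE_SIGNATURE; the second
          // argument names this particular run.
          admin.execProcedure("rolllog-proc", "backup-session-1",
              new HashMap<String, String>());
        }
      }
    }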

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
new file mode 100644
index 0000000..d524140
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+
+/**
+ * This backup subprocedure implementation forces a log roll on the RS.
+ */
+public class LogRollBackupSubprocedure extends Subprocedure {
+  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
+
+  private final RegionServerServices rss;
+  private final LogRollBackupSubprocedurePool taskManager;
+  private FSHLog hlog;
+
+  public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
+      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
+      LogRollBackupSubprocedurePool taskManager) {
+
+    super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
+      wakeFrequency, timeout);
+    LOG.info("Constructing a LogRollBackupSubprocedure.");
+    this.rss = rss;
+    this.taskManager = taskManager;
+  }
+
+  /**
+   * Callable task. TODO: we don't need a thread pool to execute the log roll; this can be
+   * simplified without the subprocedure pool.
+   */
+  class RSRollLogTask implements Callable<Void> {
+    RSRollLogTask() {
+    }
+
+    @Override
+    public Void call() throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("++ DRPC started: " + rss.getServerName());
+      }
+      hlog = (FSHLog) rss.getWAL(null);
+      long filenum = hlog.getFilenum();
+
+      LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
+      hlog.rollWriter(true);
+      LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
+
+      Connection connection = rss.getConnection();
+      try(final BackupSystemTable table = new BackupSystemTable(connection)) {
+        // sanity check, good for testing
+        HashMap<String, Long> serverTimestampMap = table.readRegionServerLastLogRollResult();
+        String host = rss.getServerName().getHostname();
+        int port = rss.getServerName().getPort();
+        String server = host + ":" + port;
+        Long sts = serverTimestampMap.get(server); // keyed by host:port, as written below
+        if (sts != null && sts > filenum) {
+          LOG.warn("Won't update server's last roll log result: current="
+              + sts + " new=" + filenum);
+          return null;
+        }
+        // write the log number to hbase:backup.
+        table.writeRegionServerLastLogRollResult(server, filenum);
+        return null;
+      } catch (Exception e) {
+        LOG.error(e);
+        throw e; // TODO: is this correct?
+      }
+    }
+  }
+
+  private void rolllog() throws ForeignException {
+    monitor.rethrowException();
+
+    taskManager.submitTask(new RSRollLogTask());
+    monitor.rethrowException();
+
+    // wait for everything to complete.
+    taskManager.waitForOutstandingTasks();
+    monitor.rethrowException();
+
+  }
+
+  @Override
+  public void acquireBarrier() throws ForeignException {
+    // Do nothing; the work happens in the inside-barrier step.
+  }
+
+  /**
+   * Do a log roll.
+   * @return a marker byte array (currently null; see the FIXME below)
+   */
+  @Override
+  public byte[] insideBarrier() throws ForeignException {
+    rolllog();
+    // FIXME
+    return null;
+  }
+
+  /**
+   * Cancel threads if they haven't finished.
+   */
+  @Override
+  public void cleanup(Exception e) {
+    taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+  }
+
+  /**
+   * Hooray!
+   */
+  public void releaseBarrier() {
+    // NO OP
+  }
+
+}
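
The sanity check in RSRollLogTask only records the new WAL number when it advances the value already stored for that server. A minimal sketch of that guard in isolation (the map contents and server address are made up):

    import java.util.HashMap;

    public class RollResultGuardSketch {
      // Record filenum for server only if it advances the stored value,
      // mirroring the check in RSRollLogTask.call().
      static boolean record(HashMap<String, Long> results, String server, long filenum) {
        Long sts = results.get(server);
        if (sts != null && sts > filenum) {
          return false; // stale roll; keep the newer recorded result
        }
        results.put(server, filenum);
        return true;
      }

      public static void main(String[] args) {
        HashMap<String, Long> m = new HashMap<String, Long>();
        System.out.println(record(m, "rs1:16020", 42L)); // true
        System.out.println(record(m, "rs1:16020", 41L)); // false; 42 already stored
      }
    }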

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
new file mode 100644
index 0000000..1ca638c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.errorhandling.ForeignException;
+
+/**
+ * Handle running each of the individual tasks for completing a backup procedure
+ * on a regionserver.
+ */
+public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
+  private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class);
+
+  /** Maximum number of backup region tasks that can run concurrently */
+  private static final String CONCURRENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks";
+  private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3;
+
+  private final ExecutorCompletionService<Void> taskPool;
+  private final ThreadPoolExecutor executor;
+  private volatile boolean aborted;
+  private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
+  private final String name;
+
+  public LogRollBackupSubprocedurePool(String name, Configuration conf) {
+    // configure the executor service
+    long keepAlive =
+        conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY,
+          LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    int threads = conf.getInt(CONCURRENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
+    this.name = name;
+    executor =
+        new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name
+            + ")-backup-pool"));
+    taskPool = new ExecutorCompletionService<Void>(executor);
+  }
+
+  /**
+   * Submit a task to the pool.
+   */
+  public void submitTask(final Callable<Void> task) {
+    Future<Void> f = this.taskPool.submit(task);
+    futures.add(f);
+  }
+
+  /**
+   * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}
+   * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+   * @throws ForeignException exception
+   */
+  public boolean waitForOutstandingTasks() throws ForeignException {
+    LOG.debug("Waiting for backup procedure to finish.");
+
+    try {
+      for (Future<Void> f : futures) {
+        f.get();
+      }
+      return true;
+    } catch (InterruptedException e) {
+      if (aborted) {
+        throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!",
+            e);
+      }
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof ForeignException) {
+        throw (ForeignException) e.getCause();
+      }
+      throw new ForeignException(name, e.getCause());
+    } finally {
+      // close off remaining tasks
+      for (Future<Void> f : futures) {
+        if (!f.isDone()) {
+          f.cancel(true);
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Attempt a clean shutdown: currently running tasks are allowed to finish
+   */
+  @Override
+  public void close() {
+    executor.shutdown();
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    if (this.aborted) {
+      return;
+    }
+
+    this.aborted = true;
+    LOG.warn("Aborting because: " + why, e);
+    this.executor.shutdownNow();
+  }
+
+  @Override
+  public boolean isAborted() {
+    return this.aborted;
+  }
+}
\ No newline at end of file
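
The pool above is a thin wrapper around the standard ExecutorCompletionService pattern: submit Callables, then block on each Future so task failures surface as ExecutionExceptions. A self-contained sketch of that core pattern (task bodies are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class CompletionServiceSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        ExecutorCompletionService<Void> pool =
            new ExecutorCompletionService<Void>(executor);
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (int i = 0; i < 3; i++) {
          final int id = i;
          futures.add(pool.submit(new Callable<Void>() {
            @Override
            public Void call() {
              System.out.println("task " + id + " done");
              return null;
            }
          }));
        }
        // Block on each Future; a failed task rethrows as ExecutionException.
        for (Future<Void> f : futures) {
          f.get();
        }
        executor.shutdown();
      }
    }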

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
new file mode 100644
index 0000000..aca190c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * This manager class handles the work dealing with backup for a {@link HRegionServer}.
+ * <p>
+ * This provides the mechanism necessary to kick off a backup-specific {@link Subprocedure} that
+ * this region server is responsible for. If any failures occur with the subprocedure, the
+ * manager's procedure member notifies the procedure coordinator to abort all others.
+ * <p>
+ * On startup, requires {@link #start()} to be called.
+ * <p>
+ * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be
+ * called.
+ */
+public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
+
+  private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
+
+  /** Conf key for number of request threads to start backup on regionservers */
+  public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+  /** # of threads for backup work on the rs. */
+  public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+  public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+  public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+  /** Conf key for millis between checks to see if backup work completed or if there are errors */
+  public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+  /** Default amount of time to check for errors while regions finish backup work */
+  private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+  private RegionServerServices rss;
+  private ProcedureMemberRpcs memberRpcs;
+  private ProcedureMember member;
+
+  /**
+   * Create a default backup procedure manager
+   */
+  public LogRollRegionServerProcedureManager() {
+  }
+
+  /**
+   * Start accepting backup procedure requests.
+   */
+  @Override
+  public void start() {
+    this.memberRpcs.start(rss.getServerName().toString(), member);
+    LOG.info("Started region server backup manager.");
+  }
+
+  /**
+   * Close <tt>this</tt> and all running backup procedure tasks
+   * @param force forcefully stop all running tasks
+   * @throws IOException exception
+   */
+  @Override
+  public void stop(boolean force) throws IOException {
+    String mode = force ? "abruptly" : "gracefully";
+    LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+    try {
+      this.member.close();
+    } finally {
+      this.memberRpcs.close();
+    }
+  }
+
+  /**
+   * If in a running state, creates the specified subprocedure for handling a backup procedure.
+   * @return Subprocedure to submit to the ProcedureMember.
+   */
+  public Subprocedure buildSubprocedure() {
+
+    // don't run a backup if the parent is stop(ping)
+    if (rss.isStopping() || rss.isStopped()) {
+      throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+        + ", because stopping/stopped!");
+    }
+
+    LOG.info("Attempting to run a roll log procedure for backup.");
+    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+    Configuration conf = rss.getConfiguration();
+    long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    long wakeMillis =
+        conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+    LogRollBackupSubprocedurePool taskManager =
+        new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf);
+    return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
+      taskManager);
+
+  }
+
+  /**
+   * Build the actual backup procedure runner that will do all the 'hard' work
+   */
+  public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+    @Override
+    public Subprocedure buildSubprocedure(String name, byte[] data) {
+      return LogRollRegionServerProcedureManager.this.buildSubprocedure();
+    }
+  }
+
+  @Override
+  public void initialize(RegionServerServices rss) throws IOException {
+    this.rss = rss;
+    BaseCoordinatedStateManager coordManager =
+        (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss
+          .getConfiguration());
+    coordManager.initialize(rss);
+    this.memberRpcs =
+        coordManager
+        .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+
+    // read in the backup handler configuration properties
+    Configuration conf = rss.getConfiguration();
+    long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+    int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+    // create the actual cohort member
+    ThreadPoolExecutor pool =
+        ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
+    this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
+  }
+
+  @Override
+  public String getProcedureSignature() {
+    return "backup-proc";
+  }
+
+}
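
For either procedure manager to run, it has to be registered on the master and region-server sides. A hedged sketch, assuming HBase's standard procedure-manager loader keys (the class names are the ones introduced by this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ProcedureWiringSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Master-side coordinator and region-server-side member from this patch.
        conf.set("hbase.procedure.master.classes",
            "org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager");
        conf.set("hbase.procedure.regionserver.classes",
            "org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager");
        System.out.println("log-roll procedure managers registered");
      }
    }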

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..3342743 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.Server;
 
@@ -51,8 +55,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
    * Method to retrieve coordination for split log worker
    */
   public abstract  SplitLogWorkerCoordination getSplitLogWorkerCoordination();
+  
   /**
    * Method to retrieve coordination for split log manager
    */
   public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+   */
+  public abstract ProcedureCoordinatorRpcs
+    getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+  
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
+   */
+  public abstract ProcedureMemberRpcs
+    getProcedureMemberRpcs(String procType) throws IOException;
+    
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..7cf4aab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,9 +17,15 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
@@ -49,9 +55,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   @Override
   public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
     return splitLogWorkerCoordination;
-    }
+  }
+
   @Override
   public SplitLogManagerCoordination getSplitLogManagerCoordination() {
     return splitLogManagerCoordination;
   }
+
+  @Override
+  public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
+      throws IOException {
+    return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
+  }
+
+  @Override
+  public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException {
+    return new ZKProcedureMemberRpcs(watcher, procType);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 9d9cee0..2ceeda5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -85,6 +85,9 @@ public class WALPlayer extends Configured implements Tool {
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  public WALPlayer(){
+  }
+
   protected WALPlayer(final Configuration c) {
     super(c);
   }
@@ -94,7 +97,7 @@ public class WALPlayer extends Configured implements Tool {
    * This one can be used together with {@link KeyValueSortReducer}
    */
   static class WALKeyValueMapper
-  extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
     private byte[] table;
 
     @Override
@@ -106,7 +109,9 @@ public class WALPlayer extends Configured implements Tool {
         if (Bytes.equals(table, key.getTablename().getName())) {
           for (Cell cell : value.getCells()) {
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            if (WALEdit.isMetaEditFamily(kv)) continue;
+            if (WALEdit.isMetaEditFamily(kv)) {
+              continue;
+            }
             context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
           }
         }
@@ -132,7 +137,7 @@ public class WALPlayer extends Configured implements Tool {
    * a running HBase instance.
    */
   protected static class WALMapper
-  extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
     private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
 
     @Override
@@ -149,7 +154,9 @@ public class WALPlayer extends Configured implements Tool {
           Cell lastCell = null;
           for (Cell cell : value.getCells()) {
             // filtering WAL meta entries
-            if (WALEdit.isMetaEditFamily(cell)) continue;
+            if (WALEdit.isMetaEditFamily(cell)) {
+              continue;
+            }
 
             // Allow a subclass filter out this cell.
             if (filter(context, cell)) {
@@ -160,8 +167,12 @@ public class WALPlayer extends Configured implements Tool {
               if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
                   || !CellUtil.matchingRow(lastCell, cell)) {
                 // row or type changed, write out aggregate KVs.
-                if (put != null) context.write(tableOut, put);
-                if (del != null) context.write(tableOut, del);
+                if (put != null) {
+                  context.write(tableOut, put);
+                }
+                if (del != null) {
+                  context.write(tableOut, del);
+                }
                 if (CellUtil.isDelete(cell)) {
                   del = new Delete(CellUtil.cloneRow(cell));
                 } else {
@@ -177,8 +188,12 @@ public class WALPlayer extends Configured implements Tool {
             lastCell = cell;
           }
           // write residual KVs
-          if (put != null) context.write(tableOut, put);
-          if (del != null) context.write(tableOut, del);
+          if (put != null) {
+            context.write(tableOut, put);
+          }
+          if (del != null) {
+            context.write(tableOut, del);
+          }
         }
       } catch (InterruptedException e) {
         e.printStackTrace();
@@ -186,7 +201,8 @@ public class WALPlayer extends Configured implements Tool {
     }
 
     /**
-     * @param cell
+     * Filter cell
+     * @param cell cell
      * @return Return true if we are to emit this cell.
      */
     protected boolean filter(Context context, final Cell cell) {
@@ -197,9 +213,7 @@ public class WALPlayer extends Configured implements Tool {
     public void setup(Context context) throws IOException {
       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tablesToUse == null && tableMap == null) {
-        // Then user wants all tables.
-      } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+      if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
         // this can only happen when WALMapper is used directly by a class other than WALPlayer
         throw new IOException("No tables or incorrect table mapping specified.");
       }
@@ -215,7 +229,9 @@ public class WALPlayer extends Configured implements Tool {
 
   void setupTime(Configuration conf, String option) throws IOException {
     String val = conf.get(option);
-    if (null == val) return;
+    if (null == val) {
+      return;
+    }
     long ms;
     try {
       // first try to parse in user friendly form
@@ -295,7 +311,8 @@ public class WALPlayer extends Configured implements Tool {
     return job;
   }
 
-  /*
+  /**
+   * Print usage
    * @param errorMsg Error message.  Can be null.
    */
   private void usage(final String errorMsg) {
@@ -305,7 +322,8 @@ public class WALPlayer extends Configured implements Tool {
     System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
     System.err.println("Read all WAL entries for <tables>.");
     System.err.println("If no tables (\"\") are specific, all tables are imported.");
-    System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
+    System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported"+
+      " in that case.)");
     System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
     System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
     System.err.println("<tableMapping> is a command separated list of targettables.");
@@ -318,10 +336,10 @@ public class WALPlayer extends Configured implements Tool {
     System.err.println("  -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
     System.err.println("  -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
     System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the wal player");
+      + "=jobName - use the specified mapreduce job name for the wal player");
     System.err.println("For performance also consider the following options:\n"
-        + "  -Dmapreduce.map.speculative=false\n"
-        + "  -Dmapreduce.reduce.speculative=false");
+      + "  -Dmapreduce.map.speculative=false\n"
+      + "  -Dmapreduce.reduce.speculative=false");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b4bffb4..89cfd18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -390,6 +391,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
 
     Replication.decorateMasterConfiguration(this.conf);
+    BackupManager.decorateMasterConfiguration(this.conf);
 
     // Hack! Maps DFSClient => Master for logs.  HDFS made this
     // config param for task trackers, but we can piggyback off of it.
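
The one-line HMaster change above wires backup into master startup the same way
replication does. BackupManager's decorate method itself is not part of this file's
diff; the following is only a hedged sketch of the usual shape of such a hook
("hbase.master.logcleaner.plugins" is a real HBase key, the cleaner class name is
hypothetical).

    import org.apache.hadoop.conf.Configuration;

    public final class DecorationSketch {
      private static final String PLUGINS_KEY = "hbase.master.logcleaner.plugins";

      // hedged sketch, not the actual BackupManager code
      public static void decorateMasterConfiguration(Configuration conf) {
        String cleaner = "org.apache.hadoop.hbase.backup.BackupAwareLogCleaner"; // hypothetical
        String plugins = conf.get(PLUGINS_KEY);
        if (plugins == null || !plugins.contains(cleaner)) {
          // append the backup-aware cleaner so WALs still needed by backups are kept
          conf.set(PLUGINS_KEY, plugins == null ? cleaner : plugins + "," + cleaner);
        }
      }
    }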

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
index 95c3ffe..b6e11ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManager.java
@@ -37,7 +37,7 @@ public abstract class RegionServerProcedureManager extends ProcedureManager {
    * @param rss Region Server service interface
    * @throws KeeperException
    */
-  public abstract void initialize(RegionServerServices rss) throws KeeperException;
+  public abstract void initialize(RegionServerServices rss) throws IOException;
 
   /**
    * Start accepting procedure requests.

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
index 0f4ea64..adb3604 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Provides the globally barriered procedure framework and environment
@@ -39,7 +38,7 @@ public class RegionServerProcedureManagerHost extends
   private static final Log LOG = LogFactory
       .getLog(RegionServerProcedureManagerHost.class);
 
-  public void initialize(RegionServerServices rss) throws KeeperException {
+  public void initialize(RegionServerServices rss) throws IOException {
     for (RegionServerProcedureManager proc : procedures) {
       LOG.debug("Procedure " + proc.getProcedureSignature() + " is initializing");
       proc.initialize(rss);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
index 085d642..3865ba9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
@@ -54,7 +54,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
    * @throws KeeperException if an unexpected zk error occurs
    */
   public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
-      String procedureClass, String coordName) throws KeeperException {
+      String procedureClass, String coordName) throws IOException {
     this.watcher = watcher;
     this.procedureType = procedureClass;
     this.coordName = coordName;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
index 2e03a60..9b491fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
@@ -68,49 +68,53 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
    * @throws KeeperException if we can't reach zookeeper
    */
   public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
-      throws KeeperException {
-    this.zkController = new ZKProcedureUtil(watcher, procType) {
-      @Override
-      public void nodeCreated(String path) {
-        if (!isInProcedurePath(path)) {
-          return;
-        }
+      throws IOException {
+    try {
+      this.zkController = new ZKProcedureUtil(watcher, procType) {
+        @Override
+        public void nodeCreated(String path) {
+          if (!isInProcedurePath(path)) {
+            return;
+          }
 
-        LOG.info("Received created event:" + path);
-        // if it is a simple start/end/abort then we just rewatch the node
-        if (isAcquiredNode(path)) {
-          waitForNewProcedures();
-          return;
-        } else if (isAbortNode(path)) {
-          watchForAbortedProcedures();
-          return;
+          LOG.info("Received created event:" + path);
+          // if it is a simple start/end/abort then we just rewatch the node
+          if (isAcquiredNode(path)) {
+            waitForNewProcedures();
+            return;
+          } else if (isAbortNode(path)) {
+            watchForAbortedProcedures();
+            return;
+          }
+          String parent = ZKUtil.getParent(path);
+          // if its the end barrier, the procedure can be completed
+          if (isReachedNode(parent)) {
+            receivedReachedGlobalBarrier(path);
+            return;
+          } else if (isAbortNode(parent)) {
+            abort(path);
+            return;
+          } else if (isAcquiredNode(parent)) {
+            startNewSubprocedure(path);
+          } else {
+            LOG.debug("Ignoring created notification for node:" + path);
+          }
         }
-        String parent = ZKUtil.getParent(path);
-        // if its the end barrier, the procedure can be completed
-        if (isReachedNode(parent)) {
-          receivedReachedGlobalBarrier(path);
-          return;
-        } else if (isAbortNode(parent)) {
-          abort(path);
-          return;
-        } else if (isAcquiredNode(parent)) {
-          startNewSubprocedure(path);
-        } else {
-          LOG.debug("Ignoring created notification for node:" + path);
-        }
-      }
 
-      @Override
-      public void nodeChildrenChanged(String path) {
-        if (path.equals(this.acquiredZnode)) {
-          LOG.info("Received procedure start children changed event: " + path);
-          waitForNewProcedures();
-        } else if (path.equals(this.abortZnode)) {
-          LOG.info("Received procedure abort children changed event: " + path);
-          watchForAbortedProcedures();
+        @Override
+        public void nodeChildrenChanged(String path) {
+          if (path.equals(this.acquiredZnode)) {
+            LOG.info("Received procedure start children changed event: " + path);
+            waitForNewProcedures();
+          } else if (path.equals(this.abortZnode)) {
+            LOG.info("Received procedure abort children changed event: " + path);
+            watchForAbortedProcedures();
+          }
         }
-      }
-    };
+      };
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
   }
 
   public ZKProcedureUtil getZkController() {
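
The same KeeperException-to-IOException translation recurs across the procedure
classes touched by this patch. Reduced to its essentials (stand-in class, same
pattern):

    import java.io.IOException;

    import org.apache.zookeeper.KeeperException;

    public class MemberRpcsSketch {
      public MemberRpcsSketch() throws IOException {
        try {
          buildZkController(); // stand-in for the ZKProcedureUtil construction above
        } catch (KeeperException e) {
          throw new IOException(e); // callers no longer depend on ZooKeeper types
        }
      }

      private void buildZkController() throws KeeperException {
        // the real code builds a ZKProcedureUtil with nodeCreated/nodeChildrenChanged callbacks
      }
    }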

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 1aa959c..bd65cc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -317,7 +317,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
    * @throws KeeperException if the zookeeper cannot be reached
    */
   @Override
-  public void initialize(RegionServerServices rss) throws KeeperException {
+  public void initialize(RegionServerServices rss) throws IOException {
     this.rss = rss;
     ZooKeeperWatcher zkw = rss.getZooKeeper();
     this.memberRpcs = new ZKProcedureMemberRpcs(zkw,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4ab2693..0ce8ee4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -828,8 +828,8 @@ public class HRegionServer extends HasThread implements
       rspmHost = new RegionServerProcedureManagerHost();
       rspmHost.loadProcedures(conf);
       rspmHost.initialize(this);
-    } catch (KeeperException e) {
-      this.abort("Failed to reach zk cluster when creating procedure handler.", e);
+    } catch (IOException e) {
+      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
     }
     // register watcher for recovering regions
     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 537329a..e56dd28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -390,7 +390,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
    * @throws KeeperException if the zookeeper cluster cannot be reached
    */
   @Override
-  public void initialize(RegionServerServices rss) throws KeeperException {
+  public void initialize(RegionServerServices rss) throws IOException {
     this.rss = rss;
     ZooKeeperWatcher zkw = rss.getZooKeeper();
     this.memberRpcs = new ZKProcedureMemberRpcs(zkw,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index f3f869c..31f05c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -96,6 +96,8 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 
+
+
 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
  * Only one WAL is ever being written at a time.  When a WAL hits a configured maximum size,
@@ -356,7 +358,9 @@ public class FSHLog implements WAL {
     public int compare(Path o1, Path o2) {
       long t1 = getFileNumFromFileName(o1);
       long t2 = getFileNumFromFileName(o2);
-      if (t1 == t2) return 0;
+      if (t1 == t2) {
+        return 0;
+      }
       return (t1 > t2) ? 1 : -1;
     }
   };
@@ -399,7 +403,7 @@ public class FSHLog implements WAL {
    * @param root path for stored and archived wals
    * @param logDir dir where wals are stored
    * @param conf configuration to use
-   * @throws IOException
+   * @throws IOException if the WAL cannot be created
    */
   public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
       throws IOException {
@@ -407,7 +411,7 @@ public class FSHLog implements WAL {
   }
 
   /**
-   * Create an edit log at the given <code>dir</code> location.
+   * Create an edit log at the given directory location.
    *
    * You should never have to load an existing log. If there is a log at
    * startup, it should have already been processed and deleted by the time the
@@ -422,13 +426,13 @@ public class FSHLog implements WAL {
    * be registered before we do anything else; e.g. the
    * Constructor {@link #rollWriter()}.
    * @param failIfWALExists If true IOException will be thrown if files related to this wal
-   *        already exist.
+   *     already exist.
    * @param prefix should always be hostname and port in distributed env and
-   *        it will be URL encoded before being used.
-   *        If prefix is null, "wal" will be used
+   *     it will be URL encoded before being used.
+   *     If prefix is null, "wal" will be used
    * @param suffix will be url encoded. null is treated as empty. non-empty must start with
-   *        {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
-   * @throws IOException
+   *     {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
+   * @throws IOException if the WAL cannot be created
    */
   public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
       final String archiveDir, final Configuration conf,
@@ -590,7 +594,9 @@ public class FSHLog implements WAL {
   @VisibleForTesting
   OutputStream getOutputStream() {
     FSDataOutputStream fsdos = this.hdfs_out;
-    if (fsdos == null) return null;
+    if (fsdos == null) {
+      return null;
+    }
     return fsdos.getWrappedStream();
   }
 
@@ -625,7 +631,7 @@ public class FSHLog implements WAL {
 
   /**
    * Tell listeners about pre log roll.
-   * @throws IOException
+   * @throws IOException if a listener throws during the pre-roll notification
    */
   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
   throws IOException {
@@ -638,7 +644,7 @@ public class FSHLog implements WAL {
 
   /**
    * Tell listeners about post log roll.
-   * @throws IOException
+   * @throws IOException if a listener throws during the post-roll notification
    */
   private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
   throws IOException {
@@ -651,8 +657,7 @@ public class FSHLog implements WAL {
 
   /**
    * Run a sync after opening to set up the pipeline.
-   * @param nextWriter
-   * @param startTimeNanos
+   * @param nextWriter the newly opened writer to sync
    */
   private void preemptiveSync(final ProtobufLogWriter nextWriter) {
     long startTimeNanos = System.nanoTime();
@@ -670,7 +675,9 @@ public class FSHLog implements WAL {
     rollWriterLock.lock();
     try {
       // Return if nothing to flush.
-      if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
+      if (!force && (this.writer != null && this.numEntries.get() <= 0)) {
+        return null;
+      }
       byte [][] regionsToFlush = null;
       if (this.closed) {
         LOG.debug("WAL closed. Skipping rolling of writer");
@@ -725,7 +732,7 @@ public class FSHLog implements WAL {
 
   /**
    * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
-   * @throws IOException
+   * @throws IOException if log archiving fails
    */
   private void cleanOldLogs() throws IOException {
     List<Path> logsToArchive = null;
@@ -735,9 +742,13 @@ public class FSHLog implements WAL {
       Path log = e.getKey();
       Map<byte[], Long> sequenceNums = e.getValue();
       if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
-        if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
+        if (logsToArchive == null) {
+          logsToArchive = new ArrayList<Path>();
+        }
         logsToArchive.add(log);
-        if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("WAL file ready for archiving " + log);
+        }
       }
     }
     if (logsToArchive != null) {
@@ -767,7 +778,9 @@ public class FSHLog implements WAL {
     if (regions != null) {
       StringBuilder sb = new StringBuilder();
       for (int i = 0; i < regions.length; i++) {
-        if (i > 0) sb.append(", ");
+        if (i > 0) {
+          sb.append(", ");
+        }
         sb.append(Bytes.toStringBinary(regions[i]));
       }
       LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
@@ -833,7 +846,9 @@ public class FSHLog implements WAL {
         }
       } catch (FailedSyncBeforeLogCloseException e) {
         // If unflushed/unsynced entries on close, it is reason to abort.
-        if (isUnflushedEntries()) throw e;
+        if (isUnflushedEntries()) {
+          throw e;
+        }
         LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
           e.getMessage());
       }
@@ -894,7 +909,9 @@ public class FSHLog implements WAL {
             try {
               blockOnSync(syncFuture);
             } catch (IOException ioe) {
-              if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Stale sync exception", ioe);
+              }
             }
           }
         }
@@ -965,7 +982,15 @@ public class FSHLog implements WAL {
   public Path getCurrentFileName() {
     return computeFilename(this.filenum.get());
   }
-
+
+  /**
+   * Retained for compatibility with the old WAL API.
+   * @return the current WAL file number (a timestamp)
+   */
+  public long getFilenum() {
+    return filenum.get();
+  }
+
   @Override
   public String toString() {
     return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
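
A small usage sketch for the new accessor; like the existing
extractFileNumFromWAL(WAL) helper, it assumes the WAL is an FSHLog instance.

    import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
    import org.apache.hadoop.hbase.wal.WAL;

    public class FilenumSketch {
      static long currentFileNum(WAL wal) {
        return ((FSHLog) wal).getFilenum(); // timestamp-based file number
      }
    }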

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java
new file mode 100644
index 0000000..26f261c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LogUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public final class LogUtils {
+
+  private LogUtils() {
+  }
+  /**
+   * Disables ZooKeeper and HBase client logging to keep command output clean.
+   * @param log caller's log (currently unused)
+   */
+  public static void disableUselessLoggers(Log log) {
+    // disable zookeeper logging to avoid it messing up command output
+    Logger zkLogger = Logger.getLogger("org.apache.zookeeper");
+    zkLogger.setLevel(Level.OFF);
+    // disable hbase zookeeper tool logging to avoid it messing up command output
+    Logger hbaseZkLogger = Logger.getLogger("org.apache.hadoop.hbase.zookeeper");
+    hbaseZkLogger.setLevel(Level.OFF);
+    // disable hbase client logging to avoid it messing up command output
+    Logger hbaseClientLogger = Logger.getLogger("org.apache.hadoop.hbase.client");
+    hbaseClientLogger.setLevel(Level.OFF);
+  }
+}
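
Intended use is from command-line entry points; a minimal sketch follows
(BackupCommandTool is a hypothetical class name).

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.util.LogUtils;

    public class BackupCommandTool { // hypothetical tool class
      private static final Log LOG = LogFactory.getLog(BackupCommandTool.class);

      public static void main(String[] args) {
        LogUtils.disableUselessLoggers(LOG); // silence zk/client noise before printing results
        // ... run the command and print its output ...
      }
    }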

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 027e7a2..dd4d337 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -209,13 +209,18 @@ public class DefaultWALProvider implements WALProvider {
   @VisibleForTesting
   public static long extractFileNumFromWAL(final WAL wal) {
     final Path walName = ((FSHLog)wal).getCurrentFileName();
+    return extractFileNumFromWAL(walName);
+  }
+
+  @VisibleForTesting
+  public static long extractFileNumFromWAL(final Path walName) {
     if (walName == null) {
       throw new IllegalArgumentException("The WAL path couldn't be null");
     }
     final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER);
     return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2:1)]);
   }
-
+
   /**
    * Pattern used to validate a WAL file name
    * see {@link #validateWALFilename(String)} for description.
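
The refactoring above splits out a Path-based overload so callers holding only a
WAL file name can reuse the parsing. A usage sketch (the file name is a made-up
example of the "<url-encoded prefix>.<filenum>" layout the split assumes):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.DefaultWALProvider;

    public class ExtractSketch {
      public static void main(String[] args) {
        Path wal = new Path("host%2C16020%2C1440000000000.1440000123456");
        // prints 1440000123456, the trailing file number
        System.out.println(DefaultWALProvider.extractFileNumFromWAL(wal));
      }
    }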

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab491d4a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
new file mode 100644
index 0000000..84b7c78
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupContext;
+import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * This class is only a base for other integration-level backup tests. Do not add tests here.
+ * TestBackupSmallTests is where tests that don't require bringing machines up/down should go.
+ * All other tests should have their own classes and extend this one.
+ */
+public class TestBackupBase {
+
+  private static final Log LOG = LogFactory.getLog(TestBackupBase.class);
+
+  protected static Configuration conf1;
+  protected static Configuration conf2;
+
+  protected static HBaseTestingUtility TEST_UTIL;
+  protected static HBaseTestingUtility TEST_UTIL2;
+  protected static TableName table1;
+  protected static TableName table2;
+  protected static TableName table3;
+  protected static TableName table4;
+
+  protected static TableName table1_restore = TableName.valueOf("table1_restore");
+  protected static TableName table2_restore = TableName.valueOf("table2_restore");
+  protected static TableName table3_restore = TableName.valueOf("table3_restore");
+  protected static TableName table4_restore = TableName.valueOf("table4_restore");
+
+  protected static final int NB_ROWS_IN_BATCH = 100;
+  protected static final byte[] qualName = Bytes.toBytes("q1");
+  protected static final byte[] famName = Bytes.toBytes("f");
+
+  protected static String BACKUP_ROOT_DIR = "/backupUT";
+  protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT";
+
+  protected static final String BACKUP_ZNODE = "/backup/hbase";
+  protected static final String BACKUP_SUCCEED_NODE = "complete";
+  protected static final String BACKUP_FAILED_NODE = "failed";
+
+  /**
+   * Sets up two mini HBase clusters (sharing one mini ZooKeeper cluster) plus
+   * a mini MapReduce cluster, and creates the test tables.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.getConfiguration().set("hbase.procedure.regionserver.classes",
+      LogRollRegionServerProcedureManager.class.getName());
+    TEST_UTIL.getConfiguration().set("hbase.procedure.master.classes",
+      LogRollMasterProcedureManager.class.getName());
+    TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    TEST_UTIL.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
+
+    conf1 = TEST_UTIL.getConfiguration();
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    TEST_UTIL2 = new HBaseTestingUtility(conf2);
+    TEST_UTIL2.setZkCluster(miniZK);
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL2.startMiniCluster();
+    conf1 = TEST_UTIL.getConfiguration();
+
+    TEST_UTIL.startMiniMapReduceCluster();
+    BACKUP_ROOT_DIR = TEST_UTIL.getConfiguration().get("fs.defaultFS") + "/backupUT";
+    LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
+    BACKUP_REMOTE_ROOT_DIR = TEST_UTIL2.getConfiguration().get("fs.defaultFS") + "/backupUT";
+    LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
+
+    createTables();
+  }
+
+  /**
+   * Cleans up snapshots and shuts down the mini clusters.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+    TEST_UTIL2.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+  }
+
+  protected static void loadTable(HTable table) throws Exception {
+
+    Put p; // load NB_ROWS_IN_BATCH rows
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      table.put(p);
+    }
+  }
+
+  protected static void createTables() throws Exception {
+
+    long tid = System.currentTimeMillis();
+    table1 = TableName.valueOf("test-" + tid);
+    HBaseAdmin ha = TEST_UTIL.getHBaseAdmin();
+    HTableDescriptor desc = new HTableDescriptor(table1);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    desc.addFamily(fam);
+    ha.createTable(desc);
+    Connection conn = ConnectionFactory.createConnection(conf1);
+    HTable table = (HTable) conn.getTable(table1);
+    loadTable(table);
+    table.close();
+    table2 = TableName.valueOf("test-" + (tid + 1));
+    desc = new HTableDescriptor(table2);
+    desc.addFamily(fam);
+    ha.createTable(desc);
+    table = (HTable) conn.getTable(table2);
+    loadTable(table);
+    table.close();
+    table3 = TableName.valueOf("test-" + (tid + 2));
+    table = TEST_UTIL.createTable(table3, famName);
+    table.close();
+    table4 = TableName.valueOf("test-" + (tid + 3));
+    table = TEST_UTIL.createTable(table4, famName);
+    table.close();
+    ha.close();
+    conn.close();
+  }
+
+  protected boolean checkSucceeded(String backupId) throws IOException {
+    BackupContext status = getBackupContext(backupId);
+    return status != null && status.getState() == BackupState.COMPLETE;
+  }
+
+  protected boolean checkFailed(String backupId) throws IOException {
+    BackupContext status = getBackupContext(backupId);
+    return status != null && status.getState() == BackupState.FAILED;
+  }
+
+  private BackupContext getBackupContext(String backupId) throws IOException {
+    Configuration conf = conf1;
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+        BackupSystemTable table = new BackupSystemTable(connection)) {
+      return table.readBackupStatus(backupId);
+    }
+  }
+
+  protected BackupClient getBackupClient() {
+    return BackupRestoreFactory.getBackupClient(conf1);
+  }
+
+  protected RestoreClient getRestoreClient() {
+    return BackupRestoreFactory.getRestoreClient(conf1);
+  }
+
+  /**
+   * Converts table name strings into a list of TableName instances.
+   */
+  protected List<TableName> toList(String... args) {
+    List<TableName> ret = new ArrayList<>();
+    for (String name : args) {
+      ret.add(TableName.valueOf(name));
+    }
+    return ret;
+  }
+}
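
Per the class javadoc, concrete tests extend this base. A hedged sketch of such a
subclass follows; the backup-creation call is left commented out because the
BackupClient interface is defined elsewhere in this patch and its exact signature
may differ.

    import org.junit.Test;

    public class TestFullBackupExample extends TestBackupBase {
      @Test
      public void testFullBackupSingleTable() throws Exception {
        // hypothetical API, adjust to the real BackupClient interface:
        // String backupId = getBackupClient().create(BackupType.FULL,
        //     toList(table1.getNameAsString()), BACKUP_ROOT_DIR);
        // assertTrue(checkSucceeded(backupId));
      }
    }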

