hbase-commits mailing list archives

From te...@apache.org
Subject [2/6] hbase git commit: HBASE-15411 Rewrite backup with Procedure V2 - phase 1
Date Fri, 01 Apr 2016 22:02:24 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupProcedure.java
new file mode 100644
index 0000000..175f2bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupProcedure.java
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupClientUtil;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupContext.BackupPhase;
+import org.apache.hadoop.hbase.backup.impl.BackupContext.BackupState;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.FullTableBackupState;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.ServerTimestamp;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+@InterfaceAudience.Private
+public class FullTableBackupProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, FullTableBackupState>
+    implements TableProcedureInterface {
+  private static final Log LOG = LogFactory.getLog(FullTableBackupProcedure.class);
+
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  private Configuration conf;
+  private String backupId;
+  private List<TableName> tableList;
+  private String targetRootDir;
+  HashMap<String, Long> newTimestamps = null;
+
+  private BackupManager backupManager;
+  private BackupContext backupContext;
+
+  public FullTableBackupProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  public FullTableBackupProcedure(final MasterProcedureEnv env,
+      final String backupId, List<TableName> tableList, String targetRootDir, final int workers,
+      final long bandwidth) throws IOException {
+    backupManager = new BackupManager(env.getMasterConfiguration());
+    this.backupId = backupId;
+    this.tableList = tableList;
+    this.targetRootDir = targetRootDir;
+    backupContext =
+        backupManager.createBackupContext(backupId, BackupType.FULL, tableList, targetRootDir);
+    if (tableList == null || tableList.isEmpty()) {
+      this.tableList = new ArrayList<>(backupContext.getTables());
+    }
+  }
+
+  @Override
+  public byte[] getResult() {
+    return backupId.getBytes();
+  }
+
+  /**
+   * Begin the overall backup.
+   * @param backupContext backup context
+   * @throws IOException exception
+   */
+  static void beginBackup(BackupManager backupManager, BackupContext backupContext)
+      throws IOException {
+    backupManager.setBackupContext(backupContext);
+    // set the start timestamp of the overall backup
+    long startTs = EnvironmentEdgeManager.currentTime();
+    backupContext.setStartTs(startTs);
+    // set overall backup status: ongoing
+    backupContext.setState(BackupState.RUNNING);
+    LOG.info("Backup " + backupContext.getBackupId() + " started at " + startTs + ".");
+
+    backupManager.updateBackupStatus(backupContext);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup session " + backupContext.getBackupId() + " has been started.");
+    }
+  }
+  
+  private static String getMessage(Exception e) {
+    String msg = e.getMessage();
+    if (msg == null || msg.equals("")) {
+      msg = e.getClass().getName();
+    }
+    return msg;
+  }
+
+  /**
+   * Delete the HBase snapshots created for this backup.
+   * @param env master procedure environment
+   * @param backupCtx backup context
+   * @param conf configuration
+   * @throws IOException exception
+   */
+  private static void deleteSnapshot(final MasterProcedureEnv env,
+      BackupContext backupCtx, Configuration conf)
+      throws IOException {
+    LOG.debug("Trying to delete snapshot for full backup.");
+    for (String snapshotName : backupCtx.getSnapshotNames()) {
+      if (snapshotName == null) {
+        continue;
+      }
+      LOG.debug("Trying to delete snapshot: " + snapshotName);
+      HBaseProtos.SnapshotDescription.Builder builder =
+          HBaseProtos.SnapshotDescription.newBuilder();
+      builder.setName(snapshotName);
+      try {
+        env.getMasterServices().getSnapshotManager().deleteSnapshot(builder.build());
+      } catch (IOException ioe) {
+        LOG.debug("Failed to delete snapshot " + snapshotName, ioe);
+        continue;
+      }
+      LOG.debug("Deleting the snapshot " + snapshotName + " for backup "
+          + backupCtx.getBackupId() + " succeeded.");
+    }
+  }
+
+  /**
+   * Clean up directories with prefix "exportSnapshot-", which are generated when exporting
+   * snapshots.
+   * @throws IOException exception
+   */
+  private static void cleanupExportSnapshotLog(Configuration conf) throws IOException {
+    FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+    Path stagingDir =
+        new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory()
+          .toString()));
+    FileStatus[] files = FSUtils.listStatus(fs, stagingDir);
+    if (files == null) {
+      return;
+    }
+    for (FileStatus file : files) {
+      if (file.getPath().getName().startsWith("exportSnapshot-")) {
+        LOG.debug("Delete log files of exporting snapshot: " + file.getPath().getName());
+        if (!FSUtils.delete(fs, file.getPath(), true)) {
+          LOG.warn("Cannot delete " + file.getPath());
+        }
+      }
+    }
+  }
+
+  /**
+   * Clean up incomplete data in the target directory if the ongoing backup has already
+   * entered the copy phase.
+   */
+  static void cleanupTargetDir(BackupContext backupContext, Configuration conf) {
+    try {
+      // clean up incomplete data in the target directory if the ongoing backup has already
+      // entered the copy phase
+      LOG.debug("Trying to clean up the target dir. Current backup phase: "
+          + backupContext.getPhase());
+      if (backupContext.getPhase().equals(BackupPhase.SNAPSHOTCOPY)
+          || backupContext.getPhase().equals(BackupPhase.INCREMENTAL_COPY)
+          || backupContext.getPhase().equals(BackupPhase.STORE_MANIFEST)) {
+        FileSystem outputFs =
+            FileSystem.get(new Path(backupContext.getTargetRootDir()).toUri(), conf);
+
+        // now treat one backup as a transaction, clean up data that has been partially copied at
+        // table level
+        for (TableName table : backupContext.getTables()) {
+          Path targetDirPath =
+              new Path(HBackupFileSystem.getTableBackupDir(backupContext.getTargetRootDir(),
+                backupContext.getBackupId(), table));
+          if (outputFs.delete(targetDirPath, true)) {
+            LOG.info("Cleaning up uncompleted backup data at " + targetDirPath.toString()
+              + " done.");
+          } else {
+            LOG.info("No data has been copied to " + targetDirPath.toString() + ".");
+          }
+
+          Path tableDir = targetDirPath.getParent();
+          FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir);
+          if (backups == null || backups.length == 0) {
+            outputFs.delete(tableDir, true);
+            LOG.debug(tableDir.toString() + " is empty, removing it.");
+          }
+        }
+      }
+
+    } catch (IOException e1) {
+      LOG.error("Cleaning up uncompleted backup data of " + backupContext.getBackupId() + " at "
+          + backupContext.getTargetRootDir() + " failed due to " + e1.getMessage() + ".");
+    }
+  }
+
+  /**
+   * Fail the overall backup.
+   * @param backupContext backup context
+   * @param e the exception that caused the failure
+   * @param msg error message prefix
+   * @throws IOException exception
+   */
+  static void failBackup(final MasterProcedureEnv env, BackupContext backupContext,
+      BackupManager backupManager, Exception e,
+      String msg, BackupType type, Configuration conf) throws IOException {
+    LOG.error(msg + getMessage(e));
+    // If this is a cancel exception, then we've already cleaned.
+
+    if (backupContext.getState().equals(BackupState.CANCELLED)) {
+      return;
+    }
+
+    // set the failure timestamp of the overall backup
+    backupContext.setEndTs(EnvironmentEdgeManager.currentTime());
+
+    // set failure message
+    backupContext.setFailedMsg(e.getMessage());
+
+    // set overall backup status: failed
+    backupContext.setState(BackupState.FAILED);
+
+    // compose the backup failed data
+    String backupFailedData =
+        "BackupId=" + backupContext.getBackupId() + ",startts=" + backupContext.getStartTs()
+        + ",failedts=" + backupContext.getEndTs() + ",failedphase=" + backupContext.getPhase()
+        + ",failedmessage=" + backupContext.getFailedMsg();
+    LOG.error(backupFailedData);
+
+    backupManager.updateBackupStatus(backupContext);
+
+    // if full backup, then delete HBase snapshots if there already are snapshots taken
+    // and also clean up export snapshot log files if exist
+    if (type == BackupType.FULL) {
+      deleteSnapshot(env, backupContext, conf);
+      cleanupExportSnapshotLog(conf);
+    }
+
+    // clean up the uncompleted data at target directory if the ongoing backup has already entered
+    // the copy phase
+    // For incremental backup, DistCp logs will be cleaned with the targetDir.
+    cleanupTargetDir(backupContext, conf);
+
+    LOG.info("Backup " + backupContext.getBackupId() + " failed.");
+  }
+
+  /**
+   * Do snapshot copy.
+   * @param backupContext backup context
+   * @throws Exception exception
+   */
+  private void snapshotCopy(BackupContext backupContext) throws Exception {
+    LOG.info("Snapshot copy is starting.");
+
+    // set overall backup phase: snapshot_copy
+    backupContext.setPhase(BackupPhase.SNAPSHOTCOPY);
+
+    // avoid action if has been cancelled
+    if (backupContext.isCancelled()) {
+      return;
+    }
+
+    // call ExportSnapshot to copy files based on the HBase snapshots taken for this backup.
+    // ExportSnapshot only supports exporting a single snapshot, so loop over the tables.
+    BackupCopyService copyService = BackupRestoreFactory.getBackupCopyService(conf);
+
+    // number of snapshots matches number of tables
+    int numOfSnapshots = backupContext.getSnapshotNames().size();
+
+    LOG.debug("There are " + numOfSnapshots + " snapshots to be copied.");
+
+    for (TableName table : backupContext.getTables()) {
+      // Currently we simply set the sub copy tasks by counting the table snapshot number, we can
+      // calculate the real files' size for the percentage in the future.
+      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
+      int res = 0;
+      String[] args = new String[4];
+      args[0] = "-snapshot";
+      args[1] = backupContext.getSnapshotName(table);
+      args[2] = "-copy-to";
+      args[3] = backupContext.getBackupStatus(table).getTargetDir();
+
+      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
+      res = copyService.copy(backupContext, backupManager, conf, BackupCopyService.Type.FULL, args);
+      // if one snapshot export fails, do not continue with the remaining snapshots
+      if (res != 0) {
+        LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + ".");
+
+        throw new IOException("Failed of exporting snapshot " + args[1] + " to " + args[3]
+            + " with reason code " + res);
+      }
+
+      LOG.info("Snapshot copy " + args[1] + " finished.");
+    }
+  }
+  
+  /**
+   * Add manifest for the current backup. The manifest is stored
+   * within the table backup directory.
+   * @param backupContext The current backup context
+   * @throws IOException exception
+   * @throws BackupException exception
+   */
+  private static void addManifest(BackupContext backupContext, BackupManager backupManager,
+      BackupType type, Configuration conf) throws IOException, BackupException {
+    // set the overall backup phase : store manifest
+    backupContext.setPhase(BackupPhase.STORE_MANIFEST);
+
+    // avoid action if has been cancelled
+    if (backupContext.isCancelled()) {
+      return;
+    }
+
+    BackupManifest manifest;
+
+    // Since we have each table's backup in its own directory structure,
+    // we'll store its manifest with the table directory.
+    for (TableName table : backupContext.getTables()) {
+      manifest = new BackupManifest(backupContext, table);
+      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupContext, table);
+      for (BackupImage image : ancestors) {
+        manifest.addDependentImage(image);
+      }
+
+      if (type == BackupType.INCREMENTAL) {
+        // We'll store the log timestamps for this table only in its manifest.
+        HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
+            new HashMap<TableName, HashMap<String, Long>>();
+        tableTimestampMap.put(table, backupContext.getIncrTimestampMap().get(table));
+        manifest.setIncrTimestampMap(tableTimestampMap);
+        ArrayList<BackupImage> allAncestors = backupManager.getAncestors(backupContext);
+        for (BackupImage image : allAncestors) {
+          manifest.addDependentImage(image);
+        }
+      }
+      // TODO
+      // manifest.setRelativeWALReferences(backupContext.getRelWALRefs());
+      manifest.store(conf);
+    }
+
+    // For incremental backup, we store an overall manifest in
+    // <backup-root-dir>/WALs/<backup-id>.
+    // This is used when creating the next incremental backup.
+    if (type == BackupType.INCREMENTAL) {
+      manifest = new BackupManifest(backupContext);
+      // set the table region server start and end timestamps for incremental backup
+      manifest.setIncrTimestampMap(backupContext.getIncrTimestampMap());
+      ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupContext);
+      for (BackupImage image : ancestors) {
+        manifest.addDependentImage(image);
+      }
+      // TODO
+      // manifest.setRelativeWALReferences(backupContext.getRelWALRefs());
+      manifest.store(conf);
+    }
+  }
+
+  /**
+   * Get backup request meta data dir as string.
+   * @param backupContext backup context
+   * @return meta data dir
+   */
+  private static String obtainBackupMetaDataStr(BackupContext backupContext) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("type=" + backupContext.getType() + ",tablelist=");
+    for (TableName table : backupContext.getTables()) {
+      sb.append(table + ";");
+    }
+    if (sb.lastIndexOf(";") > 0) {
+      sb.delete(sb.lastIndexOf(";"), sb.lastIndexOf(";") + 1);
+    }
+    sb.append(",targetRootDir=" + backupContext.getTargetRootDir());
+
+    return sb.toString();
+  }
+
+  /**
+   * Clean up directories with prefix "_distcp_logs-", which are generated when DistCp copying
+   * hlogs.
+   * @throws IOException exception
+   */
+  private static void cleanupDistCpLog(BackupContext backupContext, Configuration conf)
+      throws IOException {
+    Path rootPath = new Path(backupContext.getHLogTargetDir()).getParent();
+    FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+    FileStatus[] files = FSUtils.listStatus(fs, rootPath);
+    if (files == null) {
+      return;
+    }
+    for (FileStatus file : files) {
+      if (file.getPath().getName().startsWith("_distcp_logs")) {
+        LOG.debug("Delete log files of DistCp: " + file.getPath().getName());
+        FSUtils.delete(fs, file.getPath(), true);
+      }
+    }
+  }
+
+  /**
+   * Complete the overall backup.
+   * @param backupContext backup context
+   * @throws Exception exception
+   */
+  static void completeBackup(final MasterProcedureEnv env, BackupContext backupContext,
+      BackupManager backupManager, BackupType type, Configuration conf) throws IOException {
+    // set the complete timestamp of the overall backup
+    backupContext.setEndTs(EnvironmentEdgeManager.currentTime());
+    // set overall backup status: complete
+    backupContext.setState(BackupState.COMPLETE);
+    // add and store the manifest for the backup
+    addManifest(backupContext, backupManager, type, conf);
+
+    // after the major steps are done and the manifest is persisted, do conversion if needed
+    // for incremental backup
+    /* in-flight conversion code here, to be provided by a future jira */
+    LOG.debug("in-flight conversion code here, to be provided by a future jira");
+
+    // compose the backup complete data
+    String backupCompleteData =
+        obtainBackupMetaDataStr(backupContext) + ",startts=" + backupContext.getStartTs()
+        + ",completets=" + backupContext.getEndTs() + ",bytescopied="
+        + backupContext.getTotalBytesCopied();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup " + backupContext.getBackupId() + " finished: " + backupCompleteData);
+    }
+    backupManager.updateBackupStatus(backupContext);
+
+    // when full backup is done:
+    // - delete HBase snapshot
+    // - clean up directories with prefix "exportSnapshot-", which are generated when exporting
+    // snapshots
+    if (type == BackupType.FULL) {
+      deleteSnapshot(env, backupContext, conf);
+      cleanupExportSnapshotLog(conf);
+    } else if (type == BackupType.INCREMENTAL) {
+      cleanupDistCpLog(backupContext, conf);
+    }
+
+    LOG.info("Backup " + backupContext.getBackupId() + " completed.");
+  }
+
+  /**
+   * Wrap a SnapshotDescription for a target table.
+   * @param tableName table to snapshot
+   * @param snapshotName name of the snapshot to create
+   * @return a SnapshotDescription especially for backup.
+   */
+  static SnapshotDescription wrapSnapshotDescription(TableName tableName, String snapshotName) {
+    // Mock a SnapshotDescription from backupContext to call SnapshotManager function,
+    // Name it in the format "snapshot_<timestamp>_<table>"
+    HBaseProtos.SnapshotDescription.Builder builder = HBaseProtos.SnapshotDescription.newBuilder();
+    builder.setTable(tableName.getNameAsString());
+    builder.setName(snapshotName);
+    HBaseProtos.SnapshotDescription backupSnapshot = builder.build();
+
+    LOG.debug("Wrapped a SnapshotDescription " + backupSnapshot.getName()
+      + " from backupContext to request snapshot for backup.");
+
+    return backupSnapshot;
+  }
+
+  @Override
+  protected Flow executeFromState(final MasterProcedureEnv env, final FullTableBackupState state)
+      throws InterruptedException {
+    if (conf == null) {
+      conf = env.getMasterConfiguration();
+    }
+    if (backupManager == null) {
+      try {
+        backupManager = new BackupManager(env.getMasterConfiguration());
+      } catch (IOException ioe) {
+        setFailure("full backup", ioe);
+        return Flow.NO_MORE_STATE;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+    try {
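+      // Overall state flow:
+      // PRE_SNAPSHOT_TABLE -> SNAPSHOT_TABLES -> SNAPSHOT_COPY -> BACKUP_COMPLETE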
+      switch (state) {
+        case PRE_SNAPSHOT_TABLE:
+          beginBackup(backupManager, backupContext);
+          String savedStartCode = null;
+          boolean firstBackup = false;
+          // do snapshot for full table backup
+
+          try {
+            savedStartCode = backupManager.readBackupStartCode();
+            firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
+            if (firstBackup) {
+              // This is our first backup. Let's put some marker on ZK so that we can hold the logs
+              // while we do the backup.
+              backupManager.writeBackupStartCode(0L);
+            }
+            // We roll log here before we do the snapshot. It is possible there is duplicate data
+            // in the log that is already in the snapshot. But if we do it after the snapshot, we
+            // could have data loss.
+            // A better approach is to do the roll log on each RS in the same global procedure as
+            // the snapshot.
+            LOG.info("Execute roll log procedure for full backup ...");
+            MasterProcedureManager mpm = env.getMasterServices().getMasterProcedureManagerHost()
+                .getProcedureManager(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+            Map<String, String> props = new HashMap<String, String>();
+            long waitTime = MasterProcedureUtil.execProcedure(mpm,
+              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+            MasterProcedureUtil.waitForProcedure(mpm,
+              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+              LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props, waitTime,
+              conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER),
+              conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+                HConstants.DEFAULT_HBASE_CLIENT_PAUSE));
+
+            newTimestamps = backupManager.readRegionServerLastLogRollResult();
+            if (firstBackup) {
+              // Updates registered log files
+              // We record ALL old WAL files as registered, because
+              // this is a first full backup in the system and these
+              // files are not needed for next incremental backup
+              List<String> logFiles = BackupUtil.getWALFilesOlderThan(conf, newTimestamps);
+              backupManager.recordWALFiles(logFiles);
+            }
+          } catch (BackupException e) {
+            // fail the overall backup and return
+            failBackup(env, backupContext, backupManager, e, "Unexpected BackupException : ",
+              BackupType.FULL, conf);
+            return Flow.NO_MORE_STATE;
+          }
+          setNextState(FullTableBackupState.SNAPSHOT_TABLES);
+          break;
+        case SNAPSHOT_TABLES:
+          for (TableName tableName : tableList) {
+            String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
+                + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
+            HBaseProtos.SnapshotDescription backupSnapshot;
+
+            // wrap a SnapshotDescription for offline/online snapshot
+            backupSnapshot = wrapSnapshotDescription(tableName, snapshotName);
+            try {
+              env.getMasterServices().getSnapshotManager().deleteSnapshot(backupSnapshot);
+            } catch (IOException e) {
+              LOG.debug("Unable to delete " + snapshotName, e);
+            }
+            // Kick off snapshot for backup
+            try {
+              env.getMasterServices().getSnapshotManager().takeSnapshot(backupSnapshot);
+            } catch (IOException e) {
+              LOG.debug("Unable to take snapshot: " + snapshotName, e);
+            }
+            long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(
+                env.getMasterConfiguration(),
+                backupSnapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
+            BackupUtil.waitForSnapshot(backupSnapshot, waitTime,
+              env.getMasterServices().getSnapshotManager(), env.getMasterConfiguration());
+            // set the snapshot name in BackupStatus of this table, only after snapshot success.
+            backupContext.setSnapshotName(tableName, backupSnapshot.getName());
+          }
+          setNextState(FullTableBackupState.SNAPSHOT_COPY);
+          break;
+        case SNAPSHOT_COPY:
+          // do snapshot copy
+          LOG.debug("snapshot copy for " + backupId);
+          try {
+            this.snapshotCopy(backupContext);
+          } catch (Exception e) {
+            // fail the overall backup and return
+            failBackup(env, backupContext, backupManager, e, "Unexpected exception in snapshot copy : ",
+              BackupType.FULL, conf);
+            return Flow.NO_MORE_STATE;
+          }
+          // Updates incremental backup table set
+          backupManager.addIncrementalBackupTableSet(backupContext.getTables());
+          setNextState(FullTableBackupState.BACKUP_COMPLETE);
+          break;
+
+        case BACKUP_COMPLETE:
+          // set overall backup status: complete. Here we make sure to complete the backup.
+          // After this checkpoint, even if a cancel request comes in, the backup is allowed to finish.
+          backupContext.setState(BackupState.COMPLETE);
+          // The table list in backupContext is good for both full backup and incremental backup.
+          // For incremental backup, it contains the incremental backup table set.
+          backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);
+
+          HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+              backupManager.readLogTimestampMap();
+
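+          // newStartCode is the smallest last-log-roll timestamp recorded across the region
+          // servers for the backed-up tables (see getRSLogTimestampMins/getMinValue).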
+          Long newStartCode =
+            BackupClientUtil.getMinValue(BackupUtil.getRSLogTimestampMins(newTableSetTimestampMap));
+          backupManager.writeBackupStartCode(newStartCode);
+
+          // backup complete
+          completeBackup(env, backupContext, backupManager, BackupType.FULL, conf);
+          return Flow.NO_MORE_STATE;
+
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      LOG.error("Backup failed in " + state);
+      setFailure("snapshot-table", e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(final MasterProcedureEnv env, final FullTableBackupState state)
+      throws IOException {
+    if (state != FullTableBackupState.PRE_SNAPSHOT_TABLE) {
+      deleteSnapshot(env, backupContext, conf);
+      cleanupExportSnapshotLog(conf);
+    }
+
+    // clean up the uncompleted data at target directory if the ongoing backup has already entered
+    // the copy phase
+    // For incremental backup, DistCp logs will be cleaned with the targetDir.
+    if (state == FullTableBackupState.SNAPSHOT_COPY) {
+      cleanupTargetDir(backupContext, conf);
+    }
+  }
+
+  @Override
+  protected FullTableBackupState getState(final int stateId) {
+    return FullTableBackupState.valueOf(stateId);
+  }
+
+  @Override
+  protected int getStateId(final FullTableBackupState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected FullTableBackupState getInitialState() {
+    return FullTableBackupState.PRE_SNAPSHOT_TABLE;
+  }
+
+  @Override
+  protected void setNextState(final FullTableBackupState state) {
+    if (aborted.get()) {
+      setAbortFailure("backup-table", "abort requested");
+    } else {
+      super.setNextState(state);
+    }
+  }
+
+  @Override
+  public boolean abort(final MasterProcedureEnv env) {
+    aborted.set(true);
+    return true;
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" (targetRootDir=");
+    sb.append(targetRootDir);
+    sb.append(")");
+  }
+
+  BackupProtos.BackupProcContext toBackupContext() {
+    BackupProtos.BackupProcContext.Builder ctxBuilder = BackupProtos.BackupProcContext.newBuilder();
+    ctxBuilder.setCtx(backupContext.toBackupContext());
+    if (newTimestamps != null && !newTimestamps.isEmpty()) {
+      BackupProtos.ServerTimestamp.Builder tsBuilder = ServerTimestamp.newBuilder();
+      for (Entry<String, Long> entry : newTimestamps.entrySet()) {
+        tsBuilder.clear().setServer(entry.getKey()).setTimestamp(entry.getValue());
+        ctxBuilder.addServerTimestamp(tsBuilder.build());
+      }
+    }
+    return ctxBuilder.build();
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    BackupProtos.BackupProcContext backupProcCtx = toBackupContext();
+    backupProcCtx.writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    BackupProtos.BackupProcContext proto =
+        BackupProtos.BackupProcContext.parseDelimitedFrom(stream);
+    backupContext = BackupContext.fromProto(proto.getCtx());
+    backupId = backupContext.getBackupId();
+    targetRootDir = backupContext.getTargetRootDir();
+    tableList = backupContext.getTableNames();
+    List<ServerTimestamp> svrTimestamps = proto.getServerTimestampList();
+    if (svrTimestamps != null && !svrTimestamps.isEmpty()) {
+      newTimestamps = new HashMap<>();
+      for (ServerTimestamp ts : svrTimestamps) {
+        newTimestamps.put(ts.getServer(), ts.getTimestamp());
+      }
+    }
+  }
+
+  @Override
+  public TableName getTableName() {
+    return TableName.BACKUP_TABLE_NAME; 
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.BACKUP;
+  }
+
+  @Override
+  protected boolean acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) {
+      return false;
+    }
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+  }
+}
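For readers following the Procedure V2 flow, here is a minimal usage sketch, not part of this patch, of how a caller might submit the full-backup procedure to the master's ProcedureExecutor and wait for its result. The method name runFullBackup, the procExec handle, and the worker/bandwidth values are illustrative assumptions, not code from this commit.

  // Hypothetical sketch, not part of this patch: submit the full-backup procedure and
  // block until it completes. procExec/env and the worker/bandwidth values are placeholders.
  static String runFullBackup(ProcedureExecutor<MasterProcedureEnv> procExec,
      MasterProcedureEnv env, String backupId, List<TableName> tables,
      String targetRootDir) throws Exception {
    long procId = procExec.submitProcedure(
        new FullTableBackupProcedure(env, backupId, tables, targetRootDir, 1, 0L));
    while (!procExec.isFinished(procId)) {
      Thread.sleep(100);  // simple polling; a real caller would use the framework's wait helpers
    }
    return new String(procExec.getResult(procId).getResult());  // backup id, see getResult() above
  }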

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupProcedure.java
new file mode 100644
index 0000000..8c12582
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupProcedure.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupClientUtil;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupContext.BackupPhase;
+import org.apache.hadoop.hbase.backup.impl.BackupContext.BackupState;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.IncrementalTableBackupState;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.ServerTimestamp;
+
+@InterfaceAudience.Private
+public class IncrementalTableBackupProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, IncrementalTableBackupState> 
+    implements TableProcedureInterface {
+  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupProcedure.class);
+
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  private Configuration conf;
+  private String backupId;
+  private List<TableName> tableList;
+  private String targetRootDir;
+  HashMap<String, Long> newTimestamps = null;
+
+  private BackupManager backupManager;
+  private BackupContext backupContext;
+
+  public IncrementalTableBackupProcedure() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  public IncrementalTableBackupProcedure(final MasterProcedureEnv env,
+      final String backupId,
+      List<TableName> tableList, String targetRootDir, final int workers,
+      final long bandwidth) throws IOException {
+    backupManager = new BackupManager(env.getMasterConfiguration());
+    this.backupId = backupId;
+    this.tableList = tableList;
+    this.targetRootDir = targetRootDir;
+    backupContext = backupManager.createBackupContext(backupId, BackupType.INCREMENTAL, tableList,
+          targetRootDir);
+  }
+
+  @Override
+  public byte[] getResult() {
+    return backupId.getBytes();
+  }
+
+  private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    List<String> list = new ArrayList<String>();
+    for (String file : incrBackupFileList) {
+      if (fs.exists(new Path(file))) {
+        list.add(file);
+      } else {
+        LOG.warn("Can't find file: " + file);
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Do incremental copy.
+   * @param backupContext backup context
+   */
+  private void incrementalCopy(BackupContext backupContext) throws Exception {
+
+    LOG.info("Incremental copy is starting.");
+
+    // set overall backup phase: incremental_copy
+    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);
+
+    // avoid action if has been cancelled
+    if (backupContext.isCancelled()) {
+      return;
+    }
+
+    // get incremental backup file list and prepare params for DistCp
+    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
+    // filter missing files out (they have been copied by previous backups)
+    incrBackupFileList = filterMissingFiles(incrBackupFileList);
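+    // DistCp-style argument layout: all source paths first, then the target directory
+    // as the last element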
+    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+    strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
+
+    BackupCopyService copyService = BackupRestoreFactory.getBackupCopyService(conf);
+    int res = copyService.copy(backupContext, backupManager, conf,
+      BackupCopyService.Type.INCREMENTAL, strArr);
+
+    if (res != 0) {
+      LOG.error("Copy incremental log files failed with return code: " + res + ".");
+      throw new IOException("Failed of Hadoop Distributed Copy from " + incrBackupFileList + " to "
+          + backupContext.getHLogTargetDir());
+    }
+    LOG.info("Incremental copy from " + incrBackupFileList + " to "
+        + backupContext.getHLogTargetDir() + " finished.");
+  }
+
+  @Override
+  protected Flow executeFromState(final MasterProcedureEnv env,
+      final IncrementalTableBackupState state)
+      throws InterruptedException {
+    if (conf == null) {
+      conf = env.getMasterConfiguration();
+    }
+    if (backupManager == null) {
+      try {
+        backupManager = new BackupManager(env.getMasterConfiguration());
+      } catch (IOException ioe) {
+        setFailure("incremental backup", ioe);
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+    try {
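+      // Overall state flow:
+      // PREPARE_INCREMENTAL -> INCREMENTAL_COPY -> INCR_BACKUP_COMPLETE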
+      switch (state) {
+        case PREPARE_INCREMENTAL:
+          FullTableBackupProcedure.beginBackup(backupManager, backupContext);
+          LOG.debug("For incremental backup, current table set is "
+              + backupManager.getIncrementalBackupTableSet());
+          try {
+            IncrementalBackupManager incrBackupManager =
+                new IncrementalBackupManager(backupManager);
+
+            newTimestamps = incrBackupManager.getIncrBackupLogFileList(backupContext);
+          } catch (Exception e) {
+            // fail the overall backup and return
+            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
+              "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
+            return Flow.NO_MORE_STATE;
+          }
+
+          setNextState(IncrementalTableBackupState.INCREMENTAL_COPY);
+          break;
+        case INCREMENTAL_COPY:
+          try {
+            // copy out the table and region info files for each table
+            BackupUtil.copyTableRegionInfo(backupContext, conf);
+            incrementalCopy(backupContext);
+            // Save list of WAL files copied
+            backupManager.recordWALFiles(backupContext.getIncrBackupFileList());
+          } catch (Exception e) {
+            // fail the overall backup and return
+            FullTableBackupProcedure.failBackup(env, backupContext, backupManager, e,
+              "Unexpected exception doing incremental copy : ", BackupType.INCREMENTAL, conf);
+            return Flow.NO_MORE_STATE;
+          }
+          setNextState(IncrementalTableBackupState.INCR_BACKUP_COMPLETE);
+          break;
+        case INCR_BACKUP_COMPLETE:
+          // set overall backup status: complete. Here we make sure to complete the backup.
+          // After this checkpoint, even if a cancel request comes in, the backup is allowed to finish.
+          backupContext.setState(BackupState.COMPLETE);
+          // Set the previousTimestampMap, taken before the current log roll, into the backup
+          // context so that it is written to the manifest.
+          HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
+              backupManager.readLogTimestampMap();
+          backupContext.setIncrTimestampMap(previousTimestampMap);
+
+          // The table list in backupContext is good for both full backup and incremental backup.
+          // For incremental backup, it contains the incremental backup table set.
+          backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);
+
+          HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+              backupManager.readLogTimestampMap();
+
+          Long newStartCode = BackupClientUtil
+              .getMinValue(BackupUtil.getRSLogTimestampMins(newTableSetTimestampMap));
+          backupManager.writeBackupStartCode(newStartCode);
+          // backup complete
+          FullTableBackupProcedure.completeBackup(env, backupContext, backupManager,
+            BackupType.INCREMENTAL, conf);
+          return Flow.NO_MORE_STATE;
+
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (IOException e) {
+      setFailure("snapshot-table", e);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(final MasterProcedureEnv env,
+      final IncrementalTableBackupState state) throws IOException {
+    // clean up the uncompleted data at target directory if the ongoing backup has already entered
+    // the copy phase
+    // For incremental backup, DistCp logs will be cleaned with the targetDir.
+    FullTableBackupProcedure.cleanupTargetDir(backupContext, conf);
+  }
+
+  @Override
+  protected IncrementalTableBackupState getState(final int stateId) {
+    return IncrementalTableBackupState.valueOf(stateId);
+  }
+
+  @Override
+  protected int getStateId(final IncrementalTableBackupState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected IncrementalTableBackupState getInitialState() {
+    return IncrementalTableBackupState.PREPARE_INCREMENTAL;
+  }
+
+  @Override
+  protected void setNextState(final IncrementalTableBackupState state) {
+    if (aborted.get()) {
+      setAbortFailure("snapshot-table", "abort requested");
+    } else {
+      super.setNextState(state);
+    }
+  }
+
+  @Override
+  public boolean abort(final MasterProcedureEnv env) {
+    aborted.set(true);
+    return true;
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" (targetRootDir=");
+    sb.append(targetRootDir);
+    sb.append(")");
+  }
+
+  BackupProtos.BackupProcContext toBackupContext() {
+    BackupProtos.BackupProcContext.Builder ctxBuilder = BackupProtos.BackupProcContext.newBuilder();
+    ctxBuilder.setCtx(backupContext.toBackupContext());
+    if (newTimestamps != null && !newTimestamps.isEmpty()) {
+      BackupProtos.ServerTimestamp.Builder tsBuilder = ServerTimestamp.newBuilder();
+      for (Entry<String, Long> entry : newTimestamps.entrySet()) {
+        tsBuilder.clear().setServer(entry.getKey()).setTimestamp(entry.getValue());
+        ctxBuilder.addServerTimestamp(tsBuilder.build());
+      }
+    }
+    return ctxBuilder.build();
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException {
+    super.serializeStateData(stream);
+
+    BackupProtos.BackupProcContext backupProcCtx = toBackupContext();
+    backupProcCtx.writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws IOException {
+    super.deserializeStateData(stream);
+
+    BackupProtos.BackupProcContext proto =
+        BackupProtos.BackupProcContext.parseDelimitedFrom(stream);
+    backupContext = BackupContext.fromProto(proto.getCtx());
+    backupId = backupContext.getBackupId();
+    targetRootDir = backupContext.getTargetRootDir();
+    tableList = backupContext.getTableNames();
+    List<ServerTimestamp> svrTimestamps = proto.getServerTimestampList();
+    if (svrTimestamps != null && !svrTimestamps.isEmpty()) {
+      newTimestamps = new HashMap<>();
+      for (ServerTimestamp ts : svrTimestamps) {
+        newTimestamps.put(ts.getServer(), ts.getTimestamp());
+      }
+    }
+  }
+
+  @Override
+  public TableName getTableName() {
+    return TableName.BACKUP_TABLE_NAME;
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.BACKUP;
+  }
+
+  @Override
+  protected boolean acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) {
+      return false;
+    }
+    return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME);
+  }
+}
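Taken together with the full-backup procedure above, a small dispatch sketch (again not part of this patch; the factory method name and the worker/bandwidth values are illustrative assumptions) shows how a caller might choose between the two state machines based on the requested BackupType:

  // Hypothetical factory, not part of this patch: pick the procedure implementation
  // based on the requested backup type. Worker/bandwidth values are placeholders.
  static Procedure<MasterProcedureEnv> createBackupProcedure(MasterProcedureEnv env,
      String backupId, BackupType type, List<TableName> tables, String targetRootDir)
      throws IOException {
    if (type == BackupType.FULL) {
      return new FullTableBackupProcedure(env, backupId, tables, targetRootDir, 1, 0L);
    }
    return new IncrementalTableBackupProcedure(env, backupId, tables, targetRootDir, 1, 0L);
  }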

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
index 6617565..c0c5220 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
@@ -286,8 +286,7 @@ public final class RestoreClientImpl implements RestoreClient {
     String backupId = image.getBackupId();
 
     Path rootPath = new Path(rootDir);
-    HBackupFileSystem hFS = new HBackupFileSystem(conf, rootPath, backupId);
-    RestoreUtil restoreTool = new RestoreUtil(conf, hFS);
+    RestoreUtil restoreTool = new RestoreUtil(conf, rootPath, backupId);
     BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, rootPath, backupId);
 
     Path tableBackupPath = HBackupFileSystem.getTableBackupPath(rootPath, sTable, backupId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
index 3882e95..9139b6e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
@@ -18,15 +18,20 @@
 
 package org.apache.hadoop.hbase.backup.impl;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
@@ -37,10 +42,17 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * A collection for methods used by multiple classes to restore HBase tables.
@@ -51,22 +63,72 @@ public class RestoreUtil {
 
   public static final Log LOG = LogFactory.getLog(RestoreUtil.class);
 
-  protected Configuration conf = null;
+  private final String[] ignoreDirs = { "recovered.edits" };
 
-  protected HBackupFileSystem hBackupFS = null;
+  protected Configuration conf = null;
 
   protected Path backupRootPath;
 
   protected String backupId;
 
+  protected FileSystem fs;
+  private final String RESTORE_TMP_PATH = "/tmp";
+  private final Path restoreTmpPath;
+
   // store table name and snapshot dir mapping
   private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
 
-  public RestoreUtil(Configuration conf, HBackupFileSystem hBackupFS) throws IOException {
+  public RestoreUtil(Configuration conf, final Path backupRootPath, final String backupId)
+      throws IOException {
     this.conf = conf;
-    this.hBackupFS = hBackupFS;
-    this.backupRootPath = hBackupFS.getBackupRootPath();
-    this.backupId = hBackupFS.getBackupId();
+    this.backupRootPath = backupRootPath;
+    this.backupId = backupId;
+    this.fs = backupRootPath.getFileSystem(conf);
+    this.restoreTmpPath = new Path(conf.get("hbase.fs.tmp.dir", RESTORE_TMP_PATH), "restore");
+  }
+
+  /**
+   * Returns the path to the table archive, e.g.:
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn"
+   * @param tableName table name
+   * @return path to table archive
+   * @throws IOException exception
+   */
+  Path getTableArchivePath(TableName tableName)
+      throws IOException {
+    Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(backupRootPath, tableName,
+      backupId), HConstants.HFILE_ARCHIVE_DIRECTORY);
+    Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
+    Path archivePath = new Path(dataDir, tableName.getNamespaceAsString());
+    Path tableArchivePath =
+        new Path(archivePath, tableName.getQualifierAsString());
+    if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) {
+      LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exists");
+      tableArchivePath = null; // empty table has no archive
+    }
+    return tableArchivePath;
+  }
+
+  /**
+   * Gets region list
+   * @param tableName table name
+   * @return list of region directory paths
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(TableName tableName)
+      throws FileNotFoundException, IOException {
+    Path tableArchivePath = this.getTableArchivePath(tableName);
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // each child is a region directory
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
   }
 
   /**
@@ -78,7 +140,7 @@ public class RestoreUtil {
    * @param newTableNames : target tableNames(table names to be restored to)
    * @throws IOException exception
    */
-  public void incrementalRestoreTable(String logDir,
+  void incrementalRestoreTable(String logDir,
       TableName[] tableNames, TableName[] newTableNames) throws IOException {
 
     if (tableNames.length != newTableNames.length) {
@@ -103,11 +165,112 @@ public class RestoreUtil {
     }
   }
 
-  public void fullRestoreTable(Path tableBackupPath, TableName tableName, TableName newTableName,
+  void fullRestoreTable(Path tableBackupPath, TableName tableName, TableName newTableName,
       boolean converted) throws IOException {
     restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted);
   }
 
+  /**
+   * Returns the path to the table snapshot directory, e.g.:
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/.hbase-snapshot"
+   * @param backupRootPath backup root path
+   * @param tableName table name
+   * @param backupId backup Id
+   * @return path for snapshot
+   */
+  static Path getTableSnapshotPath(Path backupRootPath, TableName tableName,
+      String backupId) {
+    return new Path(HBackupFileSystem.getTableBackupPath(backupRootPath, tableName, backupId),
+      HConstants.SNAPSHOT_DIR_NAME);
+  }
+
+  /**
+   * Returns the path to the snapshot metadata directory, e.g.:
+   * "..../default/t1_dn/backup_1396650096738/.hbase-snapshot/snapshot_1396650097621_default_t1_dn".
+   * This path contains .snapshotinfo and .tabledesc (0.96 and 0.98), or .snapshotinfo and
+   * data.manifest (trunk).
+   * @param tableName table name
+   * @return path to table info
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  Path getTableInfoPath(TableName tableName)
+      throws FileNotFoundException, IOException {
+    Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+    Path tableInfoPath = null;
+
+    // can't build the path directly as the timestamp values are different
+    FileStatus[] snapshots = fs.listStatus(tableSnapShotPath);
+    for (FileStatus snapshot : snapshots) {
+      tableInfoPath = snapshot.getPath();
+      // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
+      if (tableInfoPath.getName().endsWith("data.manifest")) {
+        break;
+      }
+    }
+    return tableInfoPath;
+  }
+
+  /**
+   * @param tableName is the table backed up
+   * @return {@link HTableDescriptor} saved in backup image of the table
+   */
+  HTableDescriptor getTableDesc(TableName tableName)
+      throws FileNotFoundException, IOException {
+    Path tableInfoPath = this.getTableInfoPath(tableName);
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
+    HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
+    if (!tableDescriptor.getNameAsString().equals(tableName.getNameAsString())) {
+      LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: "
+          + tableInfoPath.toString());
+      LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString());
+    }
+    return tableDescriptor;
+  }
+
+  /**
+   * Duplicate the backup image if it's on local cluster
+   * @see HStore#bulkLoadHFile(String, long)
+   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
+   * @param tableArchivePath archive path
+   * @return the new tableArchivePath
+   * @throws IOException exception
+   */
+  Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
+    // Move the file if it's on local cluster
+    boolean isCopyNeeded = false;
+
+    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
+    FileSystem desFs = FileSystem.get(conf);
+    if (tableArchivePath.getName().startsWith("/")) {
+      isCopyNeeded = true;
+    } else {
+      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
+      // long)
+      if (srcFs.getUri().equals(desFs.getUri())) {
+        LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
+            + desFs.getUri());
+        isCopyNeeded = true;
+      }
+    }
+    if (isCopyNeeded) {
+      LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
+      if (desFs.exists(restoreTmpPath)) {
+        try {
+          desFs.delete(restoreTmpPath, true);
+        } catch (IOException e) {
+          LOG.debug("Failed to delete path: " + restoreTmpPath
+            + ", need to check whether restore target DFS cluster is healthy");
+        }
+      }
+      FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
+      LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
+      tableArchivePath = restoreTmpPath;
+    }
+    return tableArchivePath;
+  }
+
   private void restoreTableAndCreate(TableName tableName, TableName newTableName,
       Path tableBackupPath, boolean converted) throws IOException {
     if (newTableName == null || newTableName.equals("")) {
@@ -119,8 +282,7 @@ public class RestoreUtil {
     // get table descriptor first
     HTableDescriptor tableDescriptor = null;
 
-    Path tableSnapshotPath = HBackupFileSystem.getTableSnapshotPath(backupRootPath, tableName,
-      backupId);
+    Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
 
     if (fileSys.exists(tableSnapshotPath)) {
       // snapshot path exist means the backup path is in HDFS
@@ -130,11 +292,9 @@ public class RestoreUtil {
             SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
         SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
         tableDescriptor = manifest.getTableDescriptor();
-
-
       } else {
-        tableDescriptor = hBackupFS.getTableDesc(tableName);
-        snapshotMap.put(tableName, hBackupFS.getTableInfoPath(tableName));
+        tableDescriptor = getTableDesc(tableName);
+        snapshotMap.put(tableName, getTableInfoPath(tableName));
       }
       if (tableDescriptor == null) {
         LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
@@ -144,7 +304,7 @@ public class RestoreUtil {
       LOG.error("convert will be supported in a future jira");
     }
 
-    Path tableArchivePath = hBackupFS.getTableArchivePath(tableName);
+    Path tableArchivePath = getTableArchivePath(tableName);
     if (tableArchivePath == null) {
       if (tableDescriptor != null) {
         // find table descriptor but no archive dir means the table is empty, create table and exit
@@ -171,7 +331,7 @@ public class RestoreUtil {
       // record all region dirs:
       // load all files in dir
       try {
-        ArrayList<Path> regionPathList = hBackupFS.getRegionList(tableName);
+        ArrayList<Path> regionPathList = getRegionList(tableName);
 
         // should only try to create the table with all region information, so we could pre-split
         // the regions at a fine grain
@@ -180,13 +340,13 @@ public class RestoreUtil {
         if (tableArchivePath != null) {
           // start real restore through bulkload
           // if the backup target is on local cluster, special action needed
-          Path tempTableArchivePath = hBackupFS.checkLocalAndBackup(tableArchivePath);
+          Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
           if (tempTableArchivePath.equals(tableArchivePath)) {
             if(LOG.isDebugEnabled()) {
               LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
             }
           } else {
-            regionPathList = hBackupFS.getRegionList(tempTableArchivePath); // point to the tempDir
+            regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
             if(LOG.isDebugEnabled()) {
               LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
             }
@@ -211,7 +371,72 @@ public class RestoreUtil {
     }
   }
 
+  /**
+   * Gets region list
+   * @param tableArchivePath table archive path
+   * @return list of region directory paths
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException,
+  IOException {
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // each child refers to a region directory
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
 
+  /**
+   * Counts the number of files in all subdirectories of an HBase table, i.e. HFiles.
+   * @param regionPath Path to an HBase table directory
+   * @return the number of files in all directories
+   * @throws IOException exception
+   */
+  int getNumberOfFilesInDir(Path regionPath) throws IOException {
+    int result = 0;
+
+    if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) {
+      throw new IllegalStateException("Cannot restore hbase table because '"
+          + regionPath.toString() + "' does not exist or is not a directory.");
+    }
+
+    FileStatus[] tableDirContent = fs.listStatus(regionPath);
+    for (FileStatus subDirStatus : tableDirContent) {
+      FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath());
+      for (FileStatus colFamilyStatus : colFamilies) {
+        FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath());
+        result += colFamilyContent.length;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Counts the number of files in all subdirectories of the given HBase tables, i.e. HFiles,
+   * and finds the maximum number of files in any one HBase table.
+   * @param tableArchivePath archive path
+   * @return the maximum number of files found in one HBase table
+   * @throws IOException exception
+   */
+  int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException {
+    int result = 1;
+    ArrayList<Path> regionPathList = getRegionList(tableArchivePath);
+    // tableArchivePath = this.getTableArchivePath(tableName);
+
+    if (regionPathList == null || regionPathList.size() == 0) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + tableArchivePath + "' contains no region directories.");
+    }
+
+    for (Path regionPath : regionPathList) {
+      result = Math.max(result, getNumberOfFilesInDir(regionPath));
+    }
+    return result;
+  }
 
   /**
    * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
@@ -228,8 +453,8 @@ public class RestoreUtil {
     Integer milliSecInMin = 60000;
     Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0);
     Integer numberOfFilesInDir =
-        multipleTables ? hBackupFS.getMaxNumberOfFilesInSubDir(tableArchivePath) : hBackupFS
-            .getNumberOfFilesInDir(tableArchivePath);
+        multipleTables ? getMaxNumberOfFilesInSubDir(tableArchivePath) :
+            getNumberOfFilesInDir(tableArchivePath);
     Integer calculatedMillis = numberOfFilesInDir * milliSecInMin; // 1 minute per file
     Integer resultMillis = Math.max(calculatedMillis, previousMillis);
     if (resultMillis > previousMillis) {
@@ -249,6 +474,86 @@ public class RestoreUtil {
   }
 
   /**
+   * Calculate region boundary keys from the HFiles under the given region directories
+   * @param regionDirList region dir list
+   * @return the region boundary keys inferred from the HFiles' first/last row keys
+   */
+  byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList)
+      throws FileNotFoundException, IOException {
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    // Build a set of keys to store the boundaries
+    byte[][] keys = null;
+    // calculate region boundaries and add all the column families to the table descriptor
+    for (Path regionDir : regionDirList) {
+      LOG.debug("Parsing region dir: " + regionDir);
+      Path hfofDir = regionDir;
+
+      if (!fs.exists(hfofDir)) {
+        LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
+      }
+
+      FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+      if (familyDirStatuses == null) {
+        throw new IOException("No families found in " + hfofDir);
+      }
+
+      for (FileStatus stat : familyDirStatuses) {
+        if (!stat.isDirectory()) {
+          LOG.warn("Skipping non-directory " + stat.getPath());
+          continue;
+        }
+        boolean isIgnore = false;
+        String pathName = stat.getPath().getName();
+        for (String ignore : ignoreDirs) {
+          if (pathName.contains(ignore)) {
+            LOG.warn("Skipping non-family directory" + pathName);
+            isIgnore = true;
+            break;
+          }
+        }
+        if (isIgnore) {
+          continue;
+        }
+        Path familyDir = stat.getPath();
+        LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]");
+        // Skip _logs, etc
+        if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) {
+          continue;
+        }
+
+        // start to parse hfile inside one family dir
+        Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+        for (Path hfile : hfiles) {
+          if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+              || StoreFileInfo.isReference(hfile.getName())
+              || HFileLink.isHFileLink(hfile.getName())) {
+            continue;
+          }
+          HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf), conf);
+          final byte[] first, last;
+          try {
+            reader.loadFileInfo();
+            first = reader.getFirstRowKey();
+            last = reader.getLastRowKey();
+            LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
+                + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
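+            // Illustration: each HFile contributes +1 at its first row key and -1 at its last,
+            // so HFiles covering [b,d] and [e,g] give {b:+1, d:-1, e:+1, g:-1}, from which
+            // inferBoundaries below derives the region split points (roughly, a split at 'e').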
+            // To eventually infer start key-end key boundaries
+            Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
+            map.put(first, value + 1);
+            value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+            map.put(last, value - 1);
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    return keys;
+  }
+
+  /**
    * Prepare the table for bulkload, most code copied from
    * {@link LoadIncrementalHFiles#createTable(String, String)}
    * @param tableBackupPath path
@@ -278,7 +583,7 @@ public class RestoreUtil {
           return;
         }
 
-        byte[][] keys = hBackupFS.generateBoundaryKeys(regionDirList);
+        byte[][] keys = generateBoundaryKeys(regionDirList);
 
         // create table using table descriptor and region boundaries
         hbadmin.createTable(htd, keys);
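
A minimal, self-contained sketch of the bulkload timeout heuristic in the LoadIncrementalHFiles
setup above (one minute per HFile, never lowering an already larger hbase.rpc.timeout); the class
and method names here are illustrative, not part of the patch:

  public class BulkloadTimeoutSketch {
    // Returns the hbase.rpc.timeout (in ms) to use for a bulkload touching the given file count.
    static int bulkloadTimeoutMillis(int numberOfFiles, int currentRpcTimeoutMillis) {
      final int milliSecInMin = 60000;                      // one minute per HFile, as above
      int calculatedMillis = numberOfFiles * milliSecInMin;
      return Math.max(calculatedMillis, currentRpcTimeoutMillis);
    }

    public static void main(String[] args) {
      // 130 HFiles against the 60s default raises the timeout to 7800000 ms (130 minutes).
      System.out.println(bulkloadTimeoutMillis(130, 60000));
    }
  }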

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
index 14235ce..fdd8272 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupContext;
 import org.apache.hadoop.hbase.backup.impl.BackupCopyService;
-import org.apache.hadoop.hbase.backup.impl.BackupHandler;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.impl.BackupUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -42,6 +41,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 /**
  * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
  * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
@@ -115,6 +115,25 @@ public class MapReduceBackupCopyService implements BackupCopyService {
     }
   }
 
+  /**
+   * Update the ongoing backup progress in hbase:backup with the new progress.
+   * @param backupContext backup context
+   * @param backupManager backup manager
+   * @param newProgress progress
+   * @param bytesCopied bytes copied
+   * @throws IOException exception
+   */
+  static void updateProgress(BackupContext backupContext, BackupManager backupManager,
+      int newProgress, long bytesCopied) throws IOException {
+    // compose the new backup progress data, using fake number for now
+    String backupProgressData = newProgress + "%";
+
+    backupContext.setProgress(newProgress);
+    backupManager.updateBackupStatus(backupContext);
+    LOG.debug("Backup progress data \"" + backupProgressData
+      + "\" has been updated to hbase:backup for " + backupContext.getBackupId());
+  }
+
   // Extends DistCp for progress updating to hbase:backup
   // during backup. Using DistCpV2 (MAPREDUCE-2765).
   // Simply extend it and override execute() method to get the
@@ -203,7 +222,7 @@ public class MapReduceBackupCopyService implements BackupCopyService {
                 new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
             String newProgressStr = progressData + "%";
             LOG.info("Progress: " + newProgressStr);
-            BackupHandler.updateProgress(backupContext, backupManager, progressData.intValue(),
+            updateProgress(backupContext, backupManager, progressData.intValue(),
               bytesCopied);
             LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
               + ".\"");
@@ -225,7 +244,7 @@ public class MapReduceBackupCopyService implements BackupCopyService {
         progressDone = newProgress;
         bytesCopied += totalSrcLgth;
 
-        BackupHandler.updateProgress(backupContext, backupManager, progressData.intValue(),
+        updateProgress(backupContext, backupManager, progressData.intValue(),
           bytesCopied);
         LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
           + " - " + bytesCopied + " bytes copied.\"");

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5bba1ea..0d5ed1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -49,6 +49,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.CoordinatedStateException;
@@ -75,7 +76,14 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTableHelper;
+import org.apache.hadoop.hbase.backup.impl.FullTableBackupProcedure;
+import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupProcedure;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -142,6 +150,7 @@ import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -167,6 +176,9 @@ import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.Context;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
@@ -2601,6 +2613,68 @@ public class HMaster extends HRegionServer implements MasterServices {
     return procInfoList;
   }
 
+  @Override
+  public Pair<Long, String> backupTables(final BackupType type,
+        List<TableName> tableList, final String targetRootDir, final int workers,
+        final long bandwidth) throws IOException {
+    long procId;
+    String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
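+    // e.g. backup_1459548000000 (assuming BACKUPID_PREFIX is "backup_"); the timestamp keeps ids unique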
+    if (type == BackupType.INCREMENTAL) {
+      Set<TableName> incrTableSet =
+          BackupSystemTableHelper.getIncrementalBackupTableSet(clusterConnection);
+      if (incrTableSet.isEmpty()) {
+        LOG.warn("Incremental backup table set contains no table.\n"
+            + "Use 'backup create full' or 'backup stop' to \n "
+            + "change the tables covered by incremental backup.");
+        throw new DoNotRetryIOException("No table covered by incremental backup.");
+      }
+
+      LOG.info("Incremental backup for the following table set: " + incrTableSet);
+      tableList = Lists.newArrayList(incrTableSet);
+    }
+    if (tableList != null && !tableList.isEmpty()) {
+      for (TableName table : tableList) {
+        String targetTableBackupDir =
+            HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table);
+        Path targetTableBackupDirPath = new Path(targetTableBackupDir);
+        FileSystem outputFs = FileSystem.get(targetTableBackupDirPath.toUri(), conf);
+        if (outputFs.exists(targetTableBackupDirPath)) {
+          throw new DoNotRetryIOException("Target backup directory " + targetTableBackupDir
+            + " exists already.");
+        }
+      }
+      ArrayList<TableName> nonExistingTableList = null;
+      for (TableName tableName : tableList) {
+        if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
+          if (nonExistingTableList == null) {
+            nonExistingTableList = new ArrayList<>();
+          }
+          nonExistingTableList.add(tableName);
+        }
+      }
+      if (nonExistingTableList != null) {
+        if (type == BackupType.INCREMENTAL) {
+          LOG.warn("Incremental backup table set contains non-existing tables: "
+              + nonExistingTableList);
+        } else {
+          // Throw exception only in full mode - we try to back up a non-existing table
+          throw new DoNotRetryIOException("Non-existing tables found in the table list: "
+              + nonExistingTableList);
+        }
+      }
+    }
+    if (type == BackupType.FULL) {
+      procId = this.procedureExecutor.submitProcedure(
+        new FullTableBackupProcedure(procedureExecutor.getEnvironment(), backupId,
+          tableList, targetRootDir, workers, bandwidth));
+    } else {
+      procId = this.procedureExecutor.submitProcedure(
+        new IncrementalTableBackupProcedure(procedureExecutor.getEnvironment(), backupId,
+          tableList, targetRootDir, workers, bandwidth));
+    }
+    return new Pair<>(procId, backupId);
+  }
+
   /**
    * Returns the list of table descriptors that match the specified request
    * @param namespace the namespace to query, or null if querying for all

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 8b036d9..e3004bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.TableState;
@@ -1052,6 +1053,25 @@ public class MasterRpcServices extends RSRpcServices
   }
 
   @Override
+  public MasterProtos.BackupTablesResponse backupTables(
+      RpcController controller,
+      MasterProtos.BackupTablesRequest request)  throws ServiceException {
+    try {
+      BackupTablesResponse.Builder response = BackupTablesResponse.newBuilder();
+      List<TableName> tablesList = new ArrayList<>(request.getTablesList().size());
+      for (HBaseProtos.TableName table : request.getTablesList()) {
+        tablesList.add(ProtobufUtil.toTableName(table));
+      }
+      Pair<Long, String> pair = master.backupTables(
+        BackupType.valueOf(request.getType().name()), tablesList, request.getTargetRootDir(),
+        (int)request.getWorkers(), request.getBandwidth());
+      return response.setProcId(pair.getFirst()).setBackupId(pair.getSecond()).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(RpcController c,
       ListTableDescriptorsByNamespaceRequest request) throws ServiceException {
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 867faed..3557bb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.Service;
 
@@ -179,6 +182,23 @@ public interface MasterServices extends Server {
       throws IOException;
 
   /**
+   * Backup the given list of tables, either fully or incrementally
+   * @param type whether the backup is full or incremental
+   * @param tableList list of tables to back up
+   * @param targetRootDir root dir for saving the backup
+   * @param workers number of parallel workers; -1 means system defined
+   * @param bandwidth bandwidth per worker in MB per second; -1 means unlimited
+   * @return pair of procedure Id and backupId
+   * @throws IOException
+   */
+  public Pair<Long, String> backupTables(
+      final BackupType type,
+      List<TableName> tableList,
+      final String targetRootDir,
+      final int workers,
+      final long bandwidth) throws IOException;
+
+  /**
    * Enable an existing table
    * @param tableName The table name
    * @param nonceGroup

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index d7c0b92..e2409d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -18,11 +18,22 @@
 
 package org.apache.hadoop.hbase.master.procedure;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.security.UserGroupInformation;
 
 @InterfaceAudience.Private
@@ -53,4 +64,62 @@ public final class MasterProcedureUtil {
     }
     return null;
   }
+
+  public static ProcedureDescription buildProcedure(String signature, String instance,
+      Map<String, String> props) {
+    ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
+    builder.setSignature(signature).setInstance(instance);
+    for (Entry<String, String> entry : props.entrySet()) {
+      NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
+          .setValue(entry.getValue()).build();
+      builder.addConfiguration(pair);
+    }
+    ProcedureDescription desc = builder.build();
+    return desc;
+  }
+
+  public static long execProcedure(MasterProcedureManager mpm, String signature, String instance,
+      Map<String, String> props) throws IOException {
+    if (mpm == null) {
+      throw new IOException("The procedure is not registered: " + signature);
+    }
+    ProcedureDescription desc = buildProcedure(signature, instance, props);
+    mpm.execProcedure(desc);
+
+    // send back the max amount of time the client should wait for the procedure
+    // to complete
+    long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
+    return waitTime;
+  }
+  
+  public static void waitForProcedure(MasterProcedureManager mpm, String signature, String instance,
+      Map<String, String> props, long max, int numRetries, long pause) throws IOException {
+    ProcedureDescription desc = buildProcedure(signature, instance, props);
+    long start = EnvironmentEdgeManager.currentTime();
+    long maxPauseTime = max / numRetries;
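+    // e.g. max=300000 ms and numRetries=10 cap each poll's sleep at 30000 ms (illustrative values)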
+    int tries = 0;
+    LOG.debug("Waiting a max of " + max + " ms for procedure '" +
+        signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
+    boolean done = false;
+    while (tries == 0
+        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
+      try {
+        // sleep a backoff <= maxPauseTime amount
+        long sleep = HBaseAdmin.getPauseTime(tries++, pause);
+        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
+        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
+          "ms while waiting for procedure completion.");
+        Thread.sleep(sleep);
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException("Interrupted").initCause(e);
+      }
+      LOG.debug("Getting current status of procedure from master...");
+      done = mpm.isProcedureDone(desc);
+    }
+    if (!done) {
+      throw new IOException("Procedure '" + signature + " : " + instance
+          + "' wasn't completed in expectedTime:" + max + " ms");
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
index cc088f3..56983a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public interface TableProcedureInterface {
   public enum TableOperationType {
-    CREATE, DELETE, DISABLE, EDIT, ENABLE, READ,
+    CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, BACKUP,
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/b37cc760/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 84b7c78..1416523 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -31,9 +31,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
+import org.apache.hadoop.hbase.backup.impl.BackupContext.BackupState;
 import org.apache.hadoop.hbase.backup.impl.BackupContext;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BackupUtil;
 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
 import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
 import org.apache.hadoop.hbase.client.Connection;
@@ -47,6 +48,8 @@ import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import com.google.common.collect.Lists;
+
 /**
  * This class is only a base for other integration-level backup tests. Do not add tests here.
  * TestBackupSmallTests is where tests that don't require bringing machines up/down should go. All other
@@ -126,6 +129,32 @@ public class TestBackupBase {
     TEST_UTIL.shutdownMiniMapReduceCluster();
   }
 
+  protected String backupTables(BackupType type, List<TableName> tables, String path)
+      throws IOException {
+    Connection conn = null;
+    HBaseAdmin admin = null;
+    String backupId;
+    try {
+      conn = ConnectionFactory.createConnection(conf1);
+      admin = (HBaseAdmin) conn.getAdmin();
+      BackupRequest request = new BackupRequest();
+      request.setBackupType(type).setTableList(tables).setTargetRootDir(path);
+      backupId = admin.backupTables(request);
+    } finally {
+      if (admin != null) {
+        admin.close();
+      }
+      if (conn != null) {
+        conn.close();
+      }
+    }
+    return backupId;
+  }
+
+  protected String fullTableBackup(List<TableName> tables) throws IOException {
+    return backupTables(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+  }
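+  // Typical usage in subclasses (illustrative):
+  //   String backupId = fullTableBackup(Lists.newArrayList(table1, table2));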
+
   protected static void loadTable(HTable table) throws Exception {
 
     Put p; // 100 + 1 row to t1_syncup
@@ -140,6 +169,7 @@ public class TestBackupBase {
 
     long tid = System.currentTimeMillis();
     table1 = TableName.valueOf("test-" + tid);
+    BackupSystemTable backupTable = new BackupSystemTable(TEST_UTIL.getConnection());
     HBaseAdmin ha = TEST_UTIL.getHBaseAdmin();
     HTableDescriptor desc = new HTableDescriptor(table1);
     HColumnDescriptor fam = new HColumnDescriptor(famName);
@@ -187,10 +217,6 @@ public class TestBackupBase {
     }
   }
 
-  protected BackupClient getBackupClient(){
-    return BackupRestoreFactory.getBackupClient(conf1);
-  }
-
   protected RestoreClient getRestoreClient()
   {
     return BackupRestoreFactory.getRestoreClient(conf1);

