hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject [06/11] hbase git commit: HBASE-14123 HBase Backup/Restore Phase 2 (Vladimir Rodionov)
Date Mon, 14 Nov 2016 17:22:52 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java
new file mode 100644
index 0000000..57596c8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/HBaseBackupAdmin.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupAdmin;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
+import org.apache.hadoop.hbase.backup.util.BackupSet;
+import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.collect.Lists;
+
+/**
+ * The administrative API implementation for HBase Backup . Create an instance from
+ * {@link HBaseBackupAdmin#HBaseBackupAdmin(Connection)} and call {@link #close()} afterwards.
+ * <p>BackupAdmin can be used to create backups, restore data from backups and for
+ * other backup-related operations.
+ *
+ * @see Admin
+ * @since 2.0
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+public class HBaseBackupAdmin implements BackupAdmin {
+  private static final Log LOG = LogFactory.getLog(HBaseBackupAdmin.class);
+
+  private final Connection conn;
+
+  public HBaseBackupAdmin(Connection conn) {
+    this.conn = conn;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public BackupInfo getBackupInfo(String backupId) throws IOException {
+    BackupInfo backupInfo = null;
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      backupInfo = table.readBackupInfo(backupId);
+      return backupInfo;
+    }
+  }
+
+  @Override
+  public int getProgress(String backupId) throws IOException {
+    BackupInfo backupInfo = null;
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      if (backupId == null) {
+        ArrayList<BackupInfo> recentSessions = table.getBackupContexts(BackupState.RUNNING);
+        if (recentSessions.isEmpty()) {
+          LOG.warn("No ongoing sessions found.");
+          return -1;
+        }
+        // else show status for ongoing session
+        // must be one maximum
+        return recentSessions.get(0).getProgress();
+      } else {
+
+        backupInfo = table.readBackupInfo(backupId);
+        if (backupInfo != null) {
+          return backupInfo.getProgress();
+        } else {
+          LOG.warn("No information found for backupID=" + backupId);
+          return -1;
+        }
+      }
+    }
+  }
+
+  @Override
+  public int deleteBackups(String[] backupIds) throws IOException {
+    // TODO: requires FT, failure will leave system
+    // in non-consistent state
+    // see HBASE-15227
+
+    int totalDeleted = 0;
+    Map<String, HashSet<TableName>> allTablesMap = new HashMap<String, HashSet<TableName>>();
+
+    try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+      for (int i = 0; i < backupIds.length; i++) {
+        BackupInfo info = sysTable.readBackupInfo(backupIds[i]);
+        if (info != null) {
+          String rootDir = info.getTargetRootDir();
+          HashSet<TableName> allTables = allTablesMap.get(rootDir);
+          if (allTables == null) {
+            allTables = new HashSet<TableName>();
+            allTablesMap.put(rootDir, allTables);
+          }
+          allTables.addAll(info.getTableNames());
+          totalDeleted += deleteBackup(backupIds[i], sysTable);
+        }
+      }
+      finalizeDelete(allTablesMap, sysTable);
+    }
+    return totalDeleted;
+  }
+
+  /**
+   * Updates incremental backup set for every backupRoot
+   * @param tablesMap - Map [backupRoot: Set<TableName>]
+   * @param table - backup system table
+   * @throws IOException
+   */
+
+  private void finalizeDelete(Map<String, HashSet<TableName>> tablesMap, BackupSystemTable table)
+      throws IOException {
+    for (String backupRoot : tablesMap.keySet()) {
+      Set<TableName> incrTableSet = table.getIncrementalBackupTableSet(backupRoot);
+      Map<TableName, ArrayList<BackupInfo>> tableMap =
+          table.getBackupHistoryForTableSet(incrTableSet, backupRoot);
+      for(Map.Entry<TableName, ArrayList<BackupInfo>> entry: tableMap.entrySet()) {
+        if(entry.getValue() == null) {
+          // No more backups for a table
+          incrTableSet.remove(entry.getKey());
+        }
+      }
+      if (!incrTableSet.isEmpty()) {
+        table.addIncrementalBackupTableSet(incrTableSet, backupRoot);
+      } else { // empty
+        table.deleteIncrementalBackupTableSet(backupRoot);
+      }
+    }
+  }
+
+  /**
+   * Delete single backup and all related backups
+   * Algorithm:
+   *
+   * Backup type: FULL or INCREMENTAL
+   * Is this last backup session for table T: YES or NO
+   * For every table T from table list 'tables':
+   * if(FULL, YES) deletes only physical data (PD)
+   * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo, until
+   * we either reach the most recent backup for T in the system or FULL backup which
+   * includes T
+   * if(INCREMENTAL, YES) deletes only physical data (PD)
+   * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images
+   * between last FULL backup, which is older than the backup being deleted and the next
+   * FULL backup (if exists) or last one for a particular table T and removes T from list
+   * of backup tables.
+   * @param backupId - backup id
+   * @param sysTable - backup system table
+   * @return total - number of deleted backup images
+   * @throws IOException
+   */
+  private int deleteBackup(String backupId, BackupSystemTable sysTable) throws IOException {
+
+    BackupInfo backupInfo = sysTable.readBackupInfo(backupId);
+
+    int totalDeleted = 0;
+    if (backupInfo != null) {
+      LOG.info("Deleting backup " + backupInfo.getBackupId() + " ...");
+      BackupClientUtil.cleanupBackupData(backupInfo, conn.getConfiguration());
+      // List of tables in this backup;
+      List<TableName> tables = backupInfo.getTableNames();
+      long startTime = backupInfo.getStartTs();
+      for (TableName tn : tables) {
+        boolean isLastBackupSession = isLastBackupSession(sysTable, tn, startTime);
+        if (isLastBackupSession) {
+          continue;
+        }
+        // else
+        List<BackupInfo> affectedBackups = getAffectedBackupInfos(backupInfo, tn, sysTable);
+        for (BackupInfo info : affectedBackups) {
+          if (info.equals(backupInfo)) {
+            continue;
+          }
+          removeTableFromBackupImage(info, tn, sysTable);
+        }
+      }
+      LOG.debug("Delete backup info "+ backupInfo.getBackupId());
+
+      sysTable.deleteBackupInfo(backupInfo.getBackupId());
+      LOG.info("Delete backup " + backupInfo.getBackupId() + " completed.");
+      totalDeleted++;
+    } else {
+      LOG.warn("Delete backup failed: no information found for backupID=" + backupId);
+    }
+    return totalDeleted;
+  }
+
+  private void removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable)
+      throws IOException {
+    List<TableName> tables = info.getTableNames();
+    LOG.debug("Remove "+ tn +" from " + info.getBackupId() + " tables=" +
+      info.getTableListAsString());
+    if (tables.contains(tn)) {
+      tables.remove(tn);
+
+      if (tables.isEmpty()) {
+        LOG.debug("Delete backup info "+ info.getBackupId());
+
+        sysTable.deleteBackupInfo(info.getBackupId());
+        BackupClientUtil.cleanupBackupData(info, conn.getConfiguration());
+      } else {
+        info.setTables(tables);
+        sysTable.updateBackupInfo(info);
+        // Now, clean up directory for table
+        cleanupBackupDir(info, tn, conn.getConfiguration());
+      }
+    }
+  }
+
+  private List<BackupInfo> getAffectedBackupInfos(BackupInfo backupInfo, TableName tn,
+      BackupSystemTable table) throws IOException {
+    LOG.debug("GetAffectedBackupInfos for: " + backupInfo.getBackupId() + " table=" + tn);
+    long ts = backupInfo.getStartTs();
+    List<BackupInfo> list = new ArrayList<BackupInfo>();
+    List<BackupInfo> history = table.getBackupHistory(backupInfo.getTargetRootDir());
+    // Scan from most recent to backupInfo
+    // break when backupInfo reached
+    for (BackupInfo info : history) {
+      if (info.getStartTs() == ts) {
+        break;
+      }
+      List<TableName> tables = info.getTableNames();
+      if (tables.contains(tn)) {
+        BackupType bt = info.getType();
+        if (bt == BackupType.FULL) {
+          // Clear list if we encounter FULL backup
+          list.clear();
+        } else {
+          LOG.debug("GetAffectedBackupInfos for: " + backupInfo.getBackupId() + " table=" + tn
+              + " added " + info.getBackupId() + " tables=" + info.getTableListAsString());
+          list.add(info);
+        }
+      }
+    }
+    return list;
+  }
+
+
+
+  /**
+   * Clean up the data at target directory
+   * @throws IOException
+   */
+  private void cleanupBackupDir(BackupInfo backupInfo, TableName table, Configuration conf)
+      throws IOException {
+    try {
+      // clean up the data at target directory
+      String targetDir = backupInfo.getTargetRootDir();
+      if (targetDir == null) {
+        LOG.warn("No target directory specified for " + backupInfo.getBackupId());
+        return;
+      }
+
+      FileSystem outputFs = FileSystem.get(new Path(backupInfo.getTargetRootDir()).toUri(), conf);
+
+      Path targetDirPath =
+          new Path(BackupClientUtil.getTableBackupDir(backupInfo.getTargetRootDir(),
+            backupInfo.getBackupId(), table));
+      if (outputFs.delete(targetDirPath, true)) {
+        LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done.");
+      } else {
+        LOG.info("No data has been found in " + targetDirPath.toString() + ".");
+      }
+
+    } catch (IOException e1) {
+      LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() + " for table " + table
+          + "at " + backupInfo.getTargetRootDir() + " failed due to " + e1.getMessage() + ".");
+      throw e1;
+    }
+  }
+
+  private boolean isLastBackupSession(BackupSystemTable table, TableName tn, long startTime)
+      throws IOException {
+    List<BackupInfo> history = table.getBackupHistory();
+    for (BackupInfo info : history) {
+      List<TableName> tables = info.getTableNames();
+      if (!tables.contains(tn)) {
+        continue;
+      }
+      if (info.getStartTs() <= startTime) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public List<BackupInfo> getHistory(int n) throws IOException {
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<BackupInfo> history = table.getBackupHistory();
+      if (history.size() <= n) return history;
+      List<BackupInfo> list = new ArrayList<BackupInfo>();
+      for (int i = 0; i < n; i++) {
+        list.add(history.get(i));
+      }
+      return list;
+    }
+  }
+
+  @Override
+  public List<BackupInfo> getHistory(int n, BackupInfo.Filter ... filters) throws IOException {
+    if (filters.length == 0) return getHistory(n);
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<BackupInfo> history = table.getBackupHistory();
+      List<BackupInfo> result = new ArrayList<BackupInfo>();
+      for(BackupInfo bi: history) {
+        if(result.size() == n) break;
+        boolean passed = true;
+        for(int i=0; i < filters.length; i++) {
+          if(!filters[i].apply(bi)) {
+            passed = false;
+            break;
+          }
+        }
+        if(passed) {
+          result.add(bi);
+        }
+      }
+      return result;
+    }
+  }
+
+  @Override
+  public List<BackupSet> listBackupSets() throws IOException {
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<String> list = table.listBackupSets();
+      List<BackupSet> bslist = new ArrayList<BackupSet>();
+      for (String s : list) {
+        List<TableName> tables = table.describeBackupSet(s);
+        if (tables != null) {
+          bslist.add(new BackupSet(s, tables));
+        }
+      }
+      return bslist;
+    }
+  }
+
+  @Override
+  public BackupSet getBackupSet(String name) throws IOException {
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      List<TableName> list = table.describeBackupSet(name);
+      if (list == null) return null;
+      return new BackupSet(name, list);
+    }
+  }
+
+  @Override
+  public boolean deleteBackupSet(String name) throws IOException {
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      if (table.describeBackupSet(name) == null) {
+        return false;
+      }
+      table.deleteBackupSet(name);
+      return true;
+    }
+  }
+
+  @Override
+  public void addToBackupSet(String name, TableName[] tables) throws IOException {
+    String[] tableNames = new String[tables.length];
+    try (final BackupSystemTable table = new BackupSystemTable(conn);
+         final Admin admin = conn.getAdmin();) {
+      for (int i = 0; i < tables.length; i++) {
+        tableNames[i] = tables[i].getNameAsString();
+        if (!admin.tableExists(TableName.valueOf(tableNames[i]))) {
+          throw new IOException("Cannot add " + tableNames[i] + " because it doesn't exist");
+        }
+      }
+      table.addToBackupSet(name, tableNames);
+      LOG.info("Added tables [" + StringUtils.join(tableNames, " ") + "] to '" + name
+          + "' backup set");
+    }
+  }
+
+  @Override
+  public void removeFromBackupSet(String name, String[] tables) throws IOException {
+    LOG.info("Removing tables [" + StringUtils.join(tables, " ") + "] from '" + name + "'");
+    try (final BackupSystemTable table = new BackupSystemTable(conn)) {
+      table.removeFromBackupSet(name, tables);
+      LOG.info("Removing tables [" + StringUtils.join(tables, " ") + "] from '" + name
+          + "' completed.");
+    }
+  }
+
+  @Override
+  public void restore(RestoreRequest request) throws IOException {
+    if (request.isCheck()) {
+      HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+      // check and load backup image manifest for the tables
+      Path rootPath = new Path(request.getBackupRootDir());
+      String backupId = request.getBackupId();
+      TableName[] sTableArray = request.getFromTables();
+      HBackupFileSystem.checkImageManifestExist(backupManifestMap,
+        sTableArray, conn.getConfiguration(), rootPath, backupId);
+
+      // Check and validate the backup image and its dependencies
+
+        if (RestoreServerUtil.validate(backupManifestMap, conn.getConfiguration())) {
+          LOG.info("Checking backup images: ok");
+        } else {
+          String errMsg = "Some dependencies are missing for restore";
+          LOG.error(errMsg);
+          throw new IOException(errMsg);
+        }
+
+    }
+    // Execute restore request
+    new RestoreTablesClient(conn, request).execute();
+  }
+
+  @Override
+  public Future<Void> restoreAsync(RestoreRequest request) throws IOException {
+    // TBI
+    return null;
+  }
+
+  @Override
+  public String backupTables(final BackupRequest request) throws IOException {
+    String setName = request.getBackupSetName();
+    BackupType type = request.getBackupType();
+    String targetRootDir = request.getTargetRootDir();
+    List<TableName> tableList = request.getTableList();
+
+    String backupId =
+        (setName == null || setName.length() == 0 ? BackupRestoreConstants.BACKUPID_PREFIX
+            : setName + "_") + EnvironmentEdgeManager.currentTime();
+    if (type == BackupType.INCREMENTAL) {
+      Set<TableName> incrTableSet = null;
+      try (BackupSystemTable table = new BackupSystemTable(conn)) {
+        incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
+      }
+
+      if (incrTableSet.isEmpty()) {
+        System.err.println("Incremental backup table set contains no table.\n"
+            + "Use 'backup create full' or 'backup stop' to \n "
+            + "change the tables covered by incremental backup.");
+        throw new IOException("No table covered by incremental backup.");
+      }
+
+      tableList.removeAll(incrTableSet);
+      if (!tableList.isEmpty()) {
+        String extraTables = StringUtils.join(tableList, ",");
+        System.err.println("Some tables (" + extraTables + ") haven't gone through full backup");
+        throw new IOException("Perform full backup on " + extraTables + " first, "
+            + "then retry the command");
+      }
+      System.out.println("Incremental backup for the following table set: " + incrTableSet);
+      tableList = Lists.newArrayList(incrTableSet);
+    }
+    if (tableList != null && !tableList.isEmpty()) {
+      for (TableName table : tableList) {
+        String targetTableBackupDir =
+            HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table);
+        Path targetTableBackupDirPath = new Path(targetTableBackupDir);
+        FileSystem outputFs =
+            FileSystem.get(targetTableBackupDirPath.toUri(), conn.getConfiguration());
+        if (outputFs.exists(targetTableBackupDirPath)) {
+          throw new IOException("Target backup directory " + targetTableBackupDir
+              + " exists already.");
+        }
+      }
+      ArrayList<TableName> nonExistingTableList = null;
+      try (Admin admin = conn.getAdmin();) {
+        for (TableName tableName : tableList) {
+          if (!admin.tableExists(tableName)) {
+            if (nonExistingTableList == null) {
+              nonExistingTableList = new ArrayList<>();
+            }
+            nonExistingTableList.add(tableName);
+          }
+        }
+      }
+      if (nonExistingTableList != null) {
+        if (type == BackupType.INCREMENTAL) {
+          System.err.println("Incremental backup table set contains non-exising table: "
+              + nonExistingTableList);
+          // Update incremental backup set
+          tableList = excludeNonExistingTables(tableList, nonExistingTableList);
+        } else {
+          // Throw exception only in full mode - we try to backup non-existing table
+          throw new IOException("Non-existing tables found in the table list: "
+              + nonExistingTableList);
+        }
+      }
+    }
+
+    // update table list
+    request.setTableList(tableList);
+
+    if (type == BackupType.FULL) {
+      new FullTableBackupClient(conn, backupId, request).execute();
+    } else {
+      new IncrementalTableBackupClient(conn, backupId, request).execute();
+    }
+    return backupId;
+  }
+
+
+  private List<TableName> excludeNonExistingTables(List<TableName> tableList,
+      List<TableName> nonExistingTableList) {
+
+    for (TableName table : nonExistingTableList) {
+      tableList.remove(table);
+    }
+    return tableList;
+  }
+
+  @Override
+  public Future<String> backupTablesAsync(final BackupRequest userRequest) throws IOException {
+    // TBI
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
new file mode 100644
index 0000000..6fad17a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
+import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * After a full backup was created, the incremental backup will only store the changes made
+ * after the last full or incremental backup.
+ *
+ * Creating the backup copies the logfiles in .logs and .oldlogs since the last backup timestamp.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IncrementalBackupManager {
+  public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
+
+  // parent manager
+  private final BackupManager backupManager;
+  private final Configuration conf;
+
+  public IncrementalBackupManager(BackupManager bm) {
+    this.backupManager = bm;
+    this.conf = bm.getConf();
+  }
+
+  /**
+   * Obtain the list of logs that need to be copied out for this incremental backup.
+   * The list is set in BackupContext.
+   * @param conn the Connection
+   * @param backupContext backup context
+   * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+   * @throws IOException exception
+   */
+  public HashMap<String, Long> getIncrBackupLogFileList(Connection conn,BackupInfo backupContext)
+      throws IOException {
+    List<String> logList;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
+
+    String savedStartCode = backupManager.readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
+        backupManager.readLogTimestampMap();
+
+    previousTimestampMins = BackupServerUtil.getRSLogTimestampMins(previousTimestampMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupContext.getBackupId());
+    }
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null ||
+        previousTimestampMins == null ||
+          previousTimestampMins.isEmpty()) {
+      throw new IOException("Cannot read any previous back up timestamps from hbase:backup. "
+          + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    LOG.info("Execute roll log procedure for incremental backup ...");
+    HashMap<String, String> props = new HashMap<String, String>();
+    props.put("backupRoot", backupContext.getTargetRootDir());
+
+    try(Admin admin = conn.getAdmin();) {
+
+      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+    }
+    newTimestamps = backupManager.readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins,
+      newTimestamps, backupManager.getBackupContext().getTargetRootDir());
+    addLogsFromBackupSystemToContext(logFromSystemTable);
+
+    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+    backupContext.setIncrBackupFileList(logList);
+
+    return newTimestamps;
+  }
+
+
+  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
+      List<WALItem> logFromSystemTable) {
+
+    List<String> backupedWALList = toWALList(logFromSystemTable);
+    logList.removeAll(backupedWALList);
+    return logList;
+  }
+
+  private List<String> toWALList(List<WALItem> logFromSystemTable) {
+
+    List<String> list = new ArrayList<String>(logFromSystemTable.size());
+    for(WALItem item : logFromSystemTable){
+      list.add(item.getWalFile());
+    }
+    return list;
+  }
+
+  private void addLogsFromBackupSystemToContext(List<WALItem> logFromSystemTable) {
+    List<String> walFiles = new ArrayList<String>();
+    for(WALItem item : logFromSystemTable){
+      Path p = new Path(item.getWalFile());
+      String walFileName = p.getName();
+      String backupId = item.getBackupId();
+      String relWALPath = backupId + Path.SEPARATOR+walFileName;
+      walFiles.add(relWALPath);
+    }
+  }
+
+
+  /**
+   * For each region server: get all log files newer than the last timestamps,
+   * but not newer than the newest timestamps. FROM hbase:backup table
+   * @param olderTimestamps - the timestamp for each region server of the last backup.
+   * @param newestTimestamps - the timestamp for each region server that the backup should lead to.
+   * @return list of log files which needs to be added to this backup
+   * @throws IOException
+   */
+  private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
+      HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
+    List<WALItem> logFiles = new ArrayList<WALItem>();
+    Iterator<WALItem> it = backupManager.getWALFilesFromBackupSystem();
+    while (it.hasNext()) {
+      WALItem item = it.next();
+      String rootDir = item.getBackupRoot();
+      if (!rootDir.equals(backupRoot)) {
+        continue;
+      }
+      String walFileName = item.getWalFile();
+      String server = BackupServerUtil.parseHostNameFromLogFile(new Path(walFileName));
+      if (server == null) {
+        continue;
+      }
+      Long tss = getTimestamp(walFileName);
+      Long oldTss = olderTimestamps.get(server);
+      Long newTss = newestTimestamps.get(server);
+      if (oldTss == null) {
+        logFiles.add(item);
+        continue;
+      }
+      if (newTss == null) {
+        newTss = Long.MAX_VALUE;
+      }
+      if (tss > oldTss && tss < newTss) {
+        logFiles.add(item);
+      }
+    }
+    return logFiles;
+  }
+
+  private Long getTimestamp(String walFileName) {
+    int index = walFileName.lastIndexOf(BackupServerUtil.LOGNAME_SEPARATOR);
+    return Long.parseLong(walFileName.substring(index+1));
+  }
+
+  /**
+   * For each region server: get all log files newer than the last timestamps but not newer than the
+   * newest timestamps.
+   * @param olderTimestamps the timestamp for each region server of the last backup.
+   * @param newestTimestamps the timestamp for each region server that the backup should lead to.
+   * @param conf the Hadoop and Hbase configuration
+   * @param savedStartCode the startcode (timestamp) of last successful backup.
+   * @return a list of log files to be backed up
+   * @throws IOException exception
+   */
+  private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
+      HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
+      throws IOException {
+    LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+        + "\n newestTimestamps: " + newestTimestamps);
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    FileSystem fs = rootdir.getFileSystem(conf);
+    NewestLogFilter pathFilter = new NewestLogFilter();
+
+    List<String> resultLogFiles = new ArrayList<String>();
+    List<String> newestLogs = new ArrayList<String>();
+
+    /*
+     * The old region servers and timestamps info we kept in hbase:backup may be out of sync if new
+     * region server is added or existing one lost. We'll deal with it here when processing the
+     * logs. If data in hbase:backup has more hosts, just ignore it. If the .logs directory includes
+     * more hosts, the additional hosts will not have old timestamps to compare with. We'll just use
+     * all the logs in that directory. We always write up-to-date region server and timestamp info
+     * to hbase:backup at the end of successful backup.
+     */
+
+    FileStatus[] rss;
+    Path p;
+    String host;
+    Long oldTimeStamp;
+    String currentLogFile;
+    Long currentLogTS;
+
+    // Get the files in .logs.
+    rss = fs.listStatus(logDir);
+    for (FileStatus rs : rss) {
+      p = rs.getPath();
+      host = BackupServerUtil.parseHostNameFromLogFile(p);
+      if (host == null) {
+        continue;
+      }
+      FileStatus[] logs;
+      oldTimeStamp = olderTimestamps.get(host);
+      // It is possible that there is no old timestamp in hbase:backup for this host if
+      // this region server is newly added after our last backup.
+      if (oldTimeStamp == null) {
+        logs = fs.listStatus(p);
+      } else {
+        pathFilter.setLastBackupTS(oldTimeStamp);
+        logs = fs.listStatus(p, pathFilter);
+      }
+      for (FileStatus log : logs) {
+        LOG.debug("currentLogFile: " + log.getPath().toString());
+        if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
+          }
+          continue;
+        }
+        currentLogFile = log.getPath().toString();
+        resultLogFiles.add(currentLogFile);
+        currentLogTS = BackupClientUtil.getCreationTime(log.getPath());
+        // newestTimestamps is up-to-date with the current list of hosts
+        // so newestTimestamps.get(host) will not be null.
+        if (currentLogTS > newestTimestamps.get(host)) {
+          newestLogs.add(currentLogFile);
+        }
+      }
+    }
+
+    // Include the .oldlogs files too.
+    FileStatus[] oldlogs = fs.listStatus(oldLogDir);
+    for (FileStatus oldlog : oldlogs) {
+      p = oldlog.getPath();
+      currentLogFile = p.toString();
+      if (AbstractFSWALProvider.isMetaFile(p)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip .meta log file: " + currentLogFile);
+        }
+        continue;
+      }
+      host = BackupClientUtil.parseHostFromOldLog(p);
+      if (host == null) {
+        continue;
+      }
+      currentLogTS = BackupClientUtil.getCreationTime(p);
+      oldTimeStamp = olderTimestamps.get(host);
+      /*
+       * It is possible that there is no old timestamp in hbase:backup for this host. At the time of
+       * our last backup operation, this rs did not exist. The reason can be one of the two: 1. The
+       * rs already left/crashed. Its logs were moved to .oldlogs. 2. The rs was added after our
+       * last backup.
+       */
+      if (oldTimeStamp == null) {
+        if (currentLogTS < Long.parseLong(savedStartCode)) {
+          // This log file is really old, its region server was before our last backup.
+          continue;
+        } else {
+          resultLogFiles.add(currentLogFile);
+        }
+      } else if (currentLogTS > oldTimeStamp) {
+        resultLogFiles.add(currentLogFile);
+      }
+
+      // It is possible that a host in .oldlogs is an obsolete region server
+      // so newestTimestamps.get(host) here can be null.
+      // Even if these logs belong to a obsolete region server, we still need
+      // to include they to avoid loss of edits for backup.
+      Long newTimestamp = newestTimestamps.get(host);
+      if (newTimestamp != null && currentLogTS > newTimestamp) {
+        newestLogs.add(currentLogFile);
+      }
+    }
+    // remove newest log per host because they are still in use
+    resultLogFiles.removeAll(newestLogs);
+    return resultLogFiles;
+  }
+
+  static class NewestLogFilter implements PathFilter {
+    private Long lastBackupTS = 0L;
+
+    public NewestLogFilter() {
+    }
+
+    protected void setLastBackupTS(Long ts) {
+      this.lastBackupTS = ts;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      // skip meta table log -- ts.meta file
+      if (AbstractFSWALProvider.isMetaFile(path)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip .meta log file: " + path.getName());
+        }
+        return false;
+      }
+      Long timestamp = null;
+      try {
+        timestamp = BackupClientUtil.getCreationTime(path);
+        return timestamp > lastBackupTS;
+      } catch (Exception e) {
+        LOG.warn("Cannot read timestamp of log file " + path);
+        return false;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
new file mode 100644
index 0000000..55be02c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyTask;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
+import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+
+@InterfaceAudience.Private
+public class IncrementalTableBackupClient {
+  private static final Log LOG = LogFactory.getLog(IncrementalTableBackupClient.class);
+
+  private Configuration conf;
+  private Connection conn;
+  //private String backupId;
+  HashMap<String, Long> newTimestamps = null;
+
+  private String backupId;
+  private BackupManager backupManager;
+  private BackupInfo backupContext;
+
+  public IncrementalTableBackupClient() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  public IncrementalTableBackupClient(final Connection conn, final String backupId,
+      BackupRequest request)
+      throws IOException {
+
+    this.conn = conn;
+    this.conf = conn.getConfiguration();
+    backupManager = new BackupManager(conn, conf);
+    this.backupId = backupId;
+    backupContext =
+        backupManager.createBackupContext(backupId, BackupType.INCREMENTAL, request.getTableList(),
+          request.getTargetRootDir(), request.getWorkers(), (int) request.getBandwidth());
+  }
+
+  private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    List<String> list = new ArrayList<String>();
+    for (String file : incrBackupFileList) {
+      if (fs.exists(new Path(file))) {
+        list.add(file);
+      } else {
+        LOG.warn("Can't find file: " + file);
+      }
+    }
+    return list;
+  }
+
+  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    List<String> list = new ArrayList<String>();
+    for (String file : incrBackupFileList) {
+      if (!fs.exists(new Path(file))) {
+        list.add(file);
+      }
+    }
+    return list;
+
+  }
+
+  /**
+   * Do incremental copy.
+   * @param backupContext backup context
+   */
+  private void incrementalCopy(BackupInfo backupContext) throws Exception {
+
+    LOG.info("Incremental copy is starting.");
+    // set overall backup phase: incremental_copy
+    backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);
+    // get incremental backup file list and prepare parms for DistCp
+    List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
+    // filter missing files out (they have been copied by previous backups)
+    incrBackupFileList = filterMissingFiles(incrBackupFileList);
+    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+    strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
+
+    BackupCopyTask copyService = BackupRestoreServerFactory.getBackupCopyTask(conf);
+    int counter = 0;
+    int MAX_ITERAIONS = 2;
+    while (counter++ < MAX_ITERAIONS) {
+      // We run DistCp maximum 2 times
+      // If it fails on a second time, we throw Exception
+      int res =
+          copyService.copy(backupContext, backupManager, conf, BackupType.INCREMENTAL,
+            strArr);
+
+      if (res != 0) {
+        LOG.error("Copy incremental log files failed with return code: " + res + ".");
+        throw new IOException("Failed of Hadoop Distributed Copy from "
+            + StringUtils.join(incrBackupFileList, ",") + " to "
+            + backupContext.getHLogTargetDir());
+      }
+      List<String> missingFiles = getMissingFiles(incrBackupFileList);
+
+      if (missingFiles.isEmpty()) {
+        break;
+      } else {
+        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
+        // update backupContext and strAttr
+        if (counter == MAX_ITERAIONS) {
+          String msg =
+              "DistCp could not finish the following files: "
+          + StringUtils.join(missingFiles, ",");
+          LOG.error(msg);
+          throw new IOException(msg);
+        }
+        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
+        incrBackupFileList.removeAll(missingFiles);
+        incrBackupFileList.addAll(converted);
+        backupContext.setIncrBackupFileList(incrBackupFileList);
+
+        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
+        // during previous run)
+        strArr = converted.toArray(new String[converted.size() + 1]);
+        strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
+      }
+    }
+
+    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
+        + backupContext.getHLogTargetDir() + " finished.");
+  }
+
+  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
+    List<String> list = new ArrayList<String>();
+    for (String path : missingFiles) {
+      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
+        LOG.error("Copy incremental log files failed, file is missing : " + path);
+        throw new IOException("Failed of Hadoop Distributed Copy to "
+            + backupContext.getHLogTargetDir() + ", file is missing " + path);
+      }
+      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
+          + HConstants.HREGION_OLDLOGDIR_NAME));
+    }
+    return list;
+  }
+
+  public void execute() throws IOException {
+
+    // case PREPARE_INCREMENTAL:
+    FullTableBackupClient.beginBackup(backupManager, backupContext);
+    LOG.debug("For incremental backup, current table set is "
+        + backupManager.getIncrementalBackupTableSet());
+    try {
+      IncrementalBackupManager incrBackupManager = new IncrementalBackupManager(backupManager);
+
+      newTimestamps = incrBackupManager.getIncrBackupLogFileList(conn, backupContext);
+    } catch (Exception e) {
+      // fail the overall backup and return
+      FullTableBackupClient.failBackup(conn, backupContext, backupManager, e,
+        "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
+    }
+
+    // case INCREMENTAL_COPY:
+    try {
+      // copy out the table and region info files for each table
+      BackupServerUtil.copyTableRegionInfo(conn, backupContext, conf);
+      incrementalCopy(backupContext);
+      // Save list of WAL files copied
+      backupManager.recordWALFiles(backupContext.getIncrBackupFileList());
+    } catch (Exception e) {
+      String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
+      // fail the overall backup and return
+      FullTableBackupClient.failBackup(conn, backupContext, backupManager, e, msg,
+        BackupType.INCREMENTAL, conf);
+    }
+    // case INCR_BACKUP_COMPLETE:
+    // set overall backup status: complete. Here we make sure to complete the backup.
+    // After this checkpoint, even if entering cancel process, will let the backup finished
+    try {
+      backupContext.setState(BackupState.COMPLETE);
+      // Set the previousTimestampMap which is before this current log roll to the manifest.
+      HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
+          backupManager.readLogTimestampMap();
+      backupContext.setIncrTimestampMap(previousTimestampMap);
+
+      // The table list in backupContext is good for both full backup and incremental backup.
+      // For incremental backup, it contains the incremental backup table set.
+      backupManager.writeRegionServerLogTimestamp(backupContext.getTables(), newTimestamps);
+
+      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+          backupManager.readLogTimestampMap();
+
+      Long newStartCode =
+          BackupClientUtil.getMinValue(BackupServerUtil
+              .getRSLogTimestampMins(newTableSetTimestampMap));
+      backupManager.writeBackupStartCode(newStartCode);
+      // backup complete
+      FullTableBackupClient.completeBackup(conn, backupContext, backupManager,
+        BackupType.INCREMENTAL, conf);
+
+    } catch (IOException e) {
+      FullTableBackupClient.failBackup(conn, backupContext, backupManager, e,
+        "Unexpected Exception : ", BackupType.INCREMENTAL, conf);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
new file mode 100644
index 0000000..768910f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+
+@InterfaceAudience.Private
+public class RestoreTablesClient {
+  private static final Log LOG = LogFactory.getLog(RestoreTablesClient.class);
+
+  private Configuration conf;
+  private Connection conn;
+  private String backupId;
+  private TableName[] sTableArray;
+  private TableName[] tTableArray;
+  private String targetRootDir;
+  private boolean isOverwrite;
+
+  public RestoreTablesClient() {
+    // Required by the Procedure framework to create the procedure on replay
+  }
+
+  public RestoreTablesClient(Connection conn, RestoreRequest request)
+      throws IOException {
+    this.targetRootDir = request.getBackupRootDir();
+    this.backupId = request.getBackupId();
+    this.sTableArray = request.getFromTables();
+    this.tTableArray = request.getToTables();
+    if (tTableArray == null || tTableArray.length == 0) {
+      this.tTableArray = sTableArray;
+    }
+    this.isOverwrite = request.isOverwrite();
+    this.conn = conn;
+    this.conf = conn.getConfiguration();
+
+  }
+
+  /**
+   * Validate target Tables
+   * @param conn connection
+   * @param mgr table state manager
+   * @param tTableArray: target tables
+   * @param isOverwrite overwrite existing table
+   * @throws IOException exception
+   */
+  private void checkTargetTables(TableName[] tTableArray, boolean isOverwrite)
+      throws IOException {
+    ArrayList<TableName> existTableList = new ArrayList<>();
+    ArrayList<TableName> disabledTableList = new ArrayList<>();
+
+    // check if the tables already exist
+    try (Admin admin = conn.getAdmin();) {
+      for (TableName tableName : tTableArray) {
+        if (admin.tableExists(tableName)) {
+          existTableList.add(tableName);
+          if (admin.isTableDisabled(tableName)) {
+            disabledTableList.add(tableName);
+          }
+        } else {
+          LOG.info("HBase table " + tableName
+              + " does not exist. It will be created during restore process");
+        }
+      }
+    }
+
+    if (existTableList.size() > 0) {
+      if (!isOverwrite) {
+        LOG.error("Existing table ("
+         + existTableList
+         + ") found in the restore target, please add "
+         + "\"-overwrite\" option in the command if you mean to restore to these existing tables");
+        throw new IOException("Existing table found in target while no \"-overwrite\" "
+            + "option found");
+      } else {
+        if (disabledTableList.size() > 0) {
+          LOG.error("Found offline table in the restore target, "
+              + "please enable them before restore with \"-overwrite\" option");
+          LOG.info("Offline table list in restore target: " + disabledTableList);
+          throw new IOException(
+              "Found offline table in the target when restore with \"-overwrite\" option");
+        }
+      }
+    }
+  }
+
+  /**
+   * Restore operation handle each backupImage in array
+   * @param svc: master services
+   * @param images: array BackupImage
+   * @param sTable: table to be restored
+   * @param tTable: table to be restored to
+   * @param truncateIfExists: truncate table
+   * @throws IOException exception
+   */
+
+  private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
+      boolean truncateIfExists) throws IOException {
+
+    // First image MUST be image of a FULL backup
+    BackupImage image = images[0];
+    String rootDir = image.getRootDir();
+    String backupId = image.getBackupId();
+    Path backupRoot = new Path(rootDir);
+    RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId);
+    Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
+    String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
+    // We need hFS only for full restore (see the code)
+    BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
+    if (manifest.getType() == BackupType.FULL) {
+      LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
+          + tableBackupPath.toString());
+      restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
+        lastIncrBackupId);
+    } else { // incremental Backup
+      throw new IOException("Unexpected backup type " + image.getType());
+    }
+
+    if (images.length == 1) {
+      // full backup restore done
+      return;
+    }
+
+    List<Path> dirList = new ArrayList<Path>();
+    // add full backup path
+    // full backup path comes first
+    for (int i = 1; i < images.length; i++) {
+      BackupImage im = images[i];
+      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+      dirList.add(new Path(logBackupDir));
+    }
+
+    String dirs = StringUtils.join(dirList, ",");
+    LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs);
+    Path[] paths = new Path[dirList.size()];
+    dirList.toArray(paths);
+    restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
+      new TableName[] { tTable }, lastIncrBackupId);
+    LOG.info(sTable + " has been successfully restored to " + tTable);
+
+  }
+
+  /**
+   * Restore operation. Stage 2: resolved Backup Image dependency
+   * @param backupManifestMap : tableName, Manifest
+   * @param sTableArray The array of tables to be restored
+   * @param tTableArray The array of mapping tables to restore to
+   * @return set of BackupImages restored
+   * @throws IOException exception
+   */
+  private void restore(HashMap<TableName, BackupManifest> backupManifestMap,
+      TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
+    TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
+    boolean truncateIfExists = isOverwrite;
+    try {
+      for (int i = 0; i < sTableArray.length; i++) {
+        TableName table = sTableArray[i];
+        BackupManifest manifest = backupManifestMap.get(table);
+        // Get the image list of this backup for restore in time order from old
+        // to new.
+        List<BackupImage> list = new ArrayList<BackupImage>();
+        list.add(manifest.getBackupImage());
+        TreeSet<BackupImage> set = new TreeSet<BackupImage>(list);
+        List<BackupImage> depList = manifest.getDependentListByTable(table);
+        set.addAll(depList);
+        BackupImage[] arr = new BackupImage[set.size()];
+        set.toArray(arr);
+        restoreImages(arr, table, tTableArray[i], truncateIfExists);
+        restoreImageSet.addAll(list);
+        if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
+          LOG.info("Restore includes the following image(s):");
+          for (BackupImage image : restoreImageSet) {
+            LOG.info("Backup: "
+                + image.getBackupId()
+                + " "
+                + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
+                  table));
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed", e);
+      throw new IOException(e);
+    }
+    LOG.debug("restoreStage finished");
+  }
+
+  public void execute() throws IOException {
+
+    // case VALIDATION:
+    // check the target tables
+    checkTargetTables(tTableArray, isOverwrite);
+    // case RESTORE_IMAGES:
+    HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
+    // check and load backup image manifest for the tables
+    Path rootPath = new Path(targetRootDir);
+    HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
+      backupId);
+    restore(backupManifestMap, sTableArray, tTableArray, isOverwrite);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
new file mode 100644
index 0000000..a9a622c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to split HFiles into new region boundaries as a M/R job.
+ * The tool generates HFiles for later bulk importing,
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HFileSplitter extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(HFileSplitter.class);
+  final static String NAME = "HFileSplitter";
+  public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output";
+  public final static String TABLES_KEY = "hfile.input.tables";
+  public final static String TABLE_MAP_KEY = "hfile.input.tablesmap";
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public HFileSplitter(){
+  }
+
+  protected HFileSplitter(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * A mapper that just writes out cells.
+   * This one can be used together with {@link KeyValueSortReducer}
+   */
+  static class HFileCellMapper
+    extends Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+    @Override
+    public void map(NullWritable key, KeyValue value, Context context) throws IOException,
+        InterruptedException {
+      // Convert value to KeyValue if subclass
+      if (!value.getClass().equals(KeyValue.class)) {
+        value =
+            new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
+                value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
+                value.getQualifierArray(), value.getQualifierOffset(),
+                value.getQualifierLength(), value.getTimestamp(), Type.codeToType(value
+                    .getTypeByte()), value.getValueArray(), value.getValueOffset(),
+                value.getValueLength());
+      }
+      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // do nothing
+    }
+  }
+
+
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+    String inputDirs = args[0];
+    String tabName = args[1];
+    conf.setStrings(TABLES_KEY, tabName);
+    Job job =
+        Job.getInstance(conf,
+          conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+    job.setJarByClass(HFileSplitter.class);
+    FileInputFormat.addInputPaths(job, inputDirs);
+    job.setInputFormatClass(HFileInputFormat2.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+      TableName tableName = TableName.valueOf(tabName);
+      job.setMapperClass(HFileCellMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job,
+          table.getTableDescriptor(), regionLocator);
+      }
+      LOG.debug("success configuring load incremental job");
+
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+        com.google.common.base.Preconditions.class);
+    } else {
+      throw new IOException("No bulk output directory specified");
+    }
+    return job;
+  }
+
+
+  /**
+   * Print usage
+   * @param errorMsg Error message.  Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <HFile inputdir(s)> <table>");
+    System.err.println("Read all HFile's for <table> and split them to <table> region boundaries.");
+    System.err.println("<table>  table to load.\n");
+    System.err.println("To generate HFiles for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("Other options:");
+    System.err.println("   -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the HFile splitter");
+    System.err.println("For performance also consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n" + "  -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(args);
+    int result =job.waitForCompletion(true) ? 0 : 1;
+    return result;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java
new file mode 100644
index 0000000..b942446
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyTask;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+/**
+ * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
+ * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper
+ * implementation. The other is copying for incremental log files, which bases on extending
+ * DistCp's function with copy progress reporting to ZooKeeper implementation.
+ *
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceBackupCopyTask implements BackupCopyTask {
+  private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyTask.class);
+
+  private Configuration conf;
+  // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024;
+
+  // Accumulated progress within the whole backup process for the copy operation
+  private float progressDone = 0.1f;
+  private long bytesCopied = 0;
+  private static float INIT_PROGRESS = 0.1f;
+
+  // The percentage of the current copy task within the whole task if multiple time copies are
+  // needed. The default value is 100%, which means only 1 copy task for the whole.
+  private float subTaskPercntgInWholeTask = 1f;
+
+  public MapReduceBackupCopyTask() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the current copy task percentage within the whole task if multiple copies are needed.
+   * @return the current copy task percentage
+   */
+  public float getSubTaskPercntgInWholeTask() {
+    return subTaskPercntgInWholeTask;
+  }
+
+  /**
+   * Set the current copy task percentage within the whole task if multiple copies are needed. Must
+   * be called before calling
+   * {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
+   * @param subTaskPercntgInWholeTask The percentage of the copy subtask
+   */
+  public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
+    this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
+  }
+
+  static class SnapshotCopy extends ExportSnapshot {
+    private BackupInfo backupContext;
+    private TableName table;
+
+    public SnapshotCopy(BackupInfo backupContext, TableName table) {
+      super();
+      this.backupContext = backupContext;
+      this.table = table;
+    }
+
+    public TableName getTable() {
+      return this.table;
+    }
+
+    public BackupInfo getBackupInfo() {
+      return this.backupContext;
+    }
+  }
+
+  /**
+   * Update the ongoing backup with new progress.
+   * @param backupContext backup context
+   *
+   * @param newProgress progress
+   * @param bytesCopied bytes copied
+   * @throws NoNodeException exception
+   */
+  static void updateProgress(BackupInfo backupContext, BackupManager backupManager,
+      int newProgress, long bytesCopied) throws IOException {
+    // compose the new backup progress data, using fake number for now
+    String backupProgressData = newProgress + "%";
+
+    backupContext.setProgress(newProgress);
+    backupManager.updateBackupInfo(backupContext);
+    LOG.debug("Backup progress data \"" + backupProgressData
+      + "\" has been updated to hbase:backup for " + backupContext.getBackupId());
+  }
+
+  // Extends DistCp for progress updating to hbase:backup
+  // during backup. Using DistCpV2 (MAPREDUCE-2765).
+  // Simply extend it and override execute() method to get the
+  // Job reference for progress updating.
+  // Only the argument "src1, [src2, [...]] dst" is supported,
+  // no more DistCp options.
+  class BackupDistCp extends DistCp {
+
+    private BackupInfo backupContext;
+    private BackupManager backupManager;
+
+    public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupContext,
+        BackupManager backupManager)
+        throws Exception {
+      super(conf, options);
+      this.backupContext = backupContext;
+      this.backupManager = backupManager;
+    }
+
+    @Override
+    public Job execute() throws Exception {
+
+      // reflection preparation for private methods and fields
+      Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
+      Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
+      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
+      Method methodCreateInputFileListing =
+          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
+      Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
+
+      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
+      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
+      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+      Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
+
+      methodCreateMetaFolderPath.setAccessible(true);
+      methodCreateJob.setAccessible(true);
+      methodCreateInputFileListing.setAccessible(true);
+      methodCleanup.setAccessible(true);
+
+      fieldInputOptions.setAccessible(true);
+      fieldMetaFolder.setAccessible(true);
+      fieldJobFS.setAccessible(true);
+      fieldSubmitted.setAccessible(true);
+
+      // execute() logic starts here
+      assert fieldInputOptions.get(this) != null;
+
+      Job job = null;
+      try {
+        synchronized (this) {
+          // Don't cleanup while we are setting up.
+          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
+          fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
+          job = (Job) methodCreateJob.invoke(this);
+        }
+        methodCreateInputFileListing.invoke(this, job);
+
+        // Get the total length of the source files
+        List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+
+        long totalSrcLgth = 0;
+        for (Path aSrc : srcs) {
+          totalSrcLgth += BackupServerUtil.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
+        }
+
+        // submit the copy job
+        job.submit();
+        fieldSubmitted.set(this, true);
+
+        // after submit the MR job, set its handler in backup handler for cancel process
+        // this.backupHandler.copyJob = job;
+
+        // Update the copy progress to ZK every 0.5s if progress value changed
+        int progressReportFreq =
+            this.getConf().getInt("hbase.backup.progressreport.frequency", 500);
+        float lastProgress = progressDone;
+        while (!job.isComplete()) {
+          float newProgress =
+              progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+
+          if (newProgress > lastProgress) {
+
+            BigDecimal progressData =
+                new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+            String newProgressStr = progressData + "%";
+            LOG.info("Progress: " + newProgressStr);
+            updateProgress(backupContext, backupManager, progressData.intValue(),
+              bytesCopied);
+            LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+              + ".\"");
+            lastProgress = newProgress;
+          }
+          Thread.sleep(progressReportFreq);
+        }
+        // update the progress data after copy job complete
+        float newProgress =
+            progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
+        BigDecimal progressData =
+            new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
+
+        String newProgressStr = progressData + "%";
+        LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask +
+            " mapProgress: " + job.mapProgress());
+
+        // accumulate the overall backup progress
+        progressDone = newProgress;
+        bytesCopied += totalSrcLgth;
+
+        updateProgress(backupContext, backupManager, progressData.intValue(),
+          bytesCopied);
+        LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr
+          + " - " + bytesCopied + " bytes copied.\"");
+      } catch (Throwable t) {
+        LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
+        throw t;
+      } finally {
+        if (!fieldSubmitted.getBoolean(this)) {
+          methodCleanup.invoke(this);
+        }
+      }
+
+      String jobID = job.getJobID().toString();
+      job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+
+      LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " +
+          job.isSuccessful());
+      Counters ctrs = job.getCounters();
+      LOG.debug(ctrs);
+      if (job.isComplete() && !job.isSuccessful()) {
+        throw new Exception("DistCp job-id: " + jobID + " failed");
+      }
+
+      return job;
+    }
+
+  }
+
+
+  /**
+   * Do backup copy based on different types.
+   * @param context The backup context
+   * @param conf The hadoop configuration
+   * @param copyType The backup copy type
+   * @param options Options for customized ExportSnapshot or DistCp
+   * @throws Exception exception
+   */
+  @Override
+  public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
+      BackupType copyType, String[] options) throws IOException {
+    int res = 0;
+
+    try {
+      if (copyType == BackupType.FULL) {
+        SnapshotCopy snapshotCp =
+            new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
+        LOG.debug("Doing SNAPSHOT_COPY");
+        // Make a new instance of conf to be used by the snapshot copy class.
+        snapshotCp.setConf(new Configuration(conf));
+        res = snapshotCp.run(options);
+
+      } else if (copyType == BackupType.INCREMENTAL) {
+        LOG.debug("Doing COPY_TYPE_DISTCP");
+        setSubTaskPercntgInWholeTask(1f);
+
+        BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context,
+          backupManager);
+        // Handle a special case where the source file is a single file.
+        // In this case, distcp will not create the target dir. It just take the
+        // target as a file name and copy source file to the target (as a file name).
+        // We need to create the target dir before run distcp.
+        LOG.debug("DistCp options: " + Arrays.toString(options));
+        Path dest = new Path(options[options.length-1]);
+        FileSystem destfs = dest.getFileSystem(conf);
+        if (!destfs.exists(dest)) {
+          destfs.mkdirs(dest);
+        }
+        res = distcp.run(options);
+      }
+      return res;
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+   @Override
+   public void cancelCopyJob(String jobId) throws IOException {
+     JobID id = JobID.forName(jobId);
+     Cluster cluster = new Cluster(this.getConf());
+     try {
+       Job job = cluster.getJob(id);
+       if (job == null) {
+         LOG.error("No job found for " + id);
+         // should we throw exception
+         return;
+       }
+       if (job.isComplete() || job.isRetired()) {
+         return;
+       }
+
+       job.killJob();
+       LOG.debug("Killed copy job " + id);
+     } catch (InterruptedException e) {
+       throw new IOException(e);
+     }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java
new file mode 100644
index 0000000..743e8e7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.RestoreTask;
+import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MapReduceRestoreTask implements RestoreTask {
+  public static final Log LOG = LogFactory.getLog(MapReduceRestoreTask.class);
+
+  private Tool player;
+  private Configuration conf;
+
+  public MapReduceRestoreTask() {
+  }
+
+  @Override
+  public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
+      boolean fullBackupRestore) throws IOException {
+
+    String bulkOutputConfKey;
+
+    if (fullBackupRestore) {
+      player = new HFileSplitter();
+      bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
+    } else {
+      player = new WALPlayer();
+      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
+    }
+    // Player reads all files in arbitrary directory structure and creates
+    // a Map task for each file
+    String dirs = StringUtils.join(dirPaths, ",");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
+          + " backup from directory " + dirs + " from hbase tables "
+          + BackupServerUtil.join(tableNames) + " to tables "
+          + BackupServerUtil.join(newTableNames));
+    }
+
+    for (int i = 0; i < tableNames.length; i++) {
+
+      LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);
+
+      Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
+      Configuration conf = getConf();
+      conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+      String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+
+      int result = 0;
+      int loaderResult = 0;
+      try {
+
+        player.setConf(getConf());
+        result = player.run(playerArgs);
+        if (succeeded(result)) {
+          // do bulk load
+          LoadIncrementalHFiles loader = createLoader();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
+          }
+          String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
+          loaderResult = loader.run(args);
+
+          if (failed(loaderResult)) {
+            throw new IOException("Can not restore from backup directory " + dirs
+                + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
+          }
+        } else {
+          throw new IOException("Can not restore from backup directory " + dirs
+              + " (check Hadoop/MR and HBase logs). Player return code =" + result);
+        }
+        LOG.debug("Restore Job finished:" + result);
+      } catch (Exception e) {
+        throw new IOException("Can not restore from backup directory " + dirs
+            + " (check Hadoop and HBase logs) ", e);
+      }
+
+    }
+  }
+
+  private String getFileNameCompatibleString(TableName table) {
+    return table.getNamespaceAsString() + "-" + table.getQualifierAsString();
+  }
+
+  private boolean failed(int result) {
+    return result != 0;
+  }
+
+  private boolean succeeded(int result) {
+    return result == 0;
+  }
+
+  private LoadIncrementalHFiles createLoader() throws IOException {
+    // set configuration for restore:
+    // LoadIncrementalHFile needs more time
+    // <name>hbase.rpc.timeout</name> <value>600000</value>
+    // calculates
+    Integer milliSecInHour = 3600000;
+    Configuration conf = new Configuration(getConf());
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
+
+    // By default, it is 32 and loader will fail if # of files in any region exceed this
+    // limit. Bad for snapshot restore.
+    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(conf);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return loader;
+  }
+
+  private Path getBulkOutputDir(String tableName) throws IOException {
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+    String tmp =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path path =
+        new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+            + EnvironmentEdgeManager.currentTime());
+    fs.deleteOnExit(path);
+    return path;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupController.java
new file mode 100644
index 0000000..2b23e8e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupController.java
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.master;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+/**
+ * The current implementation checks if the backup system table
+ * (hbase:backup) exists on HBasae Master startup and if it does not -
+ * it creates it. We need to make sure that backup system table is
+ * created under HBase user with ADMIN privileges
+ */
+public class BackupController extends BaseMasterAndRegionObserver {
+  private static final Log LOG = LogFactory.getLog(BackupController.class.getName());
+
+  @Override
+  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
+      throws IOException {
+    // Need to create the new system table for backups (if does not exist)
+    MasterServices master = ctx.getEnvironment().getMasterServices();
+    if (!BackupManager.isBackupEnabled(master.getConfiguration())) {
+      LOG.warn("Backup is not enabled. Check your "+ BackupRestoreConstants.BACKUP_ENABLE_KEY +
+          " setting");
+      return;
+    }
+    HTableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor();
+    try{
+      master.createTable(backupHTD, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+      LOG.info("Created "+ BackupSystemTable.getTableNameAsString()+" table");
+    } catch(TableExistsException e) {
+      LOG.info("Table "+ BackupSystemTable.getTableNameAsString() +" already exists");
+    }
+  }
+}


Mime
View raw message