From: tedyu@apache.org
To: commits@hbase.apache.org
Message-Id: <1beb9f21f12b473fa87bdf04a6049c13@git.apache.org>
Subject: hbase git commit: HBASE-18843 Add DistCp support to incremental backup with bulk loading (Vladimir Rodionov)
Date: Fri, 13 Oct 2017 22:04:59 +0000 (UTC)

Repository: hbase
Updated Branches:
  refs/heads/master 9e7156ae6 -> a6c9d371d


HBASE-18843 Add DistCp support to incremental backup with bulk loading (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a6c9d371
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a6c9d371
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a6c9d371

Branch: refs/heads/master
Commit: a6c9d371df6243458ef2db17bcfea9af890076b8
Parents: 9e7156a
Author: tedyu
Authored: Fri Oct 13 15:04:53 2017 -0700
Committer: tedyu
Committed: Fri Oct 13 15:04:53 2017 -0700

----------------------------------------------------------------------
 .../impl/IncrementalTableBackupClient.java      | 84 ++++++++++++--------
 .../mapreduce/MapReduceBackupCopyJob.java       | 83 ++++++++++++++++++-
 .../hadoop/hbase/backup/TestBackupBase.java     |  5 +-
 3 files changed, 134 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a6c9d371/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 52f6b5c..566b77d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.backup.impl;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -33,17 +32,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
 import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -53,6 +50,7 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Incremental backup implementation.
@@ -112,6 +110,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    */
   protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+    List<String> activeFiles = new ArrayList<String>();
+    List<String> archiveFiles = new ArrayList<String>();
     Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
         backupManager.readBulkloadRows(sTableList);
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
@@ -127,6 +127,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
       map.entrySet()) {
       TableName srcTable = tblEntry.getKey();
+
       int srcIdx = getIndex(srcTable, sTableList);
       if (srcIdx < 0) {
         LOG.warn("Couldn't find " + srcTable + " in source table List");
@@ -162,7 +163,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         }
         for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
           String file = fileWithState.getFirst();
-          boolean raw = fileWithState.getSecond();
           int idx = file.lastIndexOf("/");
           String filename = file;
           if (idx > 0) {
@@ -175,37 +175,53 @@ public class IncrementalTableBackupClient extends TableBackupClient {
             if (LOG.isTraceEnabled()) {
               LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
             }
-            try {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("copying " + p + " to " + tgt);
               }
-              FileUtil.copy(fs, p, tgtFs, tgt, false,conf);
-            } catch (FileNotFoundException e) {
-              LOG.debug("copying archive " + archive + " to " + tgt);
-              try {
-                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
-              } catch (FileNotFoundException fnfe) {
-                if (!raw) throw fnfe;
-              }
-            }
-          } else {
+              activeFiles.add(p.toString());
+          } else if (fs.exists(archive)){
             LOG.debug("copying archive " + archive + " to " + tgt);
-            try {
-              FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
-            } catch (FileNotFoundException fnfe) {
-              if (!raw) throw fnfe;
-            }
+            archiveFiles.add(archive.toString());
           }
           files.add(tgt);
         }
       }
     }
     }
+
+    copyBulkLoadedFiles(activeFiles, archiveFiles);
+
     backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
     backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
     return mapForSrc;
   }
 
+  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
+    throws IOException
+  {
+
+    try {
+      // Enable special mode of BackupDistCp
+      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
+      // Copy active files
+      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
+      if (activeFiles.size() > 0) {
+        String[] toCopy = new String[activeFiles.size()];
+        activeFiles.toArray(toCopy);
+        incrementalCopyHFiles(toCopy, tgtDest);
+      }
+      if (archiveFiles.size() > 0) {
+        String[] toCopy = new String[archiveFiles.size()];
+        archiveFiles.toArray(toCopy);
+        incrementalCopyHFiles(toCopy, tgtDest);
+      }
+    } finally {
+      // Disable special mode of BackupDistCp
+      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    }
+
+  }
+
   @Override
   public void execute() throws IOException {
@@ -229,8 +245,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(backupInfo);
-      incrementalCopyHFiles(backupInfo);
+      convertWALsToHFiles();
+      incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()}, backupInfo.getBackupRootDir());
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
@@ -270,27 +286,25 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
   }
 
-  protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
     try {
-      LOG.debug("Incremental copy HFiles is starting.");
+      LOG.debug("Incremental copy HFiles is starting. dest="+backupDest);
       // set overall backup phase: incremental_copy
       backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
       // get incremental backup file list and prepare parms for DistCp
-      List<String> incrBackupFileList = new ArrayList<String>();
-      // Add Bulk output
-      incrBackupFileList.add(getBulkOutputDir().toString());
-      String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-      strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+      String[] strArr = new String[files.length + 1];
+      System.arraycopy(files, 0, strArr, 0, files.length);
+      strArr[strArr.length - 1] = backupDest;
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
       if (res != 0) {
         LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
-        throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
-            + " to " + backupInfo.getHLogTargetDir());
+        throw new IOException("Failed copy from " + StringUtils.join(files, ',')
+            + " to " + backupDest);
       }
-      LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
-          + " to " + backupInfo.getBackupRootDir() + " finished.");
+      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',')
+          + " to " + backupDest + " finished.");
     } finally {
       deleteBulkLoadDirectory();
     }
@@ -307,7 +321,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
 
   }
 
-  protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+  protected void convertWALsToHFiles() throws IOException {
     // get incremental backup file list and prepare parameters for DistCp
     List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
     // Get list of tables in incremental backup set

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6c9d371/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
index 07e9fcc..90e8442 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
@@ -37,10 +38,13 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
@@ -54,6 +58,7 @@
  */
 @InterfaceAudience.Private
 public class MapReduceBackupCopyJob implements BackupCopyJob {
+  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
   private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class);
 
   private Configuration conf;
@@ -143,6 +148,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
    * Only the argument "src1, [src2, [...]] dst" is supported,
    * no more DistCp options.
    */
+
   class BackupDistCp extends DistCp {
 
     private BackupInfo backupInfo;
@@ -156,6 +162,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
     }
 
 
+
     @Override
     public Job execute() throws Exception {
 
@@ -226,7 +233,7 @@
           LOG.debug("Backup progress data updated to backup system table: \"Progress: "
               + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
         } catch (Throwable t) {
-          LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
+          LOG.error(t);
           throw t;
         }
 
@@ -281,6 +288,80 @@
       }
     }
 
+
+    @Override
+    protected Path createInputFileListing(Job job) throws IOException {
+
+      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
+        return super.createInputFileListing(job);
+      }
+      long totalBytesExpected = 0;
+      int totalRecords = 0;
+      Path fileListingPath = getFileListingPath();
+      try (SequenceFile.Writer writer = getWriter(fileListingPath);) {
+        List<Path> srcFiles = getSourceFiles();
+        if (srcFiles.size() == 0) {
+          return fileListingPath;
+        }
+        totalRecords = srcFiles.size();
+        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
+        for (Path path : srcFiles) {
+          FileStatus fst = fs.getFileStatus(path);
+          totalBytesExpected += fst.getLen();
+          Text key = getKey(path);
+          writer.append(key, new CopyListingFileStatus(fst));
+        }
+        writer.close();
+
+        // update jobs configuration
+
+        Configuration cfg = job.getConfiguration();
+        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
+        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
+        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
+      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
+          | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
+          | InvocationTargetException e) {
+        throw new IOException(e);
+      }
+      return fileListingPath;
+    }
+
+    private Text getKey(Path path) {
+      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
+      int count = 0;
+      String relPath = "";
+      while (count++ < level) {
+        relPath = Path.SEPARATOR + path.getName() + relPath;
+        path = path.getParent();
+      }
+      return new Text(relPath);
+    }
+
+    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
+        IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
+        ClassNotFoundException, InvocationTargetException, IOException {
+      Field options = null;
+      try {
+        options = DistCp.class.getDeclaredField("inputOptions");
+      } catch (NoSuchFieldException | SecurityException e) {
+        options = DistCp.class.getDeclaredField("context");
+      }
+      options.setAccessible(true);
+      return getSourcePaths(options);
+    }
+
+
+
+    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+      FileSystem fs = pathToListFile.getFileSystem(conf);
+      fs.delete(pathToListFile, false);
+      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
+        SequenceFile.Writer.keyClass(Text.class),
+        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
+        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/a6c9d371/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 69db342..c44efbd 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -136,8 +136,9 @@ public class TestBackupBase {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(backupInfo);
-      incrementalCopyHFiles(backupInfo);
+      convertWALsToHFiles();
+      incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
+        backupInfo.getBackupRootDir());
       failStageIf(Stage.stage_2);
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());