From: tedyu@apache.org
To: commits@hbase.apache.org
Subject: hbase git commit: HBASE-18843 Add DistCp support to incremental backup with bulk loading (Vladimir Rodionov)
Date: Wed, 27 Sep 2017 03:58:23 +0000 (UTC)

Repository: hbase
Updated Branches:
  refs/heads/master 976ef508f -> 417dad646


HBASE-18843 Add DistCp support to incremental backup with bulk loading (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/417dad64
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/417dad64
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/417dad64

Branch: refs/heads/master
Commit: 417dad646bf91922fe0117c83ea69aa501f3c2fc
Parents: 976ef50
Author: tedyu
Authored: Tue Sep 26 20:58:16 2017 -0700
Committer: tedyu
Committed: Tue Sep 26 20:58:16 2017 -0700

----------------------------------------------------------------------
 .../impl/IncrementalTableBackupClient.java    |  87 +++---
 .../mapreduce/MapReduceBackupCopyJob.java     |   4 +-
 .../util/FixedRelativePathCopyListing.java    | 288 +++++++++++++++++++
 .../hadoop/hbase/backup/TestBackupBase.java   |   5 +-
 4 files changed, 346 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
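In short, the patch stops copying bulk-loaded HFiles one at a time with FileUtil.copy and instead
collects their paths into two lists (files still under the table directories, and files already
moved to the archive), then hands each list to the existing incremental DistCp copy job. A new
FixedRelativePathCopyListing is plugged into DistCp so that the last few path components of every
source file are preserved under the backup destination. A rough sketch of the resulting flow --
simplified from the code below, with illustrative variable names rather than the exact
implementation:

    List<String> activeFiles = new ArrayList<>();   // HFiles still under data/<ns>/<table>/...
    List<String> archiveFiles = new ArrayList<>();  // HFiles already under archive/data/...
    // handleBulkLoad() now only fills these two lists while scanning the bulk-load rows.

    conf.set(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
        FixedRelativePathCopyListing.class.getName());
    conf.setInt(FixedRelativePathCopyListing.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
    try {
      String dest = backupRootDir + Path.SEPARATOR + backupId;   // backup-specific target dir
      incrementalCopyHFiles(activeFiles.toArray(new String[0]), dest);
      incrementalCopyHFiles(archiveFiles.toArray(new String[0]), dest);
    } finally {
      // Restore the configuration so later DistCp runs use the default copy listing.
      conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
      conf.unset(FixedRelativePathCopyListing.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
    }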
http://git-wip-us.apache.org/repos/asf/hbase/blob/417dad64/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4efe04b..04700c6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.backup.impl;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -33,17 +32,15 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
 import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.backup.util.FixedRelativePathCopyListing;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -52,7 +49,9 @@
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Incremental backup implementation.
@@ -112,6 +111,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    */
   protected Map>[] handleBulkLoad(List sTableList) throws IOException {
     Map>[] mapForSrc = new Map[sTableList.size()];
+    List activeFiles = new ArrayList();
+    List archiveFiles = new ArrayList();
     Pair>>>>, List> pair = backupManager.readBulkloadRows(sTableList);
     Map>>>> map = pair.getFirst();
@@ -127,6 +128,7 @@
     for (Map.Entry>>>> tblEntry : map.entrySet()) {
       TableName srcTable = tblEntry.getKey();
+      int srcIdx = getIndex(srcTable, sTableList);
       if (srcIdx < 0) {
         LOG.warn("Couldn't find " + srcTable + " in source table List");
@@ -162,7 +164,6 @@
       }
       for (Pair fileWithState : famEntry.getValue()) {
         String file = fileWithState.getFirst();
-        boolean raw = fileWithState.getSecond();
         int idx = file.lastIndexOf("/");
         String filename = file;
         if (idx > 0) {
@@ -175,37 +176,55 @@
           if (LOG.isTraceEnabled()) {
             LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
           }
-          try {
             if (LOG.isTraceEnabled()) {
               LOG.trace("copying " + p + " to " + tgt);
             }
-            FileUtil.copy(fs, p, tgtFs, tgt, false,conf);
-          } catch (FileNotFoundException e) {
-            LOG.debug("copying archive " + archive + " to " + tgt);
-            try {
-              FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
-            } catch (FileNotFoundException fnfe) {
-              if (!raw) throw fnfe;
-            }
-          }
-        } else {
+            activeFiles.add(p.toString());
+          } else if (fs.exists(archive)){
            LOG.debug("copying archive " + archive + " to " + tgt);
-           try {
-             FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
-           } catch (FileNotFoundException fnfe) {
-             if (!raw) throw fnfe;
-           }
+           archiveFiles.add(archive.toString());
          }
          files.add(tgt);
        }
      }
    }
  }
+
+    copyBulkLoadedFiles(activeFiles, archiveFiles);
+
     backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
     backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
     return mapForSrc;
   }
+  private void copyBulkLoadedFiles(List activeFiles, List archiveFiles)
+    throws IOException
+  {
+
+    try {
+      conf.set(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
+        FixedRelativePathCopyListing.class.getName());
+      conf.setInt(FixedRelativePathCopyListing.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
+
+      // Copy active files
+      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
+      if (activeFiles.size() > 0) {
+        String[] toCopy = new String[activeFiles.size()];
+        activeFiles.toArray(toCopy);
+        incrementalCopyHFiles(toCopy, tgtDest);
+      }
+      if (archiveFiles.size() > 0) {
+        String[] toCopy = new String[archiveFiles.size()];
+        archiveFiles.toArray(toCopy);
+        incrementalCopyHFiles(toCopy, tgtDest);
+      }
+    } finally {
+      conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
+      conf.unset(FixedRelativePathCopyListing.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    }
+
+  }
+
   @Override
   public void execute() throws IOException {
@@ -229,8 +248,8 @@
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(backupInfo);
-      incrementalCopyHFiles(backupInfo);
+      convertWALsToHFiles();
+      incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()}, backupInfo.getBackupRootDir());
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
@@ -269,27 +288,25 @@
     }
   }
 
-  protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
     try {
-      LOG.debug("Incremental copy HFiles is starting.");
+      LOG.debug("Incremental copy HFiles is starting. dest="+backupDest);
       // set overall backup phase: incremental_copy
       backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
       // get incremental backup file list and prepare parms for DistCp
-      List incrBackupFileList = new ArrayList();
-      // Add Bulk output
-      incrBackupFileList.add(getBulkOutputDir().toString());
-      String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-      strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+      String[] strArr = new String[files.length + 1];
+      System.arraycopy(files, 0, strArr, 0, files.length);
+      strArr[strArr.length - 1] = backupDest;
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
       if (res != 0) {
         LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
-        throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
-            + " to " + backupInfo.getHLogTargetDir());
+        throw new IOException("Failed copy from " + StringUtils.join(files, ',')
+            + " to " + backupDest);
       }
-      LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
-          + " to " + backupInfo.getBackupRootDir() + " finished.");
+      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',')
+          + " to " + backupDest + " finished.");
     } finally {
       deleteBulkLoadDirectory();
     }
@@ -306,7 +323,7 @@
 
   }
 
-  protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+  protected void convertWALsToHFiles() throws IOException {
     // get incremental backup file list and prepare parameters for DistCp
     List incrBackupFileList = backupInfo.getIncrBackupFileList();
     // Get list of tables in incremental backup set

http://git-wip-us.apache.org/repos/asf/hbase/blob/417dad64/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
index 29e71e7..2efb717 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
@@ -142,6 +142,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
    * Only the argument "src1, [src2, [...]] dst" is supported,
    * no more DistCp options.
    */
+
  class BackupDistCp extends DistCp {
 
    private BackupInfo backupInfo;
@@ -154,6 +155,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
      this.backupManager = backupManager;
    }
 
+
    @Override
    public Job execute() throws Exception {
 
@@ -249,7 +251,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
          LOG.debug("Backup progress data updated to backup system table: \"Progress: "
              + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
        } catch (Throwable t) {
-          LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
+          LOG.error(t);
          throw t;
        } finally {
          if (!fieldSubmitted.getBoolean(this)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/417dad64/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/FixedRelativePathCopyListing.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/FixedRelativePathCopyListing.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/FixedRelativePathCopyListing.java
new file mode 100644
index 0000000..1b73611
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/FixedRelativePathCopyListing.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.IOException;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.SimpleCopyListing;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The FixedRelativePathCopyListing is responsible for making the exhaustive list of
+ * all files/directories under its specified list of input-paths.
+ * These are written into the specified copy-listing file.
+ * This CopyListing implementation preserves a fixed number of trailing path levels
+ * between source and destination paths for every file being copied.
+ * Note: The FixedRelativePathCopyListing doesn't handle wild-cards in the input-paths.
+ */
+@InterfaceAudience.Private
+public class FixedRelativePathCopyListing extends SimpleCopyListing {
+  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
+  private static final Log LOG = LogFactory.getLog(FixedRelativePathCopyListing.class);
+
+  private long totalPaths = 0;
+  private long totalBytesToCopy = 0;
+  private int numLevelsToPreserve = 1;
+
+  /**
+   * Public constructor, to initialize configuration.
+   *
+   * @param configuration The input configuration, with which the source/target FileSystems
+   *                      may be accessed.
+   * @param credentials   Credentials object on which the FS delegation tokens are cached.
+   *                      If null, delegation token caching is skipped.
+   */
+  public FixedRelativePathCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    this.numLevelsToPreserve = configuration.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+    doBuildListing(getWriter(pathToListingFile), options);
+  }
+
+  /**
+   * Collect the list of
+   * {@literal }
+   * to be copied and write to the sequence file. In essence, any file or
+   * directory that needs to be copied or sync-ed is written as an entry to the
+   * sequence file, with the possible exception of the source root:
+   * when either -update (sync) or -overwrite switch is specified, and if
+   * the source root is a directory, then the source root entry is not
+   * written to the sequence file, because only the contents of the source
+   * directory need to be copied in this case.
+   * See {@link org.apache.hadoop.tools.util.DistCpUtils#getRelativePath} for
+   * how relative path is computed.
+   * See computeSourceRootPath method for how the root path of the source is
+   * computed.
+   * @param fileListWriter
+   * @param options
+   * @throws IOException
+   */
+  @Override
+  @VisibleForTesting
+  public void doBuildListing(SequenceFile.Writer fileListWriter,
+                             DistCpOptions options) throws IOException {
+    try {
+      for (Path path: options.getSourcePaths()) {
+        FileSystem sourceFS = path.getFileSystem(getConf());
+        final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+        final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+        final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
+        path = makeQualified(path);
+
+        FileStatus rootStatus = sourceFS.getFileStatus(path);
+        Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+
+        FileStatus[] sourceFiles = sourceFS.listStatus(path);
+        boolean explore = (sourceFiles != null && sourceFiles.length > 0);
+        if (!explore || rootStatus.isDirectory()) {
+          CopyListingFileStatus rootCopyListingStatus =
+            DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
+              preserveAcls, preserveXAttrs, preserveRawXAttrs);
+          writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
+            sourcePathRoot, options);
+        }
+        if (explore) {
+          for (FileStatus sourceStatus: sourceFiles) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
+            }
+            CopyListingFileStatus sourceCopyListingStatus =
+              DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
+                preserveAcls && sourceStatus.isDirectory(),
+                preserveXAttrs && sourceStatus.isDirectory(),
+                preserveRawXAttrs && sourceStatus.isDirectory());
+            writeToFileListing(fileListWriter, sourceCopyListingStatus,
+              sourcePathRoot, options);
+
+            if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
+              }
+              traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
+                options);
+            }
+          }
+        }
+      }
+      fileListWriter.close();
+      fileListWriter = null;
+    } finally {
+      IOUtils.cleanup(LOG, fileListWriter);
+    }
+  }
+
+  private Path computeSourceRootPath(FileStatus sourceStatus,
+                                     DistCpOptions options) throws IOException {
+
+    Path path = sourceStatus.getPath();
+    int level = 0;
+    while (level++ < numLevelsToPreserve) {
+      path = path.getParent();
+    }
+    return path;
+  }
+
+  /**
+   * Provide an option to skip copy of a path, Allows for exclusion
+   * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
+   * @param path - Path being considered for copy while building the file listing
+   * @param options - Input options passed during DistCp invocation
+   * @return - True if the path should be considered for copy, false otherwise
+   */
+  @Override
+  protected boolean shouldCopy(Path path, DistCpOptions options) {
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return totalBytesToCopy;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return totalPaths;
+  }
+
+  private Path makeQualified(Path path) throws IOException {
+    final FileSystem fs = path.getFileSystem(getConf());
+    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(getConf());
+    if (fs.exists(pathToListFile)) {
+      fs.delete(pathToListFile, false);
+    }
+    return SequenceFile.createWriter(getConf(),
+            SequenceFile.Writer.file(pathToListFile),
+            SequenceFile.Writer.keyClass(Text.class),
+            SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
+            SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+  }
+
+  private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
+                                                FileStatus fileStatus) throws IOException {
+    return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
+  }
+
+  private static FileStatus[] getChildren(FileSystem fileSystem,
+                                          FileStatus parent) throws IOException {
+    return fileSystem.listStatus(parent.getPath());
+  }
+
+  private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
+                                         FileStatus sourceStatus,
+                                         Path sourcePathRoot,
+                                         DistCpOptions options)
+                                         throws IOException {
+    FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+    final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
+    final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
+    final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
+    Stack pathStack = new Stack();
+    pathStack.push(sourceStatus);
+
+    while (!pathStack.isEmpty()) {
+      for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
+        CopyListingFileStatus childCopyListingStatus =
+          DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+            preserveAcls && child.isDirectory(),
+            preserveXAttrs && child.isDirectory(),
+            preserveRawXattrs && child.isDirectory());
+        writeToFileListing(fileListWriter, childCopyListingStatus,
+          sourcePathRoot, options);
+        if (isDirectoryAndNotEmpty(sourceFS, child)) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
+          pathStack.push(child);
+        }
+      }
+    }
+  }
+
+  private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
+                                      CopyListingFileStatus fileStatus, Path sourcePathRoot,
+                                      DistCpOptions options) throws IOException {
+    boolean syncOrOverwrite = options.shouldSyncFolder() ||
+        options.shouldOverwrite();
+    if (fileStatus.getPath().equals(sourcePathRoot) &&
+        fileStatus.isDirectory() && syncOrOverwrite) {
+      // Skip the root-paths when syncOrOverwrite
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip " + fileStatus.getPath());
+      }
+      return;
+    }
+    writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
+  }
+
+  private void writeToFileListing(SequenceFile.Writer fileListWriter,
+                                  CopyListingFileStatus fileStatus,
+                                  Path sourcePathRoot,
+                                  DistCpOptions options) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
+        fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
+    }
+
+    FileStatus status = fileStatus;
+
+    if (!shouldCopy(fileStatus.getPath(), options)) {
+      return;
+    }
+
+    fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
+        fileStatus.getPath())), status);
+    fileListWriter.sync();
+
+    if (!fileStatus.isDirectory()) {
+      totalBytesToCopy += fileStatus.getLen();
+    }
+    totalPaths++;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/417dad64/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 8752ca2..8be6ed0 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -136,8 +136,9 @@ public class TestBackupBase {
         // copy out the table and region info files for each table
         BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
         // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-        convertWALsToHFiles(backupInfo);
-        incrementalCopyHFiles(backupInfo);
+        convertWALsToHFiles();
+        incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
+          backupInfo.getBackupRootDir());
         failStageIf(Stage.stage_2);
         // Save list of WAL files copied
         backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());