Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 38657200C4C for ; Tue, 4 Apr 2017 21:56:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 33C65160BA1; Tue, 4 Apr 2017 19:56:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DE62F160BA5 for ; Tue, 4 Apr 2017 21:56:35 +0200 (CEST) Received: (qmail 153 invoked by uid 500); 4 Apr 2017 19:56:27 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 97935 invoked by uid 99); 4 Apr 2017 19:56:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Apr 2017 19:56:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1207EE965C; Tue, 4 Apr 2017 19:56:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Tue, 04 Apr 2017 19:56:55 -0000 Message-Id: In-Reply-To: <83db38e6376c4b2a8806af407cf6bf57@git.apache.org> References: <83db38e6376c4b2a8806af407cf6bf57@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/50] [abbrv] hadoop git commit: Revert "HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen." archived-at: Tue, 04 Apr 2017 19:56:38 -0000 Revert "HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen." 
This reverts commit 064c8b25eca9bc825dc07a54d9147d65c9290a03. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/144f1cf7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/144f1cf7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/144f1cf7 Branch: refs/heads/HDFS-7240 Commit: 144f1cf76527e6c75aec77ef683a898580f3cc8d Parents: 064c8b2 Author: Yongjun Zhang Authored: Thu Mar 30 17:38:18 2017 -0700 Committer: Yongjun Zhang Committed: Thu Mar 30 17:38:18 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 22 +- .../org/apache/hadoop/tools/CopyListing.java | 37 +- .../hadoop/tools/CopyListingFileStatus.java | 87 +---- .../java/org/apache/hadoop/tools/DistCp.java | 52 --- .../apache/hadoop/tools/DistCpOptionSwitch.java | 10 - .../org/apache/hadoop/tools/DistCpOptions.java | 22 +- .../org/apache/hadoop/tools/OptionsParser.java | 36 +- .../apache/hadoop/tools/SimpleCopyListing.java | 83 ++--- .../hadoop/tools/mapred/CopyCommitter.java | 174 +-------- .../apache/hadoop/tools/mapred/CopyMapper.java | 40 +- .../tools/mapred/RetriableFileCopyCommand.java | 26 +- .../tools/mapred/UniformSizeInputFormat.java | 5 +- .../apache/hadoop/tools/util/DistCpUtils.java | 111 +----- .../src/site/markdown/DistCp.md.vm | 1 - .../apache/hadoop/tools/TestDistCpSystem.java | 368 ++----------------- .../apache/hadoop/tools/TestOptionsParser.java | 2 +- .../hadoop/tools/mapred/TestCopyCommitter.java | 5 +- 17 files changed, 110 insertions(+), 971 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 9b782f3..1329195 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -862,27 +862,7 @@ public class DFSTestUtil { out.write(toAppend); } } - - /** - * Append specified length of bytes to a given file, starting with new block. - * @param fs The file system - * @param p Path of the file to append - * @param length Length of bytes to append to the file - * @throws IOException - */ - public static void appendFileNewBlock(DistributedFileSystem fs, - Path p, int length) throws IOException { - assert fs.exists(p); - assert length >= 0; - byte[] toAppend = new byte[length]; - Random random = new Random(); - random.nextBytes(toAppend); - try (FSDataOutputStream out = fs.append(p, - EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) { - out.write(toAppend); - } - } - + /** * @return url content as string (UTF-8 encoding assumed) */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index 9ebf9d2..481aa61 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -145,22 +145,12 @@ public abstract class CopyListing extends Configured { Configuration config = getConf(); FileSystem fs = pathToListFile.getFileSystem(config); - final boolean splitLargeFile = options.splitLargeFile(); - - // When splitLargeFile is enabled, we don't randomize the copylist - // earlier, so we 
don't do the sorting here. For a file that has - // multiple entries due to split, we check here that their - // is continuous. - // - Path checkPath = splitLargeFile? - pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile); + Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile); SequenceFile.Reader reader = new SequenceFile.Reader( - config, SequenceFile.Reader.file(checkPath)); + config, SequenceFile.Reader.file(sortedList)); try { Text lastKey = new Text("*"); //source relative path can never hold * - long lastChunkOffset = -1; - long lastChunkLength = -1; CopyListingFileStatus lastFileStatus = new CopyListingFileStatus(); Text currentKey = new Text(); @@ -171,21 +161,8 @@ public abstract class CopyListing extends Configured { if (currentKey.equals(lastKey)) { CopyListingFileStatus currentFileStatus = new CopyListingFileStatus(); reader.getCurrentValue(currentFileStatus); - if (!splitLargeFile) { - throw new DuplicateFileException("File " + lastFileStatus.getPath() - + " and " + currentFileStatus.getPath() - + " would cause duplicates. Aborting"); - } else { - if (lastChunkOffset + lastChunkLength != - currentFileStatus.getChunkOffset()) { - throw new InvalidInputException("File " + lastFileStatus.getPath() - + " " + lastChunkOffset + "," + lastChunkLength - + " and " + currentFileStatus.getPath() - + " " + currentFileStatus.getChunkOffset() + "," - + currentFileStatus.getChunkLength() - + " are not continuous. Aborting"); - } - } + throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " + + currentFileStatus.getPath() + " would cause duplicates. 
Aborting"); } reader.getCurrentValue(lastFileStatus); if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { @@ -204,12 +181,8 @@ public abstract class CopyListing extends Configured { xAttrSupportCheckFsSet.add(lastFsUri); } } - lastKey.set(currentKey); - if (splitLargeFile) { - lastChunkOffset = lastFileStatus.getChunkOffset(); - lastChunkLength = lastFileStatus.getChunkLength(); - } + if (options.shouldUseDiff() && LOG.isDebugEnabled()) { LOG.debug("Copy list entry " + idx + ": " + lastFileStatus.getPath().toUri().getPath()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java index 29c59ac..00d4b32 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java @@ -74,14 +74,6 @@ public final class CopyListingFileStatus implements Writable { private List aclEntries; private Map xAttrs; - // represents the offset and length of a file - // chunk in number of bytes. - // used when splitting a large file to chunks to copy in parallel. - // If a file is not large enough to split, chunkOffset would be 0 and - // chunkLength would be the length of the file. - private long chunkOffset = 0; - private long chunkLength = Long.MAX_VALUE; - /** * Default constructor. 
*/ @@ -104,32 +96,11 @@ public final class CopyListingFileStatus implements Writable { fileStatus.getPath()); } - public CopyListingFileStatus(FileStatus fileStatus, - long chunkOffset, long chunkLength) { - this(fileStatus.getLen(), fileStatus.isDirectory(), - fileStatus.getReplication(), fileStatus.getBlockSize(), - fileStatus.getModificationTime(), fileStatus.getAccessTime(), - fileStatus.getPermission(), fileStatus.getOwner(), - fileStatus.getGroup(), - fileStatus.getPath()); - this.chunkOffset = chunkOffset; - this.chunkLength = chunkLength; - } - @SuppressWarnings("checkstyle:parameternumber") public CopyListingFileStatus(long length, boolean isdir, int blockReplication, long blocksize, long modificationTime, long accessTime, FsPermission permission, String owner, String group, Path path) { - this(length, isdir, blockReplication, blocksize, modificationTime, - accessTime, permission, owner, group, path, 0, Long.MAX_VALUE); - } - - @SuppressWarnings("checkstyle:parameternumber") - public CopyListingFileStatus(long length, boolean isdir, - int blockReplication, long blocksize, long modificationTime, - long accessTime, FsPermission permission, String owner, String group, - Path path, long chunkOffset, long chunkLength) { this.length = length; this.isdir = isdir; this.blockReplication = (short)blockReplication; @@ -146,23 +117,6 @@ public final class CopyListingFileStatus implements Writable { this.owner = (owner == null) ? "" : owner; this.group = (group == null) ? 
"" : group; this.path = path; - this.chunkOffset = chunkOffset; - this.chunkLength = chunkLength; - } - - public CopyListingFileStatus(CopyListingFileStatus other) { - this.length = other.length; - this.isdir = other.isdir; - this.blockReplication = other.blockReplication; - this.blocksize = other.blocksize; - this.modificationTime = other.modificationTime; - this.accessTime = other.accessTime; - this.permission = other.permission; - this.owner = other.owner; - this.group = other.group; - this.path = new Path(other.path.toUri()); - this.chunkOffset = other.chunkOffset; - this.chunkLength = other.chunkLength; } public Path getPath() { @@ -246,31 +200,6 @@ public final class CopyListingFileStatus implements Writable { this.xAttrs = xAttrs; } - public long getChunkOffset() { - return chunkOffset; - } - - public void setChunkOffset(long offset) { - this.chunkOffset = offset; - } - - public long getChunkLength() { - return chunkLength; - } - - public void setChunkLength(long chunkLength) { - this.chunkLength = chunkLength; - } - - public boolean isSplit() { - return getChunkLength() != Long.MAX_VALUE && - getChunkLength() != getLen(); - } - - public long getSizeToCopy() { - return isSplit()? 
getChunkLength() : getLen(); - } - @Override public void write(DataOutput out) throws IOException { Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN); @@ -315,9 +244,6 @@ public final class CopyListingFileStatus implements Writable { } else { out.writeInt(NO_XATTRS); } - - out.writeLong(chunkOffset); - out.writeLong(chunkLength); } @Override @@ -366,9 +292,6 @@ public final class CopyListingFileStatus implements Writable { } else { xAttrs = null; } - - chunkOffset = in.readLong(); - chunkLength = in.readLong(); } @Override @@ -394,14 +317,8 @@ public final class CopyListingFileStatus implements Writable { public String toString() { StringBuilder sb = new StringBuilder(super.toString()); sb.append('{'); - sb.append(this.getPath().toString()); - sb.append(" length = ").append(this.getLen()); - sb.append(" aclEntries = ").append(aclEntries); - sb.append(", xAttrs = ").append(xAttrs); - if (isSplit()) { - sb.append(", chunkOffset = ").append(this.getChunkOffset()); - sb.append(", chunkLength = ").append(this.getChunkLength()); - } + sb.append("aclEntries = " + aclEntries); + sb.append(", xAttrs = " + xAttrs); sb.append('}'); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 8c2fa24..ab58e9c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import 
org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.CopyListing.*; import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyOutputFormat; @@ -135,7 +134,6 @@ public class DistCp extends Configured implements Tool { try { inputOptions = (OptionsParser.parse(argv)); - setOptionsForSplitLargeFile(); setTargetPathExists(); LOG.info("Input Options: " + inputOptions); } catch (Throwable e) { @@ -237,56 +235,6 @@ public class DistCp extends Configured implements Tool { getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists); } - - /** - * Check if concat is supported by fs. - * Throws UnsupportedOperationException if not. - */ - private void checkConcatSupport(FileSystem fs) { - try { - Path[] src = null; - Path tgt = null; - fs.concat(tgt, src); - } catch (UnsupportedOperationException use) { - throw new UnsupportedOperationException( - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + - " is not supported since the target file system doesn't" + - " support concat.", use); - } catch (Exception e) { - // Ignore other exception - } - } - - /** - * Set up needed options for splitting large files. 
- */ - private void setOptionsForSplitLargeFile() throws IOException { - if (!inputOptions.splitLargeFile()) { - return; - } - Path target = inputOptions.getTargetPath(); - FileSystem targetFS = target.getFileSystem(getConf()); - checkConcatSupport(targetFS); - - LOG.info("Enabling preserving blocksize since " - + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed."); - inputOptions.preserve(FileAttribute.BLOCKSIZE); - - LOG.info("Set " + - DistCpOptionSwitch.APPEND.getSwitch() - + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() - + " is passed."); - inputOptions.setAppend(false); - - LOG.info("Set " + - DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES - + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() - + " is passed."); - getConf().setBoolean( - DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false); - } - - /** * Create Job object for submitting it, with all the configuration * http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index ced9b54..fb47d76 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -169,16 +169,6 @@ public enum DistCpOptionSwitch { new Option("sizelimit", true, "(Deprecated!) 
Limit number of files " + "copied to <= n bytes")), - BLOCKS_PER_CHUNK("", - new Option("blocksperchunk", true, "If set to a positive value, files" - + "with more blocks than this value will be split into chunks of " - + " blocks to be transferred in parallel, and " - + "reassembled on the destination. By default, is " - + "0 and the files will be transmitted in their entirety without " - + "splitting. This switch is only applicable when the source file " - + "system implements getBlockLocations method and the target file " - + "system implements concat method")), - /** * Specify bandwidth per map in MB, accepts bandwidth as a fraction */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 9822d83..8c37ff3 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -97,11 +97,7 @@ public class DistCpOptions { // targetPathExist is a derived field, it's initialized in the // beginning of distcp. private boolean targetPathExists = true; - - // Size of chunk in number of blocks when splitting large file into chunks - // to copy in parallel. Default is 0 and file are not splitted. 
- private int blocksPerChunk = 0; - + public static enum FileAttribute{ REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES; @@ -170,7 +166,6 @@ public class DistCpOptions { this.targetPath = that.getTargetPath(); this.targetPathExists = that.getTargetPathExists(); this.filtersFile = that.getFiltersFile(); - this.blocksPerChunk = that.blocksPerChunk; } } @@ -583,18 +578,6 @@ public class DistCpOptions { this.filtersFile = filtersFilename; } - public final void setBlocksPerChunk(int csize) { - this.blocksPerChunk = csize; - } - - public final int getBlocksPerChunk() { - return blocksPerChunk; - } - - public final boolean splitLargeFile() { - return blocksPerChunk > 0; - } - void validate() { if ((useDiff || useRdiff) && deleteMissing) { // -delete and -diff/-rdiff are mutually exclusive. For backward @@ -686,8 +669,6 @@ public class DistCpOptions { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS, filtersFile); } - DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK, - String.valueOf(blocksPerChunk)); } /** @@ -723,7 +704,6 @@ public class DistCpOptions { ", targetPath=" + targetPath + ", targetPathExists=" + targetPathExists + ", filtersFile='" + filtersFile + '\'' + - ", blocksPerChunk=" + blocksPerChunk + '}'; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 8881264..d0f82ca 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -172,44 +172,11 @@ public class OptionsParser { DistCpOptionSwitch.FILTERS.getSwitch())); } - 
parseBlocksPerChunk(command, option); - option.validate(); return option; } - - /** - * A helper method to parse chunk size in number of blocks. - * Used when breaking large file into chunks to copy in parallel. - * - * @param command command line arguments - */ - private static void parseBlocksPerChunk(CommandLine command, - DistCpOptions option) { - boolean hasOption = - command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()); - LOG.info("parseChunkSize: " + - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption); - if (hasOption) { - String chunkSizeString = getVal(command, - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim()); - try { - int csize = Integer.parseInt(chunkSizeString); - if (csize < 0) { - csize = 0; - } - LOG.info("Set distcp blocksPerChunk to " + csize); - option.setBlocksPerChunk(csize); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("blocksPerChunk is invalid: " - + chunkSizeString, e); - } - } - } - /** * parseSizeLimit is a helper method for parsing the deprecated * argument SIZE_LIMIT. 
@@ -244,7 +211,8 @@ public class OptionsParser { DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); try { Integer.parseInt(fileLimitString); - } catch (NumberFormatException e) { + } + catch (NumberFormatException e) { throw new IllegalArgumentException("File-limit is invalid: " + fileLimitString, e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index af91347..105e4f2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -19,7 +19,6 @@ package org.apache.hadoop.tools; import com.google.common.collect.Lists; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -48,7 +47,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Random; -import java.util.LinkedList; import static org.apache.hadoop.tools.DistCpConstants .HDFS_RESERVED_RAW_DIRECTORY_NAME; @@ -242,10 +240,10 @@ public class SimpleCopyListing extends CopyListing { final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs(); - LinkedList fileCopyListingStatus = + CopyListingFileStatus fileCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus, - preserveAcls, preserveXAttrs, preserveRawXAttrs, - options.getBlocksPerChunk()); + preserveAcls, preserveXAttrs, preserveRawXAttrs); + writeToFileListingRoot(fileListWriter, 
fileCopyListingStatus, sourceRoot, options); } @@ -350,10 +348,9 @@ public class SimpleCopyListing extends CopyListing { FileStatus[] sourceFiles = sourceFS.listStatus(path); boolean explore = (sourceFiles != null && sourceFiles.length > 0); if (!explore || rootStatus.isDirectory()) { - LinkedList rootCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, - preserveAcls, preserveXAttrs, preserveRawXAttrs, - options.getBlocksPerChunk()); + CopyListingFileStatus rootCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, + preserveAcls, preserveXAttrs, preserveRawXAttrs); writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options); } @@ -363,20 +360,20 @@ public class SimpleCopyListing extends CopyListing { if (LOG.isDebugEnabled()) { LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); } - LinkedList sourceCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, - preserveAcls && sourceStatus.isDirectory(), - preserveXAttrs && sourceStatus.isDirectory(), - preserveRawXAttrs && sourceStatus.isDirectory(), - options.getBlocksPerChunk()); - for (CopyListingFileStatus fs : sourceCopyListingStatus) { - if (randomizeFileListing) { - addToFileListing(statusList, - new FileStatusInfo(fs, sourcePathRoot), fileListWriter); - } else { - writeToFileListing(fileListWriter, fs, sourcePathRoot); - } + CopyListingFileStatus sourceCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, + preserveAcls && sourceStatus.isDirectory(), + preserveXAttrs && sourceStatus.isDirectory(), + preserveRawXAttrs && sourceStatus.isDirectory()); + if (randomizeFileListing) { + addToFileListing(statusList, + new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot), + fileListWriter); + } else { + writeToFileListing(fileListWriter, sourceCopyListingStatus, + sourcePathRoot); } + if (sourceStatus.isDirectory()) { if (LOG.isDebugEnabled()) { 
LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath()); @@ -644,20 +641,18 @@ public class SimpleCopyListing extends CopyListing { LOG.debug("Recording source-path: " + child.getPath() + " for copy."); } if (workResult.getSuccess()) { - LinkedList childCopyListingStatus = + CopyListingFileStatus childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child, preserveAcls && child.isDirectory(), preserveXAttrs && child.isDirectory(), - preserveRawXattrs && child.isDirectory(), - options.getBlocksPerChunk()); - - for (CopyListingFileStatus fs : childCopyListingStatus) { - if (randomizeFileListing) { - addToFileListing(fileStatuses, - new FileStatusInfo(fs, sourcePathRoot), fileListWriter); - } else { - writeToFileListing(fileListWriter, fs, sourcePathRoot); - } + preserveRawXattrs && child.isDirectory()); + if (randomizeFileListing) { + addToFileListing(fileStatuses, + new FileStatusInfo(childCopyListingStatus, sourcePathRoot), + fileListWriter); + } else { + writeToFileListing(fileListWriter, childCopyListingStatus, + sourcePathRoot); } } if (retry < maxRetries) { @@ -680,21 +675,19 @@ public class SimpleCopyListing extends CopyListing { } private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, - LinkedList fileStatus, Path sourcePathRoot, + CopyListingFileStatus fileStatus, Path sourcePathRoot, DistCpOptions options) throws IOException { boolean syncOrOverwrite = options.shouldSyncFolder() || options.shouldOverwrite(); - for (CopyListingFileStatus fs : fileStatus) { - if (fs.getPath().equals(sourcePathRoot) && - fs.isDirectory() && syncOrOverwrite) { - // Skip the root-paths when syncOrOverwrite - if (LOG.isDebugEnabled()) { - LOG.debug("Skip " + fs.getPath()); - } - return; - } - writeToFileListing(fileListWriter, fs, sourcePathRoot); + if (fileStatus.getPath().equals(sourcePathRoot) && + fileStatus.isDirectory() && syncOrOverwrite) { + // Skip the root-paths when syncOrOverwrite + if (LOG.isDebugEnabled()) { + 
LOG.debug("Skip " + fileStatus.getPath()); + } + return; } + writeToFileListing(fileListWriter, fileStatus, sourcePathRoot); } private void writeToFileListing(SequenceFile.Writer fileListWriter, @@ -714,7 +707,7 @@ public class SimpleCopyListing extends CopyListing { fileListWriter.sync(); if (!fileStatus.isDirectory()) { - totalBytesToCopy += fileStatus.getSizeToCopy(); + totalBytesToCopy += fileStatus.getLen(); } else { totalDirs++; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 6ddaab9..75cefb4 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -35,17 +34,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.tools.CopyListing; import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpConstants; -import org.apache.hadoop.tools.DistCpOptionSwitch; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.GlobbedCopyListing; import org.apache.hadoop.tools.util.DistCpUtils; -import java.io.FileNotFoundException; import 
java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; -import java.util.LinkedList; import java.util.List; /** @@ -67,8 +63,7 @@ public class CopyCommitter extends FileOutputCommitter { private boolean syncFolder = false; private boolean overwrite = false; private boolean targetPathExists = true; - private boolean ignoreFailures = false; - + /** * Create a output committer * @@ -87,13 +82,8 @@ public class CopyCommitter extends FileOutputCommitter { Configuration conf = jobContext.getConfiguration(); syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false); overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false); - targetPathExists = conf.getBoolean( - DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); - ignoreFailures = conf.getBoolean( - DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false); - - concatFileChunks(conf); - + targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); + super.commitJob(jobContext); cleanupTempFiles(jobContext); @@ -179,112 +169,9 @@ public class CopyCommitter extends FileOutputCommitter { } } - private boolean isFileNotFoundException(IOException e) { - if (e instanceof FileNotFoundException) { - return true; - } - - if (e instanceof RemoteException) { - return ((RemoteException)e).unwrapRemoteException() - instanceof FileNotFoundException; - } - - return false; - } - - /** - * Concat chunk files for the same file into one. - * Iterate through copy listing, identify chunk files for the same file, - * concat them into one. 
- */ - private void concatFileChunks(Configuration conf) throws IOException { - - LOG.info("concat file chunks ..."); - - String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH); - if (spath == null || spath.isEmpty()) { - return; - } - Path sourceListing = new Path(spath); - SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(sourceListing)); - Path targetRoot = - new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); - - try { - CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); - Text srcRelPath = new Text(); - CopyListingFileStatus lastFileStatus = null; - LinkedList allChunkPaths = new LinkedList(); - - // Iterate over every source path that was copied. - while (sourceReader.next(srcRelPath, srcFileStatus)) { - if (srcFileStatus.isDirectory()) { - continue; - } - Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); - Path targetFileChunkPath = - DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus); - if (LOG.isDebugEnabled()) { - LOG.debug(" add " + targetFileChunkPath + " to concat."); - } - allChunkPaths.add(targetFileChunkPath); - if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength() - == srcFileStatus.getLen()) { - // This is the last chunk of the splits, consolidate allChunkPaths - try { - concatFileChunks(conf, targetFile, allChunkPaths); - } catch (IOException e) { - // If the concat failed because a chunk file doesn't exist, - // then we assume that the CopyMapper has skipped copying this - // file, and we ignore the exception here. - // If a chunk file should have been created but it was not, then - // the CopyMapper would have failed. 
- if (!isFileNotFoundException(e)) { - String emsg = "Failed to concat chunk files for " + targetFile; - if (!ignoreFailures) { - throw new IOException(emsg, e); - } else { - LOG.warn(emsg, e); - } - } - } - allChunkPaths.clear(); - lastFileStatus = null; - } else { - if (lastFileStatus == null) { - lastFileStatus = new CopyListingFileStatus(srcFileStatus); - } else { - // Two neighboring chunks have to be consecutive ones for the same - // file, for them to be merged - if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) || - srcFileStatus.getChunkOffset() != - (lastFileStatus.getChunkOffset() + - lastFileStatus.getChunkLength())) { - String emsg = "Inconsistent sequence file: current " + - "chunk file " + srcFileStatus + " doesnt match prior " + - "entry " + lastFileStatus; - if (!ignoreFailures) { - throw new IOException(emsg); - } else { - LOG.warn(emsg + ", skipping concat this set."); - } - } else { - lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset()); - lastFileStatus.setChunkLength(srcFileStatus.getChunkLength()); - } - } - } - } - } finally { - IOUtils.closeStream(sourceReader); - } - } - // This method changes the target-directories' file-attributes (owner, // user/group permissions, etc.) based on the corresponding source directories. - private void preserveFileAttributesForDirectories(Configuration conf) - throws IOException { + private void preserveFileAttributesForDirectories(Configuration conf) throws IOException { String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); final boolean syncOrOverwrite = syncFolder || overwrite; @@ -438,57 +325,4 @@ public class CopyCommitter extends FileOutputCommitter { ", Unable to move to " + finalDir); } } - - /** - * Concat the passed chunk files into one and rename it the targetFile. 
- */ - private void concatFileChunks(Configuration conf, Path targetFile, - LinkedList allChunkPaths) throws IOException { - if (allChunkPaths.size() == 1) { - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("concat " + targetFile + " allChunkSize+ " - + allChunkPaths.size()); - } - FileSystem dstfs = targetFile.getFileSystem(conf); - - Path firstChunkFile = allChunkPaths.removeFirst(); - Path[] restChunkFiles = new Path[allChunkPaths.size()]; - allChunkPaths.toArray(restChunkFiles); - if (LOG.isDebugEnabled()) { - LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile)); - int i = 0; - for (Path f : restChunkFiles) { - LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f)); - ++i; - } - } - dstfs.concat(firstChunkFile, restChunkFiles); - if (LOG.isDebugEnabled()) { - LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile)); - } - rename(dstfs, firstChunkFile, targetFile); - } - - /** - * Rename tmp to dst on destFileSys. - * @param destFileSys the file ssystem - * @param tmp the source path - * @param dst the destination path - * @throws IOException if renaming failed - */ - private static void rename(FileSystem destFileSys, Path tmp, Path dst) - throws IOException { - try { - if (destFileSys.exists(dst)) { - destFileSys.delete(dst, true); - } - destFileSys.rename(tmp, dst); - } catch (IOException ioe) { - throw new IOException("Fail to rename tmp file (=" + tmp - + ") to destination file (=" + dst + ")", ioe); - } - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index d6b3ba8..e1873f1 100644 --- 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -156,12 +156,10 @@ public class CopyMapper extends Mapper sourceFS = sourcePath.getFileSystem(conf); final boolean preserveXAttrs = fileAttributes.contains(FileAttribute.XATTR); - sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS, - sourceFS.getFileStatus(sourcePath), - fileAttributes.contains(FileAttribute.ACL), - preserveXAttrs, preserveRawXattrs, - sourceFileStatus.getChunkOffset(), - sourceFileStatus.getChunkLength()); + sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, + sourceFS.getFileStatus(sourcePath), + fileAttributes.contains(FileAttribute.ACL), + preserveXAttrs, preserveRawXattrs); } catch (FileNotFoundException e) { throw new IOException(new RetriableFileCopyCommand.CopyReadException(e)); } @@ -175,8 +173,7 @@ public class CopyMapper extends Mapper LOG.debug("Path could not be found: " + target, ignore); } - if (targetStatus != null && - (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { + if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { throw new IOException("Can't replace " + target + ". 
Target is " + getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus)); } @@ -186,28 +183,19 @@ public class CopyMapper extends Mapper return; } - FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, - targetStatus); - - Path tmpTarget = target; + FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, targetStatus); if (action == FileAction.SKIP) { LOG.info("Skipping copy of " + sourceCurrStatus.getPath() + " to " + target); updateSkipCounters(context, sourceCurrStatus); context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath())); - } else { - if (sourceCurrStatus.isSplit()) { - tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus); - } - if (LOG.isDebugEnabled()) { - LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget); - } - copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context, + copyFileWithRetry(description, sourceCurrStatus, target, context, action, fileAttributes); } - DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget, - sourceCurrStatus, fileAttributes, preserveRawXattrs); + + DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus, + fileAttributes, preserveRawXattrs); } catch (IOException exception) { handleFailures(exception, sourceFileStatus, target, context); } @@ -273,12 +261,8 @@ public class CopyMapper extends Mapper private void handleFailures(IOException exception, CopyListingFileStatus sourceFileStatus, Path target, Context context) throws IOException, InterruptedException { - LOG.error("Failure in copying " + sourceFileStatus.getPath() + - (sourceFileStatus.isSplit()? 
"," - + " offset=" + sourceFileStatus.getChunkOffset() - + " chunkLength=" + sourceFileStatus.getChunkLength() - : "") + - " to " + target, exception); + LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " + + target, exception); if (ignoreFailures && ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 2c17fef..06acd78 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -118,21 +118,17 @@ public class RetriableFileCopyCommand extends RetriableCommand { .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS .getFileChecksum(sourcePath) : null; - long offset = (action == FileAction.APPEND) ? - targetFS.getFileStatus(target).getLen() : source.getChunkOffset(); + final long offset = action == FileAction.APPEND ? targetFS.getFileStatus( + target).getLen() : 0; long bytesRead = copyToFile(targetPath, targetFS, source, offset, context, fileAttributes, sourceChecksum); - if (!source.isSplit()) { - compareFileLengths(source, targetPath, configuration, bytesRead - + offset); - } + compareFileLengths(source, targetPath, configuration, bytesRead + + offset); //At this point, src&dest lengths are same. 
if length==0, we skip checksum if ((bytesRead != 0) && (!skipCrc)) { - if (!source.isSplit()) { - compareCheckSums(sourceFS, source.getPath(), sourceChecksum, - targetFS, targetPath); - } + compareCheckSums(sourceFS, source.getPath(), sourceChecksum, + targetFS, targetPath); } // it's not append case, thus we first write to a temporary file, rename // it to the target path. @@ -253,26 +249,16 @@ public class RetriableFileCopyCommand extends RetriableCommand { ThrottledInputStream inStream = null; long totalBytesRead = 0; - long chunkLength = source2.getChunkLength(); - boolean finished = false; try { inStream = getInputStream(source, context.getConfiguration()); int bytesRead = readBytes(inStream, buf, sourceOffset); while (bytesRead >= 0) { - if (chunkLength > 0 && - (totalBytesRead + bytesRead) >= chunkLength) { - bytesRead = (int)(chunkLength - totalBytesRead); - finished = true; - } totalBytesRead += bytesRead; if (action == FileAction.APPEND) { sourceOffset += bytesRead; } outStream.write(buf, 0, bytesRead); updateContextStatus(totalBytesRead, context, source2); - if (finished) { - break; - } bytesRead = readBytes(inStream, buf, sourceOffset); } outStream.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java index d1c18ea..3e86d09 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java @@ -99,8 +99,7 @@ public class UniformSizeInputFormat while (reader.next(srcRelPath, srcFileStatus)) { // If adding the 
current file would cause the bytes per map to exceed // limit. Add the current file to new split - if (currentSplitSize + srcFileStatus.getChunkLength() > nBytesPerSplit - && lastPosition != 0) { + if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) { FileSplit split = new FileSplit(listingFilePath, lastSplitStart, lastPosition - lastSplitStart, null); if (LOG.isDebugEnabled()) { @@ -110,7 +109,7 @@ public class UniformSizeInputFormat lastSplitStart = lastPosition; currentSplitSize = 0; } - currentSplitSize += srcFileStatus.getChunkLength(); + currentSplitSize += srcFileStatus.getLen(); lastPosition = reader.getPosition(); } if (lastPosition > lastSplitStart) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index e315b84..76bc4c5 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -19,11 +19,9 @@ package org.apache.hadoop.tools.util; import com.google.common.collect.Maps; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -32,7 +30,6 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DistributedFileSystem; import 
org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; @@ -47,7 +44,6 @@ import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.text.DecimalFormat; import java.util.EnumSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -120,7 +116,7 @@ public class DistCpUtils { * @return Class implementing the strategy specified in options. */ public static Class getStrategy(Configuration conf, - DistCpOptions options) { + DistCpOptions options) { String confLabel = "distcp." + StringUtils.toLowerCase(options.getCopyStrategy()) + ".strategy" + ".impl"; @@ -302,86 +298,6 @@ public class DistCpUtils { } /** - * Converts FileStatus to a list of CopyListingFileStatus. - * The resulted list contains either one CopyListingFileStatus per chunk of - * file-blocks (if file-size exceeds blockSize * blocksPerChunk, and there - * are more blocks in the file than blocksperChunk), or a single - * CopyListingFileStatus for the entire file (if file-size is too small to - * split). - * If preserving ACLs, populates the CopyListingFileStatus with the ACLs. - * If preserving XAttrs, populates the CopyListingFileStatus with the XAttrs. 
- * - * @param fileSystem FileSystem containing the file - * @param fileStatus FileStatus of file - * @param preserveAcls boolean true if preserving ACLs - * @param preserveXAttrs boolean true if preserving XAttrs - * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs - * @param blocksPerChunk size of chunks when copying chunks in parallel - * @return list of CopyListingFileStatus - * @throws IOException if there is an I/O error - */ - public static LinkedList toCopyListingFileStatus( - FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, - boolean preserveXAttrs, boolean preserveRawXAttrs, int blocksPerChunk) - throws IOException { - LinkedList copyListingFileStatus = - new LinkedList(); - - final CopyListingFileStatus clfs = toCopyListingFileStatusHelper( - fileSystem, fileStatus, preserveAcls, - preserveXAttrs, preserveRawXAttrs, - 0, fileStatus.getLen()); - final long blockSize = fileStatus.getBlockSize(); - if (LOG.isDebugEnabled()) { - LOG.debug("toCopyListing: " + fileStatus + " chunkSize: " - + blocksPerChunk + " isDFS: " + - (fileSystem instanceof DistributedFileSystem)); - } - if ((blocksPerChunk > 0) && - !fileStatus.isDirectory() && - (fileStatus.getLen() > blockSize * blocksPerChunk)) { - // split only when the file size is larger than the intended chunk size - final BlockLocation[] blockLocations; - blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, - fileStatus.getLen()); - - int numBlocks = blockLocations.length; - long curPos = 0; - if (numBlocks <= blocksPerChunk) { - if (LOG.isDebugEnabled()) { - LOG.debug(" add file " + clfs); - } - copyListingFileStatus.add(clfs); - } else { - int i = 0; - while (i < numBlocks) { - long curLength = 0; - for (int j = 0; j < blocksPerChunk && i < numBlocks; ++j, ++i) { - curLength += blockLocations[i].getLength(); - } - if (curLength > 0) { - CopyListingFileStatus clfs1 = new CopyListingFileStatus(clfs); - clfs1.setChunkOffset(curPos); - clfs1.setChunkLength(curLength); 
- if (LOG.isDebugEnabled()) { - LOG.debug(" add file chunk " + clfs1); - } - copyListingFileStatus.add(clfs1); - curPos += curLength; - } - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(" add file/dir " + clfs); - } - copyListingFileStatus.add(clfs); - } - - return copyListingFileStatus; - } - - /** * Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs, * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs, * populates the CopyListingFileStatus with the XAttrs. @@ -391,17 +307,13 @@ public class DistCpUtils { * @param preserveAcls boolean true if preserving ACLs * @param preserveXAttrs boolean true if preserving XAttrs * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs - * @param chunkOffset chunk offset in bytes - * @param chunkLength chunk length in bytes - * @return CopyListingFileStatus * @throws IOException if there is an I/O error */ - public static CopyListingFileStatus toCopyListingFileStatusHelper( + public static CopyListingFileStatus toCopyListingFileStatus( FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, - boolean preserveXAttrs, boolean preserveRawXAttrs, - long chunkOffset, long chunkLength) throws IOException { + boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException { CopyListingFileStatus copyListingFileStatus = - new CopyListingFileStatus(fileStatus, chunkOffset, chunkLength); + new CopyListingFileStatus(fileStatus); if (preserveAcls) { FsPermission perm = fileStatus.getPermission(); if (perm.getAclBit()) { @@ -558,19 +470,4 @@ public class DistCpUtils { return (sourceChecksum == null || targetChecksum == null || sourceChecksum.equals(targetChecksum)); } - - /* - * Return the Path for a given chunk. - * Used when splitting large file into chunks to copy in parallel. 
- * @param targetFile path to target file - * @param srcFileStatus source file status in copy listing - * @return path to the chunk specified by the parameters to store - * in target cluster temporarily - */ - public static Path getSplitChunkPath(Path targetFile, - CopyListingFileStatus srcFileStatus) { - return new Path(targetFile.toString() - + ".____distcpSplit____" + srcFileStatus.getChunkOffset() - + "." + srcFileStatus.getChunkLength()); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index a77deb2..41a6e94 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -237,7 +237,6 @@ Flag | Description | Notes `-rdiff ` | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot `` was created on the target, and apply the diff reversely to the target, and copy modified files from the source's ``, to make the target the same as ``. | This option is valid only with `-update` option and the following conditions should be satisfied.
  1. Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path (in the latter case, modified files are copied from the target's `<oldSnapshot>` to the target's current state).
  2. Two snapshots `<newSnapshot>` and `<oldSnapshot>` have been created on the target FS, and `<oldSnapshot>` is older than `<newSnapshot>`. No change has been made on the target since `<newSnapshot>` was created on the target.
  3. The source has the same snapshot `<oldSnapshot>`, which has the same content as the `<oldSnapshot>` on the target. All the files/directories in the target's `<oldSnapshot>` are the same as the source's `<oldSnapshot>`.
| `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads. `-skipcrccheck` | Whether to skip CRC checks between source and target paths. | -`-blocksperchunk ` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `` blocks to be transferred in parallel, and reassembled on the destination. By default, `` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. | Architecture of DistCp ---------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index b2266b3..e3018a0 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -23,27 +23,17 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; +import java.io.OutputStream; import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; import org.junit.Assert; @@ -57,15 +47,11 @@ import org.junit.rules.Timeout; */ public class TestDistCpSystem { - private static final Log LOG = - LogFactory.getLog(TestDistCpSystem.class); - @Rule public Timeout globalTimeout = new Timeout(30000); private static final String SRCDAT = "srcdat"; private static final String DSTDAT = "dstdat"; - private static final long BLOCK_SIZE = 1024; private static MiniDFSCluster cluster; private static Configuration conf; @@ -77,76 +63,27 @@ public class TestDistCpSystem { this.path = path; this.isDir = isDir; } - - String getPath() { - return path; - } - - boolean isDirectory() { - return isDir; - } - } - - @BeforeClass - public static void beforeClass() throws IOException { - conf = new Configuration(); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - cluster.waitActive(); - } - - @AfterClass - public static void afterClass() throws IOException { - if (cluster != null) { - cluster.shutdown(); - } - } - - static String execCmd(FsShell shell, String... 
args) throws Exception { - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(baout, true); - PrintStream old = System.out; - System.setOut(out); - shell.run(args); - out.close(); - System.setOut(old); - return baout.toString(); + String getPath() { return path; } + boolean isDirectory() { return isDir; } } - private void createFiles(DistributedFileSystem fs, String topdir, - FileEntry[] entries, long chunkSize) throws IOException { - long seed = System.currentTimeMillis(); - Random rand = new Random(seed); - short replicationFactor = 2; + private void createFiles(FileSystem fs, String topdir, + FileEntry[] entries) throws IOException { for (FileEntry entry : entries) { - Path newPath = new Path(topdir + "/" + entry.getPath()); + Path newpath = new Path(topdir + "/" + entry.getPath()); if (entry.isDirectory()) { - fs.mkdirs(newPath); + fs.mkdirs(newpath); } else { - long fileSize = BLOCK_SIZE *100; - int bufSize = 128; - if (chunkSize == -1) { - DFSTestUtil.createFile(fs, newPath, bufSize, - fileSize, BLOCK_SIZE, replicationFactor, seed); - } else { - // Create a variable length block file, by creating - // one block of half block size at the chunk boundary - long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2; - long seg2 = fileSize - seg1; - DFSTestUtil.createFile(fs, newPath, bufSize, - seg1, BLOCK_SIZE, replicationFactor, seed); - DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2); + OutputStream out = fs.create(newpath); + try { + out.write((topdir + "/" + entry).getBytes()); + out.write("\n".getBytes()); + } finally { + out.close(); } } - seed = System.currentTimeMillis() + rand.nextLong(); } } - - private void createFiles(DistributedFileSystem fs, String topdir, - FileEntry[] entries) throws IOException { - createFiles(fs, topdir, entries, -1); - } private static FileStatus[] getFileStatus(FileSystem fs, String topdir, FileEntry[] files) throws IOException { @@ -167,19 +104,18 @@ public class TestDistCpSystem { 
} private void testPreserveUserHelper(String testRoot, - FileEntry[] srcEntries, - FileEntry[] dstEntries, - boolean createSrcDir, - boolean createTgtDir, - boolean update) throws Exception { + FileEntry[] srcEntries, + FileEntry[] dstEntries, + boolean createSrcDir, + boolean createTgtDir, + boolean update) throws Exception { final String testSrcRel = SRCDAT; final String testSrc = testRoot + "/" + testSrcRel; final String testDstRel = DSTDAT; final String testDst = testRoot + "/" + testDstRel; String nnUri = FileSystem.getDefaultUri(conf).toString(); - DistributedFileSystem fs = (DistributedFileSystem) - FileSystem.get(URI.create(nnUri), conf); + FileSystem fs = FileSystem.get(URI.create(nnUri), conf); fs.mkdirs(new Path(testRoot)); if (createSrcDir) { fs.mkdirs(new Path(testSrc)); @@ -193,8 +129,8 @@ public class TestDistCpSystem { for(int i = 0; i < srcEntries.length; i++) { fs.setOwner(srcstats[i].getPath(), "u" + i, null); } - String[] args = update? new String[]{"-pub", "-update", nnUri+testSrc, - nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst}; + String[] args = update? 
new String[]{"-pu", "-update", nnUri+testSrc, + nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst}; ToolRunner.run(conf, new DistCp(), args); @@ -209,261 +145,18 @@ public class TestDistCpSystem { deldir(fs, testRoot); } - private void compareFiles(FileSystem fs, FileStatus srcStat, - FileStatus dstStat) throws Exception { - LOG.info("Comparing " + srcStat + " and " + dstStat); - assertEquals(srcStat.isDirectory(), dstStat.isDirectory()); - assertEquals(srcStat.getReplication(), dstStat.getReplication()); - assertEquals("File POSIX permission should match", - srcStat.getPermission(), dstStat.getPermission()); - assertEquals("File user ownership should match", - srcStat.getOwner(), dstStat.getOwner()); - assertEquals("File group ownership should match", - srcStat.getGroup(), dstStat.getGroup()); - // TODO; check ACL attributes - - if (srcStat.isDirectory()) { - return; - } - - assertEquals("File length should match (" + srcStat.getPath() + ")", - srcStat.getLen(), dstStat.getLen()); - - FSDataInputStream srcIn = fs.open(srcStat.getPath()); - FSDataInputStream dstIn = fs.open(dstStat.getPath()); - try { - byte[] readSrc = new byte[(int) - HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT]; - byte[] readDst = new byte[(int) - HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT]; - - int srcBytesRead = 0, tgtBytesRead = 0; - int srcIdx = 0, tgtIdx = 0; - long totalComparedBytes = 0; - while (true) { - if (srcBytesRead == 0) { - srcBytesRead = srcIn.read(readSrc); - srcIdx = 0; - } - if (tgtBytesRead == 0) { - tgtBytesRead = dstIn.read(readDst); - tgtIdx = 0; - } - if (srcBytesRead == 0 || tgtBytesRead == 0) { - LOG.info("______ compared src and dst files for " - + totalComparedBytes + " bytes, content match."); - if (srcBytesRead != tgtBytesRead) { - Assert.fail("Read mismatching size, compared " - + totalComparedBytes + " bytes between src and dst file " - + srcStat + " and " + dstStat); - } - if (totalComparedBytes != srcStat.getLen()) { - Assert.fail("Only 
read/compared " + totalComparedBytes + - " bytes between src and dst file " + srcStat + - " and " + dstStat); - } else { - // success - break; - } - } - for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead; - ++srcIdx, ++tgtIdx) { - if (readSrc[srcIdx] != readDst[tgtIdx]) { - Assert.fail("src and dst file does not match at " - + totalComparedBytes + " between " - + srcStat + " and " + dstStat); - } - ++totalComparedBytes; - } - LOG.info("______ compared src and dst files for " - + totalComparedBytes + " bytes, content match. FileLength: " - + srcStat.getLen()); - if (totalComparedBytes == srcStat.getLen()) { - LOG.info("______ Final:" + srcIdx + " " - + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead); - break; - } - if (srcIdx == srcBytesRead) { - srcBytesRead = 0; - } - if (tgtIdx == tgtBytesRead) { - tgtBytesRead = 0; - } - } - } finally { - if (srcIn != null) { - srcIn.close(); - } - if (dstIn != null) { - dstIn.close(); - } - } - } - - // WC: needed because the current distcp does not create target dirs - private void createDestDir(FileSystem fs, String testDst, - FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException { - fs.mkdirs(new Path(testDst)); - - for (int i=0; i=0; --i) { - if (!srcFiles[i].isDirectory()) { - LOG.info("Modifying " + srcStats[i].getPath()); - DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(), - (int)BLOCK_SIZE * 3); - break; - } - } - // get file status after modifying file - srcStats = getFileStatus(fs, testRoot, srcFiles); - - args = new String[] {"-pugp", "-update", "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, nnUri + testDst + "/" + testSrcRel}; - - copyAndVerify(fs, srcFiles, srcStats, testDst, args); - - deldir(fs, testRoot); - } - - @Test - public void testRecursiveChunkCopy() throws Exception { - FileEntry[] srcFiles = { - new FileEntry(SRCDAT, true), - new FileEntry(SRCDAT + "/file0", false), - new FileEntry(SRCDAT + "/dir1", true), - new FileEntry(SRCDAT + "/dir2", true), - new 
FileEntry(SRCDAT + "/dir1/file1", false) - }; - chunkCopy(srcFiles); - } - - @Test - public void testChunkCopyOneFile() throws Exception { - FileEntry[] srcFiles = { - new FileEntry(SRCDAT, true), - new FileEntry(SRCDAT + "/file0", false) - }; - chunkCopy(srcFiles); + @BeforeClass + public static void beforeClass() throws IOException { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); } - @Test - public void testDistcpLargeFile() throws Exception { - FileEntry[] srcfiles = { - new FileEntry(SRCDAT, true), - new FileEntry(SRCDAT + "/file", false) - }; - - final String testRoot = "/testdir"; - final String testSrcRel = SRCDAT; - final String testSrc = testRoot + "/" + testSrcRel; - final String testDstRel = DSTDAT; - final String testDst = testRoot + "/" + testDstRel; - - String nnUri = FileSystem.getDefaultUri(conf).toString(); - DistributedFileSystem fs = - (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf); - fs.mkdirs(new Path(testRoot)); - fs.mkdirs(new Path(testSrc)); - fs.mkdirs(new Path(testDst)); - long chunkSize = 6; - createFiles(fs, testRoot, srcfiles, chunkSize); - - String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath(); - Path srcfile = new Path(srcFileName); - - if(!cluster.getFileSystem().exists(srcfile)){ - throw new Exception("src not exist"); - } - - final long srcLen = fs.getFileStatus(srcfile).getLen(); - - FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles); - for (int i = 0; i < srcfiles.length; i++) { - fs.setOwner(srcstats[i].getPath(), "u" + i, null); + @AfterClass + public static void afterClass() throws IOException { + if (cluster != null) { + cluster.shutdown(); } - String[] args = new String[] { - "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, - nnUri + testDst - }; - - LOG.info("_____ running distcp: " + args[0] + " " + args[1]); - ToolRunner.run(conf, new DistCp(), args); - - String realTgtPath = 
testDst; - FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles); - assertEquals("File length should match", srcLen, - dststat[dststat.length - 1].getLen()); - - this.compareFiles(fs, srcstats[srcstats.length-1], - dststat[dststat.length-1]); - deldir(fs, testRoot); } @Test @@ -487,6 +180,7 @@ public class TestDistCpSystem { testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false); } + @Test public void testPreserveUserEmptyDir() throws Exception { String testRoot = "/testdir." + getMethodName(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index f94ba97..efe4627 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -394,7 +394,7 @@ public class TestOptionsParser { + "copyStrategy='uniformsize', preserveStatus=[], " + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, " + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, " - + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}"; + + "targetPathExists=true, filtersFile='null'}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/144f1cf7/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java 
b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 2452d6f..2e9a350 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -81,10 +81,6 @@ public class TestCopyCommitter { @Before public void createMetaFolder() { config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta"); - // Unset listing file path since the config is shared by - // multiple tests, and some test doesn't set it, such as - // testNoCommitAction, but the distcp code will check it. - config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); Path meta = new Path("/meta"); try { cluster.getFileSystem().mkdirs(meta); @@ -330,6 +326,7 @@ public class TestCopyCommitter { committer.commitJob(jobContext); Assert.assertFalse(fs.exists(new Path(workPath))); Assert.assertTrue(fs.exists(new Path(finalPath))); + } catch (IOException e) { LOG.error("Exception encountered while testing for preserve status", e); Assert.fail("Atomic commit failure"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org