From: jing9@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Thu, 30 Jan 2014 23:53:24 -0000
Message-Id: <20140130235324.A65CC23889D5@eris.apache.org>
Subject: svn commit: r1563019 - in /hadoop/common/trunk/hadoop-tools/hadoop-distcp/src: main/java/org/apache/hadoop/tools/ main/java/org/apache/hadoop/tools/mapred/ main/java/org/apache/hadoop/tools/util/ test/java/org/apache/hadoop/tools/ test/java/org/apache/...

Author: jing9
Date: Thu Jan 30 23:53:23 2014
New Revision: 1563019

URL: http://svn.apache.org/r1563019
Log:
HADOOP-10295. Allow distcp to automatically identify the checksum type of source files and use it for the target. Contributed by Jing Zhao and Laurent Goujon.
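In essence, the patch teaches DistCp to read the source file's checksum, derive a ChecksumOpt from it when CHECKSUMTYPE preservation is requested, and pass that (plus the source block size) to FileSystem.create() for the target file. A minimal sketch of that flow, not part of the patch itself; the names srcFS/dstFS/src/dst and the buffer size are illustrative:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Options.ChecksumOpt;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.IOUtils;

    public class ChecksumPreservingCopySketch {
      public static void copy(FileSystem srcFS, Path src,
          FileSystem dstFS, Path dst) throws IOException {
        FileStatus srcStatus = srcFS.getFileStatus(src);

        // The source checksum may be null (not every FileSystem exposes
        // one); in that case the target falls back to its own defaults,
        // mirroring the null handling in the patch.
        FileChecksum srcChecksum = srcFS.getFileChecksum(src);
        ChecksumOpt checksumOpt =
            srcChecksum == null ? null : srcChecksum.getChecksumOpt();

        // HDFS file checksums are composed from per-block CRCs, so the
        // source block size is carried over as well; otherwise the
        // end-to-end checksums could not match.
        FsPermission permission = FsPermission.getFileDefault().applyUMask(
            FsPermission.getUMask(dstFS.getConf()));
        OutputStream out = dstFS.create(dst, permission,
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
            8192 /* buffer size, illustrative */,
            srcStatus.getReplication(), srcStatus.getBlockSize(),
            null /* progress */, checksumOpt);
        // copyBytes(..., true) closes both streams when done.
        IOUtils.copyBytes(srcFS.open(src), out, 8192, true);
      }
    }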
Modified:
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
    hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java Thu Jan 30 23:53:23 2014
@@ -37,15 +37,16 @@ public enum DistCpOptionSwitch {
   /**
    * Preserves status of file/path in the target.
    * Default behavior with -p, is to preserve replication,
-   * block size, user, group and permission on the target file
+   * block size, user, group, permission and checksum type on the target file.
+   * Note that when preserving checksum type, block size is also preserved.
    *
-   * If any of the optional switches are present among rbugp, then
-   * only the corresponding file attribute is preserved
+   * If any of the optional switches are present among rbugpc, then
+   * only the corresponding file attribute is preserved.
    *
    */
   PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
-      new Option("p", true, "preserve status (rbugp)" +
-          "(replication, block-size, user, group, permission)")),
+      new Option("p", true, "preserve status (rbugpc)" +
+          "(replication, block-size, user, group, permission, checksum-type)")),
 
   /**
    * Update target location by copying only files that are missing
@@ -53,7 +54,7 @@ public enum DistCpOptionSwitch {
    * across source and target. Typically used with DELETE_MISSING
    * Incompatible with ATOMIC_COMMIT
    */
-  SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, 
+  SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
       new Option("update", false, "Update target, copying only missing" +
           "files or directories")),
 
@@ -80,7 +81,7 @@ public enum DistCpOptionSwitch {
   /**
    * Max number of maps to use during copy. DistCp will split work
   * as equally as possible among these maps
   */
-  MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS, 
+  MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
       new Option("m", true, "Max number of concurrent maps to use for copy")),
 
   /**

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java Thu Jan 30 23:53:23 2014
@@ -61,7 +61,7 @@ public class DistCpOptions {
   private Path targetPath;
 
   public static enum FileAttribute{
-    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
 
     public static FileAttribute getAttribute(char symbol) {
       for (FileAttribute attribute : values()) {

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java Thu Jan 30 23:53:23 2014
@@ -34,7 +34,7 @@ public class OptionsParser {
 
   private static final Log LOG = LogFactory.getLog(OptionsParser.class);
 
-  private static final Options cliOptions = new Options();  
+  private static final Options cliOptions = new Options();
 
   static {
     for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
@@ -50,7 +50,7 @@ public class OptionsParser {
     protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
       for (int index = 0; index < arguments.length; index++) {
         if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
-          arguments[index] = "-prbugp";
+          arguments[index] = "-prbugpc";
         }
       }
       return super.flatten(options, arguments, stopAtNonOption);
@@ -125,7 +125,7 @@ public class OptionsParser {
         option.setAtomicWorkPath(new Path(workPath));
       }
     } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
-      throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");      
+      throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
     }
 
     if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
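The parser-side effect of the change above: bare -p now expands to -prbugpc, and -pc selects only the checksum type. A small usage sketch mirroring the new assertions added to TestOptionsParser further down (the paths are the same example URIs the tests use):

    import org.apache.hadoop.tools.DistCpOptions;
    import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
    import org.apache.hadoop.tools.OptionsParser;

    public class PreserveChecksumTypeParseExample {
      public static void main(String[] args) {
        // -pc preserves only the checksum type of the copied files.
        DistCpOptions options = OptionsParser.parse(new String[] {
            "-pc",
            "-f",
            "hdfs://localhost:8020/source/first",
            "hdfs://localhost:8020/target/"});
        System.out.println(options.shouldPreserve(FileAttribute.CHECKSUMTYPE)); // true
        System.out.println(options.shouldPreserve(FileAttribute.BLOCKSIZE));    // false
      }
    }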
Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java Thu Jan 30 23:53:23 2014
@@ -111,7 +111,7 @@ public class CopyMapper extends Mapper

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Thu Jan 30 23:53:23 2014
+  private static ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
+      FileChecksum sourceChecksum) {
+    if (fileAttributes.contains(FileAttribute.CHECKSUMTYPE)
+        && sourceChecksum != null) {
+      return sourceChecksum.getChecksumOpt();
+    }
+    return null;
+  }
+
   private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
-      FileStatus sourceFileStatus, Mapper.Context context,
-      EnumSet<FileAttribute> fileAttributes)
-      throws IOException {
-    OutputStream outStream = new BufferedOutputStream(targetFS.create(
-        tmpTargetPath, true, BUFFER_SIZE,
-        getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath),
-        getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context));
+      FileStatus sourceFileStatus, Mapper.Context context,
+      EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
+      throws IOException {
+    FsPermission permission = FsPermission.getFileDefault().applyUMask(
+        FsPermission.getUMask(targetFS.getConf()));
+    OutputStream outStream = new BufferedOutputStream(
+        targetFS.create(tmpTargetPath, permission,
+            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
+            getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
+                tmpTargetPath),
+            getBlockSize(fileAttributes, sourceFileStatus, targetFS,
+                tmpTargetPath),
+            context, getChecksumOpt(fileAttributes, sourceChecksum)));
     return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
   }
 
@@ -140,9 +175,10 @@ public class RetriableFileCopyCommand ex
   }
 
   private void compareCheckSums(FileSystem sourceFS, Path source,
-      FileSystem targetFS, Path target)
-      throws IOException {
-    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
+      FileChecksum sourceChecksum, FileSystem targetFS, Path target)
+      throws IOException {
+    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
+        targetFS, target)) {
       StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
           .append(source).append(" and ").append(target).append(".");
       if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
@@ -249,11 +285,18 @@ public class RetriableFileCopyCommand ex
         sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath);
   }
 
+  /**
+   * @return the block size of the source file if we need to preserve either
+   *         the block size or the checksum type. Otherwise the default block
+   *         size of the target FS.
+   */
   private static long getBlockSize(
       EnumSet<FileAttribute> fileAttributes,
       FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
-    return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
-        sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(tmpTargetPath);
+    boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
+        || fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
+    return preserve ? sourceFile.getBlockSize() : targetFS
+        .getDefaultBlockSize(tmpTargetPath);
   }
 
   /**
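The getBlockSize() change above is the reason the option help says block size is preserved along with checksum type: an HDFS file checksum is composed from per-block CRCs, so the same checksum algorithm with a different block size would still produce mismatching file checksums. A ChecksumOpt itself just pairs a checksum algorithm with a bytes-per-checksum value; a tiny illustrative sketch, using the same CRC32/CRC32C values as the new tests:

    import org.apache.hadoop.fs.Options.ChecksumOpt;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumOptExample {
      public static void main(String[] args) {
        // CRC32 vs CRC32C, each computed over 512-byte chunks.
        ChecksumOpt crc32 = new ChecksumOpt(DataChecksum.Type.CRC32, 512);
        ChecksumOpt crc32c = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
        System.out.println(crc32 + " / " + crc32c);
      }
    }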
@@ -261,7 +304,7 @@ public class RetriableFileCopyCommand ex
    * failures from other kinds of IOExceptions.
    * The failure to read from source is dealt with specially, in the CopyMapper.
    * Such failures may be skipped if the DistCpOptions indicate so.
-   * Write failures are intolerable, and amount to CopyMapper failure.  
+   * Write failures are intolerable, and amount to CopyMapper failure.
    */
   public static class CopyReadException extends IOException {
     public CopyReadException(Throwable rootCause) {

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java Thu Jan 30 23:53:23 2014
@@ -125,7 +125,7 @@ public class DistCpUtils {
    * @param sourceRootPath - Source root path
    * @param childPath - Path for which relative path is required
    * @return - Relative portion of the child path (always prefixed with /
-   *           unless it is empty   
+   *           unless it is empty
    */
   public static String getRelativePath(Path sourceRootPath, Path childPath) {
     String childPathString = childPath.toUri().getPath();
@@ -277,9 +277,11 @@ public class DistCpUtils {
    * If checksums's can't be retrieved, it doesn't fail the test
    * Only time the comparison would fail is when checksums are
    * available and they don't match
-   * 
+   *
    * @param sourceFS FileSystem for the source path.
    * @param source The source path.
+   * @param sourceChecksum The checksum of the source file. If it is null we
+   *                       still need to retrieve it through sourceFS.
    * @param targetFS FileSystem for the target path.
    * @param target The target path.
    * @return If either checksum couldn't be retrieved, the function returns
@@ -288,12 +290,12 @@ public class DistCpUtils {
    * @throws IOException if there's an exception while retrieving checksums.
    */
   public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
-      FileSystem targetFS, Path target)
-      throws IOException {
-    FileChecksum sourceChecksum = null;
+      FileChecksum sourceChecksum, FileSystem targetFS, Path target)
+      throws IOException {
     FileChecksum targetChecksum = null;
     try {
-      sourceChecksum = sourceFS.getFileChecksum(source);
+      sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS
+          .getFileChecksum(source);
       targetChecksum = targetFS.getFileChecksum(target);
     } catch (IOException e) {
       LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
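With the new signature above, callers can pass an already-fetched source checksum and avoid a second round trip to the source filesystem. A minimal sketch, assuming illustrative srcFS/dstFS and path names:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.util.DistCpUtils;

    public class ChecksumCompareExample {
      public static boolean sameChecksum(FileSystem srcFS, Path src,
          FileSystem dstFS, Path dst) throws IOException {
        // Fetch the source checksum once; it can be reused both for picking
        // the target's ChecksumOpt and for the post-copy comparison.
        FileChecksum srcChecksum = srcFS.getFileChecksum(src);
        // Passing null instead would make checksumsAreEqual fetch it itself.
        return DistCpUtils.checksumsAreEqual(srcFS, src, srcChecksum, dstFS, dst);
      }
    }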
Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java Thu Jan 30 23:53:23 2014
@@ -110,7 +110,7 @@ public class TestOptionsParser {
         "hdfs://localhost:8020/target/"});
     Assert.assertEquals(options.getMapBandwidth(), 11);
   }
-  
+
   @Test(expected=IllegalArgumentException.class)
   public void testParseNonPositiveBandwidth() {
     OptionsParser.parse(new String[] {
@@ -119,7 +119,7 @@ public class TestOptionsParser {
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
   }
-  
+
   @Test(expected=IllegalArgumentException.class)
   public void testParseZeroBandwidth() {
     OptionsParser.parse(new String[] {
@@ -397,6 +397,7 @@ public class TestOptionsParser {
     Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
 
     options = OptionsParser.parse(new String[] {
         "-p",
@@ -408,6 +409,7 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
 
     options = OptionsParser.parse(new String[] {
         "-p",
@@ -418,6 +420,7 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
 
     options = OptionsParser.parse(new String[] {
         "-pbr",
@@ -429,6 +432,7 @@ public class TestOptionsParser {
     Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
     Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
 
     options = OptionsParser.parse(new String[] {
         "-pbrgup",
@@ -440,6 +444,31 @@ public class TestOptionsParser {
     Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
     Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+
+    options = OptionsParser.parse(new String[] {
+        "-pbrgupc",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+
+    options = OptionsParser.parse(new String[] {
+        "-pc",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
 
     options = OptionsParser.parse(new String[] {
         "-p",
@@ -452,7 +481,7 @@ public class TestOptionsParser {
       attribIterator.next();
       i++;
     }
-    Assert.assertEquals(i, 5);
+    Assert.assertEquals(i, 6);
 
     try {
       OptionsParser.parse(new String[] {

Modified: hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1563019&r1=1563018&r2=1563019&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java Thu Jan 30 23:53:23 2014
@@ -18,18 +18,28 @@
 
 package org.apache.hadoop.tools.mapred;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.DistCpConstants;
@@ -37,23 +47,17 @@ import org.apache.hadoop.tools.DistCpOpt
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.StubContext;
 import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-
 public class TestCopyMapper {
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();
   private static int nFiles = 0;
   private static final int DEFAULT_FILE_SIZE = 1024;
+  private static final long NON_DEFAULT_BLOCK_SIZE = 4096;
 
   private static MiniDFSCluster cluster;
 
@@ -119,12 +123,27 @@ public class TestCopyMapper {
     mkdirs(SOURCE_PATH + "/2/3/4");
     mkdirs(SOURCE_PATH + "/2/3");
     mkdirs(SOURCE_PATH + "/5");
-    touchFile(SOURCE_PATH + "/5/6", true);
+    touchFile(SOURCE_PATH + "/5/6", true, null);
     mkdirs(SOURCE_PATH + "/7");
     mkdirs(SOURCE_PATH + "/7/8");
     touchFile(SOURCE_PATH + "/7/8/9");
   }
 
+  private static void createSourceDataWithDifferentChecksumType()
+      throws Exception {
+    mkdirs(SOURCE_PATH + "/1");
+    mkdirs(SOURCE_PATH + "/2");
+    mkdirs(SOURCE_PATH + "/2/3/4");
+    mkdirs(SOURCE_PATH + "/2/3");
+    mkdirs(SOURCE_PATH + "/5");
+    touchFile(SOURCE_PATH + "/5/6", new ChecksumOpt(DataChecksum.Type.CRC32,
+        512));
+    mkdirs(SOURCE_PATH + "/7");
+    mkdirs(SOURCE_PATH + "/7/8");
+    touchFile(SOURCE_PATH + "/7/8/9", new ChecksumOpt(DataChecksum.Type.CRC32C,
+        512));
+  }
+
   private static void mkdirs(String path) throws Exception {
     FileSystem fileSystem = cluster.getFileSystem();
     final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@@ -134,21 +153,31 @@ public class TestCopyMapper {
   }
 
   private static void touchFile(String path) throws Exception {
-    touchFile(path, false);
+    touchFile(path, false, null);
   }
 
-  private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
-    final long NON_DEFAULT_BLOCK_SIZE = 4096;
+  private static void touchFile(String path, ChecksumOpt checksumOpt)
+      throws Exception {
+    // create files with specific checksum opt and non-default block size
+    touchFile(path, true, checksumOpt);
+  }
+
+  private static void touchFile(String path, boolean createMultipleBlocks,
+      ChecksumOpt checksumOpt) throws Exception {
     FileSystem fs;
     DataOutputStream outputStream = null;
    try {
       fs = cluster.getFileSystem();
       final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
-          fs.getWorkingDirectory());
-      final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
-      outputStream = fs.create(qualifiedPath, true, 0,
-          (short)(fs.getDefaultReplication(qualifiedPath)*2),
-          blockSize);
+          fs.getWorkingDirectory());
+      final long blockSize = createMultipleBlocks ? NON_DEFAULT_BLOCK_SIZE : fs
+          .getDefaultBlockSize(qualifiedPath) * 2;
+      FsPermission permission = FsPermission.getFileDefault().applyUMask(
+          FsPermission.getUMask(fs.getConf()));
+      outputStream = fs.create(qualifiedPath, permission,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 0,
+          (short) (fs.getDefaultReplication(qualifiedPath) * 2), blockSize,
+          null, checksumOpt);
       byte[] bytes = new byte[DEFAULT_FILE_SIZE];
       outputStream.write(bytes);
       long fileSize = DEFAULT_FILE_SIZE;
@@ -171,17 +200,40 @@ public class TestCopyMapper {
     }
   }
 
+  @Test
+  public void testCopyWithDifferentChecksumType() throws Exception {
+    testCopy(true);
+  }
+
   @Test(timeout=40000)
   public void testRun() {
+    testCopy(false);
+  }
+
+  private void testCopy(boolean preserveChecksum) {
     try {
       deleteState();
-      createSourceData();
+      if (preserveChecksum) {
+        createSourceDataWithDifferentChecksumType();
+      } else {
+        createSourceData();
+      }
 
       FileSystem fs = cluster.getFileSystem();
       CopyMapper copyMapper = new CopyMapper();
       StubContext stubContext = new StubContext(getConfiguration(), null, 0);
       Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
+
+      Configuration configuration = context.getConfiguration();
+      EnumSet<DistCpOptions.FileAttribute> fileAttributes
+          = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+      if (preserveChecksum) {
+        fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+      }
+      configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+          DistCpUtils.packAttributes(fileAttributes));
+
       copyMapper.setup(context);
 
       for (Path path: pathList) {
@@ -195,19 +247,29 @@ public class TestCopyMapper {
             .replaceAll(SOURCE_PATH, TARGET_PATH));
         Assert.assertTrue(fs.exists(targetPath));
         Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
-        Assert.assertEquals(fs.getFileStatus(path).getReplication(),
-            fs.getFileStatus(targetPath).getReplication());
-        Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
-            fs.getFileStatus(targetPath).getBlockSize());
-        Assert.assertTrue(!fs.isFile(targetPath) ||
-            fs.getFileChecksum(targetPath).equals(
-                fs.getFileChecksum(path)));
+        FileStatus sourceStatus = fs.getFileStatus(path);
+        FileStatus targetStatus = fs.getFileStatus(targetPath);
+        Assert.assertEquals(sourceStatus.getReplication(),
+            targetStatus.getReplication());
+        if (preserveChecksum) {
+          Assert.assertEquals(sourceStatus.getBlockSize(),
+              targetStatus.getBlockSize());
+        }
+        Assert.assertTrue(!fs.isFile(targetPath)
+            || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
       }
 
       Assert.assertEquals(pathList.size(),
          stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
-      Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
-          stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+      if (!preserveChecksum) {
+        Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+            .getValue());
+      } else {
+        Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+            .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+            .getValue());
+      }
 
       testCopyingExistingFiles(fs, copyMapper, context);
       for (Text value : stubContext.getWriter().values()) {
@@ -309,7 +371,7 @@ public class TestCopyMapper {
       UserGroupInformation tmpUser = UserGroupInformation.createRemoteUser("guest");
 
       final CopyMapper copyMapper = new CopyMapper();
-      
+
       final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
          doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text, Text>.Context>() {
            @Override
@@ -535,7 +597,7 @@ public class TestCopyMapper {
 
       final Mapper<Text, FileStatus, Text, Text>.Context context = stubContext.getContext();
-      
+
       context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
          DistCpUtils.packAttributes(preserveStatus));
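As the updated tests show, preserve-status attributes reach the mapper through the job configuration, serialized by DistCpUtils.packAttributes under the PRESERVE_STATUS config label. A small sketch of that wiring, assuming an illustrative standalone Configuration rather than a real job context:

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpOptionSwitch;
    import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
    import org.apache.hadoop.tools.util.DistCpUtils;

    public class PackAttributesExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Preserve replication and checksum type; packAttributes serializes
        // the EnumSet into the string stored under the preserve-status label.
        EnumSet<FileAttribute> attrs =
            EnumSet.of(FileAttribute.REPLICATION, FileAttribute.CHECKSUMTYPE);
        conf.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
            DistCpUtils.packAttributes(attrs));
        System.out.println(conf.get(
            DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()));
      }
    }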