hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From inigo...@apache.org
Subject [03/50] [abbrv] hadoop git commit: HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li.
Date Fri, 07 Apr 2017 01:58:59 GMT
HADOOP-11794. Enable distcp to copy blocks in parallel. Contributed by Yongjun Zhang, Wei-Chiu Chuang, Xiao Chen, Rosie Li.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/20e57a9a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/20e57a9a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/20e57a9a

Branch: refs/heads/HDFS-10467
Commit: 20e57a9afbd71c50699864c1dc2a7620b448c924
Parents: 0ca98cb
Author: Yongjun Zhang <yzhang@cloudera.com>
Authored: Thu Mar 30 17:01:15 2017 -0700
Committer: Inigo <inigoiri@apache.org>
Committed: Thu Apr 6 18:58:19 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  22 +-
 .../org/apache/hadoop/tools/CopyListing.java    |  37 +-
 .../hadoop/tools/CopyListingFileStatus.java     |  87 ++++-
 .../java/org/apache/hadoop/tools/DistCp.java    |  52 +++
 .../apache/hadoop/tools/DistCpOptionSwitch.java |  10 +
 .../org/apache/hadoop/tools/DistCpOptions.java  |  22 +-
 .../org/apache/hadoop/tools/OptionsParser.java  |  36 +-
 .../apache/hadoop/tools/SimpleCopyListing.java  |  83 +++--
 .../hadoop/tools/mapred/CopyCommitter.java      | 174 ++++++++-
 .../apache/hadoop/tools/mapred/CopyMapper.java  |  40 +-
 .../tools/mapred/RetriableFileCopyCommand.java  |  26 +-
 .../tools/mapred/UniformSizeInputFormat.java    |   5 +-
 .../apache/hadoop/tools/util/DistCpUtils.java   | 111 +++++-
 .../src/site/markdown/DistCp.md.vm              |   1 +
 .../apache/hadoop/tools/TestDistCpSystem.java   | 368 +++++++++++++++++--
 .../apache/hadoop/tools/TestOptionsParser.java  |   2 +-
 .../hadoop/tools/mapred/TestCopyCommitter.java  |   5 +-
 17 files changed, 971 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 1329195..9b782f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -862,7 +862,27 @@ public class DFSTestUtil {
       out.write(toAppend);
     }
   }
-  
+
+  /**
+   * Append specified length of bytes to a given file, starting with new block.
+   * @param fs The file system
+   * @param p Path of the file to append
+   * @param length Length of bytes to append to the file
+   * @throws IOException
+   */
+  public static void appendFileNewBlock(DistributedFileSystem fs,
+      Path p, int length) throws IOException {
+    assert fs.exists(p);
+    assert length >= 0;
+    byte[] toAppend = new byte[length];
+    Random random = new Random();
+    random.nextBytes(toAppend);
+    try (FSDataOutputStream out = fs.append(p,
+        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
+      out.write(toAppend);
+    }
+  }
+
   /**
    * @return url content as string (UTF-8 encoding assumed)
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index 481aa61..9ebf9d2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -145,12 +145,22 @@ public abstract class CopyListing extends Configured {
     Configuration config = getConf();
     FileSystem fs = pathToListFile.getFileSystem(config);
 
-    Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
+    final boolean splitLargeFile = options.splitLargeFile();
+
+    // When splitLargeFile is enabled, the copy listing was not randomized
+    // earlier, so no sorting is done here. For a file that has multiple
+    // entries due to splitting, we check here that consecutive
+    // <chunkOffset, chunkLength> ranges are contiguous.
+    //
+    Path checkPath = splitLargeFile?
+        pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile);
 
     SequenceFile.Reader reader = new SequenceFile.Reader(
-                          config, SequenceFile.Reader.file(sortedList));
+                          config, SequenceFile.Reader.file(checkPath));
     try {
       Text lastKey = new Text("*"); //source relative path can never hold *
+      long lastChunkOffset = -1;
+      long lastChunkLength = -1;
       CopyListingFileStatus lastFileStatus = new CopyListingFileStatus();
 
       Text currentKey = new Text();
@@ -161,8 +171,21 @@ public abstract class CopyListing extends Configured {
         if (currentKey.equals(lastKey)) {
           CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
           reader.getCurrentValue(currentFileStatus);
-          throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
-              currentFileStatus.getPath() + " would cause duplicates. Aborting");
+          if (!splitLargeFile) {
+            throw new DuplicateFileException("File " + lastFileStatus.getPath()
+                + " and " + currentFileStatus.getPath()
+                + " would cause duplicates. Aborting");
+          } else {
+            if (lastChunkOffset + lastChunkLength !=
+                currentFileStatus.getChunkOffset()) {
+              throw new InvalidInputException("File " + lastFileStatus.getPath()
+                  + " " + lastChunkOffset + "," + lastChunkLength
+                  + " and " + currentFileStatus.getPath()
+                  + " " + currentFileStatus.getChunkOffset() + ","
+                  + currentFileStatus.getChunkLength()
+                  + " are not continuous. Aborting");
+            }
+          }
         }
         reader.getCurrentValue(lastFileStatus);
         if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
@@ -181,8 +204,12 @@ public abstract class CopyListing extends Configured {
             xAttrSupportCheckFsSet.add(lastFsUri);
           }
         }
-        lastKey.set(currentKey);
 
+        lastKey.set(currentKey);
+        if (splitLargeFile) {
+          lastChunkOffset = lastFileStatus.getChunkOffset();
+          lastChunkLength = lastFileStatus.getChunkLength();
+        }
         if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
           LOG.debug("Copy list entry " + idx + ": " +
                   lastFileStatus.getPath().toUri().getPath());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
index 00d4b32..29c59ac 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
@@ -74,6 +74,14 @@ public final class CopyListingFileStatus implements Writable {
   private List<AclEntry> aclEntries;
   private Map<String, byte[]> xAttrs;
 
+  // <chunkOffset, chunkLength> represents the offset and length of a file
+  // chunk in number of bytes.
+  // Used when splitting a large file into chunks to copy in parallel.
+  // For a file that is not split, chunkOffset is 0 and chunkLength is either
+  // the length of the file or Long.MAX_VALUE (the default).
+  private long chunkOffset = 0;
+  private long chunkLength = Long.MAX_VALUE;
+
   /**
    * Default constructor.
    */
@@ -96,11 +104,32 @@ public final class CopyListingFileStatus implements Writable {
         fileStatus.getPath());
   }
 
+  public CopyListingFileStatus(FileStatus fileStatus,
+      long chunkOffset, long chunkLength) {
+    this(fileStatus.getLen(), fileStatus.isDirectory(),
+        fileStatus.getReplication(), fileStatus.getBlockSize(),
+        fileStatus.getModificationTime(), fileStatus.getAccessTime(),
+        fileStatus.getPermission(), fileStatus.getOwner(),
+        fileStatus.getGroup(),
+        fileStatus.getPath());
+    this.chunkOffset = chunkOffset;
+    this.chunkLength = chunkLength;
+  }
+
   @SuppressWarnings("checkstyle:parameternumber")
   public CopyListingFileStatus(long length, boolean isdir,
       int blockReplication, long blocksize, long modificationTime,
       long accessTime, FsPermission permission, String owner, String group,
       Path path) {
+    this(length, isdir, blockReplication, blocksize, modificationTime,
+        accessTime, permission, owner, group, path, 0, Long.MAX_VALUE);
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public CopyListingFileStatus(long length, boolean isdir,
+      int blockReplication, long blocksize, long modificationTime,
+      long accessTime, FsPermission permission, String owner, String group,
+      Path path, long chunkOffset, long chunkLength) {
     this.length = length;
     this.isdir = isdir;
     this.blockReplication = (short)blockReplication;
@@ -117,6 +146,23 @@ public final class CopyListingFileStatus implements Writable {
     this.owner = (owner == null) ? "" : owner;
     this.group = (group == null) ? "" : group;
     this.path = path;
+    this.chunkOffset = chunkOffset;
+    this.chunkLength = chunkLength;
+  }
+
+  public CopyListingFileStatus(CopyListingFileStatus other) {
+    this.length = other.length;
+    this.isdir = other.isdir;
+    this.blockReplication = other.blockReplication;
+    this.blocksize = other.blocksize;
+    this.modificationTime = other.modificationTime;
+    this.accessTime = other.accessTime;
+    this.permission = other.permission;
+    this.owner = other.owner;
+    this.group = other.group;
+    this.path = new Path(other.path.toUri());
+    this.chunkOffset = other.chunkOffset;
+    this.chunkLength = other.chunkLength;
   }
 
   public Path getPath() {
@@ -200,6 +246,31 @@ public final class CopyListingFileStatus implements Writable {
     this.xAttrs = xAttrs;
   }
 
+  public long getChunkOffset() {
+    return chunkOffset;
+  }
+
+  public void setChunkOffset(long offset) {
+    this.chunkOffset = offset;
+  }
+
+  public long getChunkLength() {
+    return chunkLength;
+  }
+
+  public void setChunkLength(long chunkLength) {
+    this.chunkLength = chunkLength;
+  }
+
+  public boolean isSplit() {
+    return getChunkLength() != Long.MAX_VALUE &&
+        getChunkLength() != getLen();
+  }
+
+  public long getSizeToCopy() {
+    return isSplit()? getChunkLength() : getLen();
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN);
@@ -244,6 +315,9 @@ public final class CopyListingFileStatus implements Writable {
     } else {
       out.writeInt(NO_XATTRS);
     }
+
+    out.writeLong(chunkOffset);
+    out.writeLong(chunkLength);
   }
 
   @Override
@@ -292,6 +366,9 @@ public final class CopyListingFileStatus implements Writable {
     } else {
       xAttrs = null;
     }
+
+    chunkOffset = in.readLong();
+    chunkLength = in.readLong();
   }
 
   @Override
@@ -317,8 +394,14 @@ public final class CopyListingFileStatus implements Writable {
   public String toString() {
     StringBuilder sb = new StringBuilder(super.toString());
     sb.append('{');
-    sb.append("aclEntries = " + aclEntries);
-    sb.append(", xAttrs = " + xAttrs);
+    sb.append(this.getPath().toString());
+    sb.append(" length = ").append(this.getLen());
+    sb.append(" aclEntries = ").append(aclEntries);
+    sb.append(", xAttrs = ").append(xAttrs);
+    if (isSplit()) {
+      sb.append(", chunkOffset = ").append(this.getChunkOffset());
+      sb.append(", chunkLength = ").append(this.getChunkLength());
+    }
     sb.append('}');
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index ab58e9c..8c2fa24 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.CopyListing.*;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@@ -134,6 +135,7 @@ public class DistCp extends Configured implements Tool {
     
     try {
       inputOptions = (OptionsParser.parse(argv));
+      setOptionsForSplitLargeFile();
       setTargetPathExists();
       LOG.info("Input Options: " + inputOptions);
     } catch (Throwable e) {
@@ -235,6 +237,56 @@ public class DistCp extends Configured implements Tool {
     getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
         targetExists);
   }
+
+  /**
+   * Check if concat is supported by fs.
+   * Throws UnsupportedOperationException if not.
+   */
+  private void checkConcatSupport(FileSystem fs) {
+    try {
+      Path[] src = null;
+      Path tgt = null;
+      fs.concat(tgt, src);
+    } catch (UnsupportedOperationException use) {
+      throw new UnsupportedOperationException(
+          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() +
+          " is not supported since the target file system doesn't" +
+          " support concat.", use);
+    } catch (Exception e) {
+      // Ignore other exception
+    }
+  }
+
+  /**
+   * Set up needed options for splitting large files.
+   */
+  private void setOptionsForSplitLargeFile() throws IOException {
+    if (!inputOptions.splitLargeFile()) {
+      return;
+    }
+    Path target = inputOptions.getTargetPath();
+    FileSystem targetFS = target.getFileSystem(getConf());
+    checkConcatSupport(targetFS);
+
+    LOG.info("Enabling preserving blocksize since "
+        + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed.");
+    inputOptions.preserve(FileAttribute.BLOCKSIZE);
+
+    LOG.info("Set " +
+        DistCpOptionSwitch.APPEND.getSwitch()
+        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
+        + " is passed.");
+    inputOptions.setAppend(false);
+
+    LOG.info("Set " +
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES
+        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
+        + " is passed.");
+    getConf().setBoolean(
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+  }
+
+
   /**
    * Create Job object for submitting it, with all the configuration
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index fb47d76..ced9b54 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -169,6 +169,16 @@ public enum DistCpOptionSwitch {
       new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
               "copied to <= n bytes")),
 
+  BLOCKS_PER_CHUNK("",
+      new Option("blocksperchunk", true, "If set to a positive value, files"
+          + "with more blocks than this value will be split into chunks of "
+          + "<blocksperchunk> blocks to be transferred in parallel, and "
+          + "reassembled on the destination. By default, <blocksperchunk> is "
+          + "0 and the files will be transmitted in their entirety without "
+          + "splitting. This switch is only applicable when the source file "
+          + "system implements getBlockLocations method and the target file "
+          + "system implements concat method")),
+
   /**
    * Specify bandwidth per map in MB, accepts bandwidth as a fraction
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 8c37ff3..9822d83 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -97,7 +97,11 @@ public class DistCpOptions {
   // targetPathExist is a derived field, it's initialized in the
   // beginning of distcp.
   private boolean targetPathExists = true;
-  
+
+  // Size of a chunk, in number of blocks, when splitting a large file into
+  // chunks to copy in parallel. Default is 0, meaning files are not split.
+  private int blocksPerChunk = 0;
+
   public static enum FileAttribute{
     REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
 
@@ -166,6 +170,7 @@ public class DistCpOptions {
       this.targetPath = that.getTargetPath();
       this.targetPathExists = that.getTargetPathExists();
       this.filtersFile = that.getFiltersFile();
+      this.blocksPerChunk = that.blocksPerChunk;
     }
   }
 
@@ -578,6 +583,18 @@ public class DistCpOptions {
     this.filtersFile = filtersFilename;
   }
 
+  public final void setBlocksPerChunk(int csize) {
+    this.blocksPerChunk = csize;
+  }
+
+  public final int getBlocksPerChunk() {
+    return blocksPerChunk;
+  }
+
+  public final boolean splitLargeFile() {
+    return blocksPerChunk > 0;
+  }
+
   void validate() {
     if ((useDiff || useRdiff) && deleteMissing) {
       // -delete and -diff/-rdiff are mutually exclusive. For backward
@@ -669,6 +686,8 @@ public class DistCpOptions {
       DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
           filtersFile);
     }
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK,
+        String.valueOf(blocksPerChunk));
   }
 
   /**
@@ -704,6 +723,7 @@ public class DistCpOptions {
         ", targetPath=" + targetPath +
         ", targetPathExists=" + targetPathExists +
         ", filtersFile='" + filtersFile + '\'' +
+        ", blocksPerChunk=" + blocksPerChunk +
         '}';
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index d0f82ca..8881264 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -172,11 +172,44 @@ public class OptionsParser {
           DistCpOptionSwitch.FILTERS.getSwitch()));
     }
 
+    parseBlocksPerChunk(command, option);
+
     option.validate();
 
     return option;
   }
 
+
+  /**
+   * A helper method to parse chunk size in number of blocks.
+   * Used when breaking large file into chunks to copy in parallel.
+   *
+   * @param command command line arguments
+   */
+  private static void parseBlocksPerChunk(CommandLine command,
+      DistCpOptions option) {
+    boolean hasOption =
+        command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
+    LOG.info("parseChunkSize: " +
+        DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption);
+    if (hasOption) {
+      String chunkSizeString = getVal(command,
+          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
+      try {
+        int csize = Integer.parseInt(chunkSizeString);
+        if (csize < 0) {
+          csize = 0;
+        }
+        LOG.info("Set distcp blocksPerChunk to " + csize);
+        option.setBlocksPerChunk(csize);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("blocksPerChunk is invalid: "
+            + chunkSizeString, e);
+      }
+    }
+  }
+
   /**
    * parseSizeLimit is a helper method for parsing the deprecated
    * argument SIZE_LIMIT.
@@ -211,8 +244,7 @@ public class OptionsParser {
                               DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
       try {
         Integer.parseInt(fileLimitString);
-      }
-      catch (NumberFormatException e) {
+      } catch (NumberFormatException e) {
         throw new IllegalArgumentException("File-limit is invalid: "
                                             + fileLimitString, e);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 105e4f2..af91347 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.tools;
 
 import com.google.common.collect.Lists;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -47,6 +48,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.LinkedList;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -240,10 +242,10 @@ public class SimpleCopyListing extends CopyListing {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
-    CopyListingFileStatus fileCopyListingStatus =
+    LinkedList<CopyListingFileStatus> fileCopyListingStatus =
         DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus,
-            preserveAcls, preserveXAttrs, preserveRawXAttrs);
-
+            preserveAcls, preserveXAttrs, preserveRawXAttrs,
+            options.getBlocksPerChunk());
     writeToFileListingRoot(fileListWriter, fileCopyListingStatus,
         sourceRoot, options);
   }
@@ -348,9 +350,10 @@ public class SimpleCopyListing extends CopyListing {
         FileStatus[] sourceFiles = sourceFS.listStatus(path);
         boolean explore = (sourceFiles != null && sourceFiles.length > 0);
         if (!explore || rootStatus.isDirectory()) {
-          CopyListingFileStatus rootCopyListingStatus =
-            DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
-                preserveAcls, preserveXAttrs, preserveRawXAttrs);
+          LinkedList<CopyListingFileStatus> rootCopyListingStatus =
+              DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
+                  preserveAcls, preserveXAttrs, preserveRawXAttrs,
+                  options.getBlocksPerChunk());
           writeToFileListingRoot(fileListWriter, rootCopyListingStatus,
               sourcePathRoot, options);
         }
@@ -360,20 +363,20 @@ public class SimpleCopyListing extends CopyListing {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
             }
-            CopyListingFileStatus sourceCopyListingStatus =
-              DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
-                  preserveAcls && sourceStatus.isDirectory(),
-                  preserveXAttrs && sourceStatus.isDirectory(),
-                  preserveRawXAttrs && sourceStatus.isDirectory());
-            if (randomizeFileListing) {
-              addToFileListing(statusList,
-                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
-                  fileListWriter);
-            } else {
-              writeToFileListing(fileListWriter, sourceCopyListingStatus,
-                  sourcePathRoot);
+            LinkedList<CopyListingFileStatus> sourceCopyListingStatus =
+                DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
+                    preserveAcls && sourceStatus.isDirectory(),
+                    preserveXAttrs && sourceStatus.isDirectory(),
+                    preserveRawXAttrs && sourceStatus.isDirectory(),
+                    options.getBlocksPerChunk());
+            for (CopyListingFileStatus fs : sourceCopyListingStatus) {
+              if (randomizeFileListing) {
+                addToFileListing(statusList,
+                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+              } else {
+                writeToFileListing(fileListWriter, fs, sourcePathRoot);
+              }
             }
-
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
@@ -641,18 +644,20 @@ public class SimpleCopyListing extends CopyListing {
             LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
           }
           if (workResult.getSuccess()) {
-            CopyListingFileStatus childCopyListingStatus =
+            LinkedList<CopyListingFileStatus> childCopyListingStatus =
               DistCpUtils.toCopyListingFileStatus(sourceFS, child,
                 preserveAcls && child.isDirectory(),
                 preserveXAttrs && child.isDirectory(),
-                preserveRawXattrs && child.isDirectory());
-            if (randomizeFileListing) {
-              addToFileListing(fileStatuses,
-                  new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
-                  fileListWriter);
-            } else {
-              writeToFileListing(fileListWriter, childCopyListingStatus,
-                  sourcePathRoot);
+                preserveRawXattrs && child.isDirectory(),
+                options.getBlocksPerChunk());
+
+            for (CopyListingFileStatus fs : childCopyListingStatus) {
+              if (randomizeFileListing) {
+                addToFileListing(fileStatuses,
+                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+              } else {
+                writeToFileListing(fileListWriter, fs, sourcePathRoot);
+              }
             }
           }
           if (retry < maxRetries) {
@@ -675,19 +680,21 @@ public class SimpleCopyListing extends CopyListing {
   }
 
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
-      CopyListingFileStatus fileStatus, Path sourcePathRoot,
+      LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
       DistCpOptions options) throws IOException {
     boolean syncOrOverwrite = options.shouldSyncFolder() ||
         options.shouldOverwrite();
-    if (fileStatus.getPath().equals(sourcePathRoot) && 
-        fileStatus.isDirectory() && syncOrOverwrite) {
-      // Skip the root-paths when syncOrOverwrite
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip " + fileStatus.getPath());
-      }      
-      return;
+    for (CopyListingFileStatus fs : fileStatus) {
+      if (fs.getPath().equals(sourcePathRoot) &&
+          fs.isDirectory() && syncOrOverwrite) {
+        // Skip the root-paths when syncOrOverwrite
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip " + fs.getPath());
+        }
+        return;
+      }
+      writeToFileListing(fileListWriter, fs, sourcePathRoot);
     }
-    writeToFileListing(fileListWriter, fileStatus, sourcePathRoot);
   }
 
   private void writeToFileListing(SequenceFile.Writer fileListWriter,
@@ -707,7 +714,7 @@ public class SimpleCopyListing extends CopyListing {
     fileListWriter.sync();
 
     if (!fileStatus.isDirectory()) {
-      totalBytesToCopy += fileStatus.getLen();
+      totalBytesToCopy += fileStatus.getSizeToCopy();
     } else {
       totalDirs++;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 75cefb4..6ddaab9 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -34,14 +35,17 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.CopyListing;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptionSwitch;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.GlobbedCopyListing;
 import org.apache.hadoop.tools.util.DistCpUtils;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -63,7 +67,8 @@ public class CopyCommitter extends FileOutputCommitter {
   private boolean syncFolder = false;
   private boolean overwrite = false;
   private boolean targetPathExists = true;
-  
+  private boolean ignoreFailures = false;
+
   /**
    * Create a output committer
    *
@@ -82,8 +87,13 @@ public class CopyCommitter extends FileOutputCommitter {
     Configuration conf = jobContext.getConfiguration();
     syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false);
     overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false);
-    targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
-    
+    targetPathExists = conf.getBoolean(
+        DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true);
+    ignoreFailures = conf.getBoolean(
+        DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
+
+    concatFileChunks(conf);
+
     super.commitJob(jobContext);
 
     cleanupTempFiles(jobContext);
@@ -169,9 +179,112 @@ public class CopyCommitter extends FileOutputCommitter {
     }
   }
 
+  private boolean isFileNotFoundException(IOException e) {
+    if (e instanceof FileNotFoundException) {
+      return true;
+    }
+
+    if (e instanceof RemoteException) {
+      return ((RemoteException)e).unwrapRemoteException()
+          instanceof FileNotFoundException;
+    }
+
+    return false;
+  }
+
+  /**
+   * Concat chunk files for the same file into one.
+   * Iterate through copy listing, identify chunk files for the same file,
+   * concat them into one.
+   */
+  private void concatFileChunks(Configuration conf) throws IOException {
+
+    LOG.info("concat file chunks ...");
+
+    String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH);
+    if (spath == null || spath.isEmpty()) {
+      return;
+    }
+    Path sourceListing = new Path(spath);
+    SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+                                      SequenceFile.Reader.file(sourceListing));
+    Path targetRoot =
+        new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    try {
+      CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+      Text srcRelPath = new Text();
+      CopyListingFileStatus lastFileStatus = null;
+      LinkedList<Path> allChunkPaths = new LinkedList<Path>();
+
+      // Iterate over every source path that was copied.
+      while (sourceReader.next(srcRelPath, srcFileStatus)) {
+        if (srcFileStatus.isDirectory()) {
+          continue;
+        }
+        Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+        Path targetFileChunkPath =
+            DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("  add " + targetFileChunkPath + " to concat.");
+        }
+        allChunkPaths.add(targetFileChunkPath);
+        if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength()
+            == srcFileStatus.getLen()) {
+          // This is the last chunk of the splits, consolidate allChunkPaths
+          try {
+            concatFileChunks(conf, targetFile, allChunkPaths);
+          } catch (IOException e) {
+            // If the concat failed because a chunk file doesn't exist,
+            // then we assume that the CopyMapper has skipped copying this
+            // file, and we ignore the exception here.
+            // If a chunk file should have been created but it was not, then
+            // the CopyMapper would have failed.
+            if (!isFileNotFoundException(e)) {
+              String emsg = "Failed to concat chunk files for " + targetFile;
+              if (!ignoreFailures) {
+                throw new IOException(emsg, e);
+              } else {
+                LOG.warn(emsg, e);
+              }
+            }
+          }
+          allChunkPaths.clear();
+          lastFileStatus = null;
+        } else {
+          if (lastFileStatus == null) {
+            lastFileStatus = new CopyListingFileStatus(srcFileStatus);
+          } else {
+            // Two neighboring chunks have to be consecutive ones for the same
+            // file, for them to be merged
+            if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) ||
+                srcFileStatus.getChunkOffset() !=
+                (lastFileStatus.getChunkOffset() +
+                lastFileStatus.getChunkLength())) {
+              String emsg = "Inconsistent sequence file: current " +
+                  "chunk file " + srcFileStatus + " doesnt match prior " +
+                  "entry " + lastFileStatus;
+              if (!ignoreFailures) {
+                throw new IOException(emsg);
+              } else {
+                LOG.warn(emsg + ", skipping concat this set.");
+              }
+            } else {
+              lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset());
+              lastFileStatus.setChunkLength(srcFileStatus.getChunkLength());
+            }
+          }
+        }
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+    }
+  }
+
   // This method changes the target-directories' file-attributes (owner,
   // user/group permissions, etc.) based on the corresponding source directories.
-  private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+  private void preserveFileAttributesForDirectories(Configuration conf)
+      throws IOException {
     String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
     final boolean syncOrOverwrite = syncFolder || overwrite;
 
@@ -325,4 +438,57 @@ public class CopyCommitter extends FileOutputCommitter {
         ", Unable to move to " + finalDir);
     }
   }
+
+  /**
+   * Concat the passed chunk files into one and rename it the targetFile.
+   */
+  private void concatFileChunks(Configuration conf, Path targetFile,
+      LinkedList<Path> allChunkPaths) throws IOException {
+    if (allChunkPaths.size() == 1) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("concat " + targetFile + " allChunkSize+ "
+          + allChunkPaths.size());
+    }
+    FileSystem dstfs = targetFile.getFileSystem(conf);
+
+    Path firstChunkFile = allChunkPaths.removeFirst();
+    Path[] restChunkFiles = new Path[allChunkPaths.size()];
+    allChunkPaths.toArray(restChunkFiles);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile));
+      int i = 0;
+      for (Path f : restChunkFiles) {
+        LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f));
+        ++i;
+      }
+    }
+    dstfs.concat(firstChunkFile, restChunkFiles);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
+    }
+    rename(dstfs, firstChunkFile, targetFile);
+  }
+
+  /**
+   * Rename tmp to dst on destFileSys.
+   * @param destFileSys the file system
+   * @param tmp the source path
+   * @param dst the destination path
+   * @throws IOException if renaming failed
+   */
+  private static void rename(FileSystem destFileSys, Path tmp, Path dst)
+      throws IOException {
+    try {
+      if (destFileSys.exists(dst)) {
+        destFileSys.delete(dst, true);
+      }
+      destFileSys.rename(tmp, dst);
+    } catch (IOException ioe) {
+      throw new IOException("Fail to rename tmp file (=" + tmp
+          + ") to destination file (=" + dst + ")", ioe);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
index e1873f1..d6b3ba8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
@@ -156,10 +156,12 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         sourceFS = sourcePath.getFileSystem(conf);
         final boolean preserveXAttrs =
             fileAttributes.contains(FileAttribute.XATTR);
-        sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
-          sourceFS.getFileStatus(sourcePath),
-          fileAttributes.contains(FileAttribute.ACL), 
-          preserveXAttrs, preserveRawXattrs);
+        sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS,
+            sourceFS.getFileStatus(sourcePath),
+            fileAttributes.contains(FileAttribute.ACL),
+            preserveXAttrs, preserveRawXattrs,
+            sourceFileStatus.getChunkOffset(),
+            sourceFileStatus.getChunkLength());
       } catch (FileNotFoundException e) {
         throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
       }
@@ -173,7 +175,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
           LOG.debug("Path could not be found: " + target, ignore);
       }
 
-      if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
+      if (targetStatus != null &&
+          (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
         throw new IOException("Can't replace " + target + ". Target is " +
             getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
       }
@@ -183,19 +186,28 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
         return;
       }
 
-      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, targetStatus);
+      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target,
+          targetStatus);
+
+      Path tmpTarget = target;
       if (action == FileAction.SKIP) {
         LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                  + " to " + target);
         updateSkipCounters(context, sourceCurrStatus);
         context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
+
       } else {
-        copyFileWithRetry(description, sourceCurrStatus, target, context,
+        if (sourceCurrStatus.isSplit()) {
+          tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget);
+        }
+        copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context,
             action, fileAttributes);
       }
-
-      DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
-          fileAttributes, preserveRawXattrs);
+      DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
+          sourceCurrStatus, fileAttributes, preserveRawXattrs);
     } catch (IOException exception) {
       handleFailures(exception, sourceFileStatus, target, context);
     }
@@ -261,8 +273,12 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private void handleFailures(IOException exception,
       CopyListingFileStatus sourceFileStatus, Path target, Context context)
       throws IOException, InterruptedException {
-    LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
-                target, exception);
+    LOG.error("Failure in copying " + sourceFileStatus.getPath() +
+        (sourceFileStatus.isSplit()? ","
+            + " offset=" + sourceFileStatus.getChunkOffset()
+            + " chunkLength=" + sourceFileStatus.getChunkLength()
+            : "") +
+        " to " + target, exception);
 
     if (ignoreFailures &&
         ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 06acd78..2c17fef 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -118,17 +118,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
           .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
           .getFileChecksum(sourcePath) : null;
 
-      final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
-          target).getLen() : 0;
+      long offset = (action == FileAction.APPEND) ?
+          targetFS.getFileStatus(target).getLen() : source.getChunkOffset();
       long bytesRead = copyToFile(targetPath, targetFS, source,
           offset, context, fileAttributes, sourceChecksum);
 
-      compareFileLengths(source, targetPath, configuration, bytesRead
-          + offset);
+      if (!source.isSplit()) {
+        compareFileLengths(source, targetPath, configuration, bytesRead
+            + offset);
+      }
       //At this point, src&dest lengths are same. if length==0, we skip checksum
       if ((bytesRead != 0) && (!skipCrc)) {
-        compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
-            targetFS, targetPath);
+        if (!source.isSplit()) {
+          compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
+              targetFS, targetPath);
+        }
       }
       // it's not append case, thus we first write to a temporary file, rename
       // it to the target path.
@@ -249,16 +253,26 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     ThrottledInputStream inStream = null;
     long totalBytesRead = 0;
 
+    long chunkLength = source2.getChunkLength();
+    boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
       int bytesRead = readBytes(inStream, buf, sourceOffset);
       while (bytesRead >= 0) {
+        if (chunkLength > 0 &&
+            (totalBytesRead + bytesRead) >= chunkLength) {
+          bytesRead = (int)(chunkLength - totalBytesRead);
+          finished = true;
+        }
         totalBytesRead += bytesRead;
         if (action == FileAction.APPEND) {
           sourceOffset += bytesRead;
         }
         outStream.write(buf, 0, bytesRead);
         updateContextStatus(totalBytesRead, context, source2);
+        if (finished) {
+          break;
+        }
         bytesRead = readBytes(inStream, buf, sourceOffset);
       }
       outStream.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
index 3e86d09..d1c18ea 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
@@ -99,7 +99,8 @@ public class UniformSizeInputFormat
       while (reader.next(srcRelPath, srcFileStatus)) {
         // If adding the current file would cause the bytes per map to exceed
         // limit. Add the current file to new split
-        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) {
+        if (currentSplitSize + srcFileStatus.getChunkLength() > nBytesPerSplit
+            && lastPosition != 0) {
           FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
               lastPosition - lastSplitStart, null);
           if (LOG.isDebugEnabled()) {
@@ -109,7 +110,7 @@ public class UniformSizeInputFormat
           lastSplitStart = lastPosition;
           currentSplitSize = 0;
         }
-        currentSplitSize += srcFileStatus.getLen();
+        currentSplitSize += srcFileStatus.getChunkLength();
         lastPosition = reader.getPosition();
       }
       if (lastPosition > lastSplitStart) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 76bc4c5..e315b84 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.tools.util;
 
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +32,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclUtil;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -44,6 +47,7 @@ import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.EnumSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -116,7 +120,7 @@ public class DistCpUtils {
    * @return Class implementing the strategy specified in options.
    */
   public static Class<? extends InputFormat> getStrategy(Configuration conf,
-                                                                 DistCpOptions options) {
+      DistCpOptions options) {
     String confLabel = "distcp."
         + StringUtils.toLowerCase(options.getCopyStrategy())
         + ".strategy" + ".impl";
@@ -298,6 +302,86 @@ public class DistCpUtils {
   }
 
   /**
+   * Converts FileStatus to a list of CopyListingFileStatus.
+   * The resulting list contains either one CopyListingFileStatus per chunk of
+   * file-blocks (if file-size exceeds blockSize * blocksPerChunk, and there
+   * are more blocks in the file than blocksPerChunk), or a single
+   * CopyListingFileStatus for the entire file (if file-size is too small to
+   * split).
+   * If preserving ACLs, populates the CopyListingFileStatus with the ACLs.
+   * If preserving XAttrs, populates the CopyListingFileStatus with the XAttrs.
+   *
+   * @param fileSystem FileSystem containing the file
+   * @param fileStatus FileStatus of file
+   * @param preserveAcls boolean true if preserving ACLs
+   * @param preserveXAttrs boolean true if preserving XAttrs
+   * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
+   * @param blocksPerChunk size of chunks when copying chunks in parallel
+   * @return list of CopyListingFileStatus
+   * @throws IOException if there is an I/O error
+   */
+  public static LinkedList<CopyListingFileStatus> toCopyListingFileStatus(
+      FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls,
+      boolean preserveXAttrs, boolean preserveRawXAttrs, int blocksPerChunk)
+          throws IOException {
+    LinkedList<CopyListingFileStatus> copyListingFileStatus =
+        new LinkedList<CopyListingFileStatus>();
+
+    final CopyListingFileStatus clfs = toCopyListingFileStatusHelper(
+        fileSystem, fileStatus, preserveAcls,
+        preserveXAttrs, preserveRawXAttrs,
+        0, fileStatus.getLen());
+    final long blockSize = fileStatus.getBlockSize();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("toCopyListing: " + fileStatus + " chunkSize: "
+          + blocksPerChunk + " isDFS: " +
+          (fileSystem instanceof DistributedFileSystem));
+    }
+    if ((blocksPerChunk > 0) &&
+        !fileStatus.isDirectory() &&
+        (fileStatus.getLen() > blockSize * blocksPerChunk)) {
+      // split only when the file size is larger than the intended chunk size
+      final BlockLocation[] blockLocations;
+      blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0,
+            fileStatus.getLen());
+
+      int numBlocks = blockLocations.length;
+      long curPos = 0;
+      if (numBlocks <= blocksPerChunk) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("  add file " + clfs);
+        }
+        copyListingFileStatus.add(clfs);
+      } else {
+        int i = 0;
+        while (i < numBlocks) {
+          long curLength = 0;
+          for (int j = 0; j < blocksPerChunk && i < numBlocks; ++j, ++i) {
+            curLength += blockLocations[i].getLength();
+          }
+          if (curLength > 0) {
+            CopyListingFileStatus clfs1 = new CopyListingFileStatus(clfs);
+            clfs1.setChunkOffset(curPos);
+            clfs1.setChunkLength(curLength);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("  add file chunk " + clfs1);
+            }
+            copyListingFileStatus.add(clfs1);
+            curPos += curLength;
+          }
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("  add file/dir " + clfs);
+      }
+      copyListingFileStatus.add(clfs);
+    }
+
+    return copyListingFileStatus;
+  }
+
+  /**
    * Converts a FileStatus to a CopyListingFileStatus.  If preserving ACLs,
    * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs,
    * populates the CopyListingFileStatus with the XAttrs.
@@ -307,13 +391,17 @@ public class DistCpUtils {
    * @param preserveAcls boolean true if preserving ACLs
    * @param preserveXAttrs boolean true if preserving XAttrs
    * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs
+   * @param chunkOffset chunk offset in bytes
+   * @param chunkLength chunk length in bytes
+   * @return CopyListingFileStatus
    * @throws IOException if there is an I/O error
    */
-  public static CopyListingFileStatus toCopyListingFileStatus(
+  public static CopyListingFileStatus toCopyListingFileStatusHelper(
       FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, 
-      boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException {
+      boolean preserveXAttrs, boolean preserveRawXAttrs,
+      long chunkOffset, long chunkLength) throws IOException {
     CopyListingFileStatus copyListingFileStatus =
-      new CopyListingFileStatus(fileStatus);
+        new CopyListingFileStatus(fileStatus, chunkOffset, chunkLength);
     if (preserveAcls) {
       FsPermission perm = fileStatus.getPermission();
       if (perm.getAclBit()) {
@@ -470,4 +558,19 @@ public class DistCpUtils {
     return (sourceChecksum == null || targetChecksum == null ||
             sourceChecksum.equals(targetChecksum));
   }
+
+  /**
+   * Return the Path for a given chunk.
+   * Used when splitting large file into chunks to copy in parallel.
+   * @param targetFile path to target file
+   * @param srcFileStatus source file status in copy listing
+   * @return path to the chunk specified by the parameters to store
+   * in target cluster temporarily
+   */
+  public static Path getSplitChunkPath(Path targetFile,
+      CopyListingFileStatus srcFileStatus) {
+    return new Path(targetFile.toString()
+        + ".____distcpSplit____" + srcFileStatus.getChunkOffset()
+        + "." + srcFileStatus.getChunkLength());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index 41a6e94..a77deb2 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -237,6 +237,7 @@ Flag              | Description                          | Notes
 `-rdiff <newSnapshot> <oldSnapshot>` | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot `<oldSnapshot>` was created on the target, and apply the diff reversely to the target, and copy modified files from the source's `<oldSnapshot>`, to make the target the same as `<oldSnapshot>`. | This option is valid only with `-update` option and the following conditions should be satisfied. <ol><li>Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from target's `<oldSnapshot>` to target's current state).</li>  <li> Two snapshots `<newSnapshot>` and `<oldSnapshot>` have been created on the target FS, and `<oldSnapshot>` is older than `<newSnapshot>`. No change has been made on target since `<newSnapshot>` was created on the target. </li> <li> The source has the same snapshot `<oldSnapshot>`, which has the same content as the `<oldSnapshot>` on the target. All the files/directories in the target's `<oldSnapshot>` are the same with source's `<oldSnapshot>`.</li> </ol> |
 `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
 `-skipcrccheck` | Whether to skip CRC checks between source and target paths. |
+`-blocksperchunk <blocksperchunk>` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `<blocksperchunk>` blocks to be transferred in parallel, and reassembled on the destination. By default, `<blocksperchunk>` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements the `getFileBlockLocations` method and the target file system implements the `concat` method. |
 
 Architecture of DistCp
 ----------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
index e3018a0..b2266b3 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java
@@ -23,17 +23,27 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.PrintStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -47,11 +57,15 @@ import org.junit.rules.Timeout;
  */
 
 public class TestDistCpSystem {
+  private static final Log LOG =
+      LogFactory.getLog(TestDistCpSystem.class);
+
   @Rule
   public Timeout globalTimeout = new Timeout(30000);
 
   private static final String SRCDAT = "srcdat";
   private static final String DSTDAT = "dstdat";
+  private static final long BLOCK_SIZE = 1024;
 
   private static MiniDFSCluster cluster;
   private static Configuration conf;
@@ -63,27 +77,76 @@ public class TestDistCpSystem {
       this.path = path;
       this.isDir = isDir;
     }
-    String getPath() { return path; }
-    boolean isDirectory() { return isDir; }
+
+    String getPath() {
+      return path;
+    }
+
+    boolean isDirectory() {
+      return isDir;
+    }
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  static String execCmd(FsShell shell, String... args) throws Exception {
+    ByteArrayOutputStream baout = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(baout, true);
+    PrintStream old = System.out;
+    System.setOut(out);
+    shell.run(args);
+    out.close();
+    System.setOut(old);
+    return baout.toString();
   }
   
-  private void createFiles(FileSystem fs, String topdir,
-      FileEntry[] entries) throws IOException {
+  private void createFiles(DistributedFileSystem fs, String topdir,
+      FileEntry[] entries, long chunkSize) throws IOException {
+    long seed = System.currentTimeMillis();
+    Random rand = new Random(seed);
+    short replicationFactor = 2;
     for (FileEntry entry : entries) {
-      Path newpath = new Path(topdir + "/" + entry.getPath());
+      Path newPath = new Path(topdir + "/" + entry.getPath());
       if (entry.isDirectory()) {
-        fs.mkdirs(newpath);
+        fs.mkdirs(newPath);
       } else {
-        OutputStream out = fs.create(newpath);
-        try {
-          out.write((topdir + "/" + entry).getBytes());
-          out.write("\n".getBytes());
-        } finally {
-          out.close();
+        long fileSize = BLOCK_SIZE *100;
+        int bufSize = 128;
+        if (chunkSize == -1) {
+          DFSTestUtil.createFile(fs, newPath, bufSize,
+              fileSize, BLOCK_SIZE, replicationFactor, seed);
+        } else {
+          // Create a variable length block file, by creating
+          // one block of half block size at the chunk boundary
+          long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2;
+          long seg2 = fileSize - seg1;
+          DFSTestUtil.createFile(fs, newPath, bufSize,
+              seg1, BLOCK_SIZE, replicationFactor, seed);
+          DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2);
         }
       }
+      seed = System.currentTimeMillis() + rand.nextLong();
     }
   }
+
+  private void createFiles(DistributedFileSystem fs, String topdir,
+      FileEntry[] entries) throws IOException {
+    createFiles(fs, topdir, entries, -1);
+  }
    
   private static FileStatus[] getFileStatus(FileSystem fs,
       String topdir, FileEntry[] files) throws IOException {
@@ -104,18 +167,19 @@ public class TestDistCpSystem {
   }
 
   private void testPreserveUserHelper(String testRoot,
-                                      FileEntry[] srcEntries,
-                                      FileEntry[] dstEntries,
-                                      boolean createSrcDir,
-                                      boolean createTgtDir,
-                                      boolean update) throws Exception {
+      FileEntry[] srcEntries,
+      FileEntry[] dstEntries,
+      boolean createSrcDir,
+      boolean createTgtDir,
+      boolean update) throws Exception {
     final String testSrcRel = SRCDAT;
     final String testSrc = testRoot + "/" + testSrcRel;
     final String testDstRel = DSTDAT;
     final String testDst = testRoot + "/" + testDstRel;
 
     String nnUri = FileSystem.getDefaultUri(conf).toString();
-    FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
+    DistributedFileSystem fs = (DistributedFileSystem)
+        FileSystem.get(URI.create(nnUri), conf);
     fs.mkdirs(new Path(testRoot));
     if (createSrcDir) {
       fs.mkdirs(new Path(testSrc));
@@ -129,8 +193,8 @@ public class TestDistCpSystem {
     for(int i = 0; i < srcEntries.length; i++) {
       fs.setOwner(srcstats[i].getPath(), "u" + i, null);
     }
-    String[] args = update? new String[]{"-pu", "-update", nnUri+testSrc,
-        nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst};
+    String[] args = update? new String[]{"-pub", "-update", nnUri+testSrc,
+        nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst};
 
     ToolRunner.run(conf, new DistCp(), args);
 
@@ -145,20 +209,263 @@ public class TestDistCpSystem {
     deldir(fs, testRoot);
   }
 
-  @BeforeClass
-  public static void beforeClass() throws IOException {
-    conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    cluster.waitActive();
+  private void compareFiles(FileSystem fs, FileStatus srcStat,
+      FileStatus dstStat) throws Exception {
+    LOG.info("Comparing " + srcStat + " and " + dstStat);
+    assertEquals(srcStat.isDirectory(), dstStat.isDirectory());
+    assertEquals(srcStat.getReplication(), dstStat.getReplication());
+    assertEquals("File POSIX permission should match",
+        srcStat.getPermission(), dstStat.getPermission());
+    assertEquals("File user ownership should match",
+        srcStat.getOwner(), dstStat.getOwner());
+    assertEquals("File group ownership should match",
+        srcStat.getGroup(), dstStat.getGroup());
+    // TODO: check ACL attributes
+
+    if (srcStat.isDirectory()) {
+      return;
+    }
+
+    assertEquals("File length should match (" + srcStat.getPath() + ")",
+        srcStat.getLen(), dstStat.getLen());
+
+    FSDataInputStream srcIn = fs.open(srcStat.getPath());
+    FSDataInputStream dstIn = fs.open(dstStat.getPath());
+    try {
+      byte[] readSrc = new byte[(int)
+                                HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
+      byte[] readDst = new byte[(int)
+                                HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT];
+
+      int srcBytesRead = 0, tgtBytesRead = 0;
+      int srcIdx = 0, tgtIdx = 0;
+      long totalComparedBytes = 0;
+      while (true) {
+        if (srcBytesRead == 0) {
+          srcBytesRead = srcIn.read(readSrc);
+          srcIdx = 0;
+        }
+        if (tgtBytesRead == 0) {
+          tgtBytesRead = dstIn.read(readDst);
+          tgtIdx = 0;
+        }
+        if (srcBytesRead == 0 || tgtBytesRead == 0) {
+          LOG.info("______ compared src and dst files for "
+              + totalComparedBytes + " bytes, content match.");
+          if (srcBytesRead != tgtBytesRead) {
+            Assert.fail("Read mismatching size, compared "
+                + totalComparedBytes + " bytes between src and dst file "
+                + srcStat + " and " + dstStat);
+          }
+          if (totalComparedBytes != srcStat.getLen()) {
+            Assert.fail("Only read/compared " + totalComparedBytes +
+                " bytes between src and dst file " + srcStat +
+                " and " + dstStat);
+          } else {
+            // success
+            break;
+          }
+        }
+        for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead;
+            ++srcIdx, ++tgtIdx) {
+          if (readSrc[srcIdx] != readDst[tgtIdx]) {
+            Assert.fail("src and dst file does not match at "
+                + totalComparedBytes + " between "
+                + srcStat + " and " + dstStat);
+          }
+          ++totalComparedBytes;
+        }
+        LOG.info("______ compared src and dst files for "
+            + totalComparedBytes + " bytes, content match. FileLength: "
+            + srcStat.getLen());
+        if (totalComparedBytes == srcStat.getLen()) {
+          LOG.info("______ Final:" + srcIdx + " "
+              + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead);
+          break;
+        }
+        if (srcIdx == srcBytesRead) {
+          srcBytesRead = 0;
+        }
+        if (tgtIdx == tgtBytesRead) {
+          tgtBytesRead = 0;
+        }
+      }
+    } finally {
+      if (srcIn != null) {
+        srcIn.close();
+      }
+      if (dstIn != null) {
+        dstIn.close();
+      }
+    }
   }
 
-  @AfterClass
-  public static void afterClass() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
+  // WC: needed because the current distcp does not create target dirs
+  private void createDestDir(FileSystem fs, String testDst,
+      FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException {
+    fs.mkdirs(new Path(testDst));
+
+    for (int i=0; i<srcStats.length; i++) {
+      FileStatus srcStat = srcStats[i];
+      if (srcStat.isDirectory()) {
+        Path dstPath = new Path(testDst, srcFiles[i].getPath());
+        fs.mkdirs(dstPath);
+        fs.setOwner(dstPath, srcStat.getOwner(), srcStat.getGroup());
+      }
+    }
+  }
+
+  private void copyAndVerify(final DistributedFileSystem fs,
+      final FileEntry[] srcFiles, final FileStatus[] srcStats,
+      final String testDst,
+      final String[] args) throws Exception {
+    final String testRoot = "/testdir";
+    FsShell shell = new FsShell(fs.getConf());
+
+    LOG.info("ls before distcp");
+    LOG.info(execCmd(shell, "-lsr", testRoot));
+
+    LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
+    ToolRunner.run(conf, new DistCp(), args);
+
+    LOG.info("ls after distcp");
+    LOG.info(execCmd(shell, "-lsr", testRoot));
+
+    FileStatus[] dstStat = getFileStatus(fs, testDst, srcFiles);
+    for (int i=0; i< dstStat.length; i++) {
+      compareFiles(fs, srcStats[i], dstStat[i]);
     }
   }
 
+  private void chunkCopy(FileEntry[] srcFiles) throws Exception {
+    final String testRoot = "/testdir";
+    final String testSrcRel = SRCDAT;
+    final String testSrc = testRoot + "/" + testSrcRel;
+    final String testDstRel = DSTDAT;
+    final String testDst = testRoot + "/" + testDstRel;
+    long chunkSize = 8;
+
+    String nnUri = FileSystem.getDefaultUri(conf).toString();
+    DistributedFileSystem fs = (DistributedFileSystem)
+        FileSystem.get(URI.create(nnUri), conf);
+
+    createFiles(fs, testRoot, srcFiles, chunkSize);
+
+    FileStatus[] srcStats = getFileStatus(fs, testRoot, srcFiles);
+    for (int i = 0; i < srcFiles.length; i++) {
+      fs.setOwner(srcStats[i].getPath(), "u" + i,  "g" + i);
+    }
+    // get file status after updating owners
+    srcStats = getFileStatus(fs, testRoot, srcFiles);
+
+    createDestDir(fs, testDst, srcStats, srcFiles);
+
+    String[] args = new String[] {"-pugp", "-blocksperchunk",
+        String.valueOf(chunkSize),
+        nnUri + testSrc, nnUri + testDst};
+
+    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
+    // Do it again
+    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
+
+    // modify last file and rerun distcp with -update option
+    LOG.info("Modify a file and copy again");
+    for(int i=srcFiles.length-1; i >=0; --i) {
+      if (!srcFiles[i].isDirectory()) {
+        LOG.info("Modifying " + srcStats[i].getPath());
+        DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(),
+            (int)BLOCK_SIZE * 3);
+        break;
+      }
+    }
+    // get file status after modifying file
+    srcStats = getFileStatus(fs, testRoot, srcFiles);
+
+    args = new String[] {"-pugp", "-update", "-blocksperchunk",
+        String.valueOf(chunkSize),
+        nnUri + testSrc, nnUri + testDst + "/" + testSrcRel};
+
+    copyAndVerify(fs, srcFiles, srcStats, testDst, args);
+
+    deldir(fs, testRoot);
+  }
+
+  @Test
+  public void testRecursiveChunkCopy() throws Exception {
+    FileEntry[] srcFiles = {
+        new FileEntry(SRCDAT, true),
+        new FileEntry(SRCDAT + "/file0", false),
+        new FileEntry(SRCDAT + "/dir1", true),
+        new FileEntry(SRCDAT + "/dir2", true),
+        new FileEntry(SRCDAT + "/dir1/file1", false)
+    };
+    chunkCopy(srcFiles);
+  }
+
+  @Test
+  public void testChunkCopyOneFile() throws Exception {
+    FileEntry[] srcFiles = {
+        new FileEntry(SRCDAT, true),
+        new FileEntry(SRCDAT + "/file0", false)
+    };
+    chunkCopy(srcFiles);
+  }
+
+  @Test
+  public void testDistcpLargeFile() throws Exception {
+    FileEntry[] srcfiles = {
+        new FileEntry(SRCDAT, true),
+        new FileEntry(SRCDAT + "/file", false)
+    };
+
+    final String testRoot = "/testdir";
+    final String testSrcRel = SRCDAT;
+    final String testSrc = testRoot + "/" + testSrcRel;
+    final String testDstRel = DSTDAT;
+    final String testDst = testRoot + "/" + testDstRel;
+
+    String nnUri = FileSystem.getDefaultUri(conf).toString();
+    DistributedFileSystem fs =
+        (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
+    fs.mkdirs(new Path(testRoot));
+    fs.mkdirs(new Path(testSrc));
+    fs.mkdirs(new Path(testDst));
+    long chunkSize = 6;
+    createFiles(fs, testRoot, srcfiles, chunkSize);
+
+    String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath();
+    Path srcfile = new Path(srcFileName);
+
+    if(!cluster.getFileSystem().exists(srcfile)){
+      throw new Exception("src not exist");
+    }
+
+    final long srcLen = fs.getFileStatus(srcfile).getLen();
+
+    FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles);
+    for (int i = 0; i < srcfiles.length; i++) {
+      fs.setOwner(srcstats[i].getPath(), "u" + i, null);
+    }
+    String[] args = new String[] {
+        "-blocksperchunk",
+        String.valueOf(chunkSize),
+        nnUri + testSrc,
+        nnUri + testDst
+    };
+
+    LOG.info("_____ running distcp: " + args[0] + " " + args[1]);
+    ToolRunner.run(conf, new DistCp(), args);
+
+    String realTgtPath = testDst;
+    FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles);
+    assertEquals("File length should match", srcLen,
+        dststat[dststat.length - 1].getLen());
+
+    this.compareFiles(fs,  srcstats[srcstats.length-1],
+        dststat[dststat.length-1]);
+    deldir(fs, testRoot);
+  }
+
   @Test
   public void testPreserveUseNonEmptyDir() throws Exception {
     String testRoot = "/testdir." + getMethodName();
@@ -180,7 +487,6 @@ public class TestDistCpSystem {
     testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false);
   }
 
-
   @Test
   public void testPreserveUserEmptyDir() throws Exception {
     String testRoot = "/testdir." + getMethodName();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index efe4627..f94ba97 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -394,7 +394,7 @@ public class TestOptionsParser {
         + "copyStrategy='uniformsize', preserveStatus=[], "
         + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
         + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
-        + "targetPathExists=true, filtersFile='null'}";
+        + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}";
     String optionString = option.toString();
     Assert.assertEquals(val, optionString);
     Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20e57a9a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index 2e9a350..2452d6f 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -81,6 +81,10 @@ public class TestCopyCommitter {
   @Before
   public void createMetaFolder() {
     config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
+    // Unset the listing file path: the config is shared by multiple
+    // tests, and some of them (e.g. testNoCommitAction) never set it,
+    // even though the distcp code checks it.
+    config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
     Path meta = new Path("/meta");
     try {
       cluster.getFileSystem().mkdirs(meta);
@@ -326,7 +330,6 @@ public class TestCopyCommitter {
       committer.commitJob(jobContext);
       Assert.assertFalse(fs.exists(new Path(workPath)));
       Assert.assertTrue(fs.exists(new Path(finalPath)));
-
     } catch (IOException e) {
       LOG.error("Exception encountered while testing for preserve status", e);
       Assert.fail("Atomic commit failure");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message