hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject svn commit: r1495297 [44/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Date Fri, 21 Jun 2013 06:37:39 GMT
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/OptionsParser.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.cli.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+
+import java.util.*;
+
+/**
+ * The OptionsParser parses out the command-line options passed to DistCp,
+ * and interprets those specific to DistCp, to create a DistCpOptions object.
+ */
+public class OptionsParser {
+
+  private static final Log LOG = LogFactory.getLog(OptionsParser.class);
+
+  private static final Options cliOptions = new Options();      
+
+  static {
+    for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding option " + option.getOption());
+      }
+      cliOptions.addOption(option.getOption());
+    }
+  }
+
+  private static class CustomParser extends GnuParser {
+    @Override
+    protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
+      for (int index = 0; index < arguments.length; index++) {
+        if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+          arguments[index] = "-prbugp";
+        }
+      }
+      return super.flatten(options, arguments, stopAtNonOption);
+    }
+  }
+
+  /**
+   * The parse method parses the command-line options, and creates
+   * a corresponding DistCpOptions object.
+   * @param args Command-line arguments (excluding the options consumed
+   *              by the GenericOptionsParser).
+   * @return The DistCpOptions object corresponding to the specified command-line.
+   * @throws IllegalArgumentException Thrown if the parse fails.
+   */
+  public static DistCpOptions parse(String args[]) throws IllegalArgumentException {
+
+    CommandLineParser parser = new CustomParser();
+
+    CommandLine command;
+    try {
+      command = parser.parse(cliOptions, args, true);
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Unable to parse arguments. " +
+        Arrays.toString(args), e);
+    }
+
+    DistCpOptions option;
+    Path targetPath;
+    List<Path> sourcePaths = new ArrayList<Path>();
+
+    String leftOverArgs[] = command.getArgs();
+    if (leftOverArgs == null || leftOverArgs.length < 1) {
+      throw new IllegalArgumentException("Target path not specified");
+    }
+
+    //Last Argument is the target path
+    targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
+
+    //Copy any source paths in the arguments to the list
+    for (int index = 0; index < leftOverArgs.length - 1; index++) {
+      sourcePaths.add(new Path(leftOverArgs[index].trim()));
+    }
+
+    /* If the command has a source-file listing, use it; else, fall back on the source paths in args.
+       If both are present, throw an exception and bail. */
+    if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
+      if (!sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Both source file listing and source paths present");
+      }
+      option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
+              SOURCE_FILE_LISTING.getSwitch())), targetPath);
+    } else {
+      if (sourcePaths.isEmpty()) {
+        throw new IllegalArgumentException("Neither source file listing nor source paths present");
+      }
+      option = new DistCpOptions(sourcePaths, targetPath);
+    }
+
+    //Process all the other option switches and set options appropriately
+    if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
+      option.setIgnoreFailures(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch())) {
+      option.setAtomicCommit(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) &&
+        option.shouldAtomicCommit()) {
+      String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
+      if (workPath != null && !workPath.isEmpty()) {
+        option.setAtomicWorkPath(new Path(workPath));
+      }
+    } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
+      throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");      
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
+      option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch())) {
+      option.setSyncFolder(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch())) {
+      option.setOverwrite(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
+      option.setDeleteMissing(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch())) {
+      option.setSkipCRC(true);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
+      option.setBlocking(false);
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
+      try {
+        Integer mapBandwidth = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
+        if (mapBandwidth.intValue() <= 0) {
+          throw new IllegalArgumentException("Bandwidth specified is not positive: " +
+              mapBandwidth);
+        }
+        option.setMapBandwidth(mapBandwidth);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bandwidth specified is invalid: " +
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
+      option.setSslConfigurationFile(command.
+          getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
+      try {
+        Integer maps = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
+        option.setMaxMaps(maps);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Number of maps is invalid: " +
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
+      option.setCopyStrategy(
+            getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+      String attributes =
+          getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
+      if (attributes == null || attributes.isEmpty()) {
+        for (FileAttribute attribute : FileAttribute.values()) {
+          option.preserve(attribute);
+        }
+      } else {
+        for (int index = 0; index < attributes.length(); index++) {
+          option.preserve(FileAttribute.
+              getAttribute(attributes.charAt(index)));
+        }
+      }
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
+      String fileLimitString = getVal(command,
+                              DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
+      try {
+        Integer.parseInt(fileLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("File-limit is invalid: "
+                                            + fileLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
+              " option. Ignoring.");
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
+      String sizeLimitString = getVal(command,
+                              DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
+      try {
+        Long.parseLong(sizeLimitString);
+      }
+      catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Size-limit is invalid: "
+                                            + sizeLimitString, e);
+      }
+      LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
+              " option. Ignoring.");
+    }
+
+    return option;
+  }
+
+  private static String getVal(CommandLine command, String swtch) {
+    String optionValue = command.getOptionValue(swtch);
+    if (optionValue == null) {
+      return null;
+    } else {
+      return optionValue.trim();
+    }
+  }
+
+  public static void usage() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("distcp OPTIONS [source_path...] <target_path>\n\nOPTIONS", cliOptions);
+  }
+}

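For orientation, here is a hedged sketch of how a driver might invoke the parser above. The switch strings come from DistCpOptionSwitch (added elsewhere in this commit), so the literal flags and paths below are assumptions for illustration only, not part of this change:

    // Hypothetical driver snippet: parse DistCp command-line arguments.
    String[] args = { "-update", "-m", "20",
                      "hdfs://nn1:8020/src", "hdfs://nn2:8020/dest" };
    try {
      DistCpOptions options = OptionsParser.parse(args);
      // The last positional argument becomes the target; all preceding ones are sources.
    } catch (IllegalArgumentException e) {
      OptionsParser.usage();   // prints the generated "distcp OPTIONS ..." help text
    }
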
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/SimpleCopyListing.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.*;
+import java.util.Stack;
+
+/**
+ * The SimpleCopyListing is responsible for making the exhaustive list of
+ * all files/directories under its specified list of input-paths.
+ * These are written into the specified copy-listing file.
+ * Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths.
+ */
+public class SimpleCopyListing extends CopyListing {
+  private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
+
+  private long totalPaths = 0;
+  private long totalBytesToCopy = 0;
+
+  /**
+   * Protected constructor, to initialize configuration.
+   *
+   * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null,
+   * delegation-token caching is skipped.
+   */
+  protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+  }
+
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+
+    Path targetPath = options.getTargetPath();
+    FileSystem targetFS = targetPath.getFileSystem(getConf());
+    boolean targetIsFile = targetFS.isFile(targetPath);
+
+    //If target is a file, then source has to be single file
+    if (targetIsFile) {
+      if (options.getSourcePaths().size() > 1) {
+        throw new InvalidInputException("Multiple source being copied to a file: " +
+            targetPath);
+      }
+
+      Path srcPath = options.getSourcePaths().get(0);
+      FileSystem sourceFS = srcPath.getFileSystem(getConf());
+      if (!sourceFS.isFile(srcPath)) {
+        throw new InvalidInputException("Cannot copy " + srcPath +
+            ", which is not a file to " + targetPath);
+      }
+    }
+
+    if (options.shouldAtomicCommit() && targetFS.exists(targetPath)) {
+      throw new InvalidInputException("Target path for atomic-commit already exists: " +
+        targetPath + ". Cannot atomic-commit to pre-existing target-path.");
+    }
+
+    for (Path path: options.getSourcePaths()) {
+      FileSystem fs = path.getFileSystem(getConf());
+      if (!fs.exists(path)) {
+        throw new InvalidInputException(path + " doesn't exist");
+      }
+    }
+
+    /* This is required to allow map tasks to access each of the source
+       clusters. This would retrieve the delegation token for each unique
+       file system and add them to job's private credential store
+     */
+    Credentials credentials = getCredentials();
+    if (credentials != null) {
+      Path[] inputPaths = options.getSourcePaths().toArray(new Path[1]);
+      TokenCache.obtainTokensForNamenodes(credentials, inputPaths, getConf());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+
+    SequenceFile.Writer fileListWriter = null;
+
+    try {
+      fileListWriter = getWriter(pathToListingFile);
+
+      for (Path path: options.getSourcePaths()) {
+        FileSystem sourceFS = path.getFileSystem(getConf());
+        path = makeQualified(path);
+
+        FileStatus rootStatus = sourceFS.getFileStatus(path);
+        Path sourcePathRoot = computeSourceRootPath(rootStatus, options);
+        boolean localFile = (rootStatus.getClass() != FileStatus.class);
+
+        FileStatus[] sourceFiles = sourceFS.listStatus(path);
+        if (sourceFiles != null && sourceFiles.length > 0) {
+          for (FileStatus sourceStatus: sourceFiles) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
+            }
+            writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
+                localFile, options);
+
+            if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath());
+              }
+              traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
+                  localFile, options);
+            }
+          }
+        } else {
+          writeToFileListing(fileListWriter, rootStatus, sourcePathRoot,
+              localFile, options);
+        }
+      }
+    } finally {
+      IOUtils.closeStream(fileListWriter);
+    }
+  }
+
+  private Path computeSourceRootPath(FileStatus sourceStatus,
+                                     DistCpOptions options) throws IOException {
+
+    Path target = options.getTargetPath();
+    FileSystem targetFS = target.getFileSystem(getConf());
+
+    boolean solitaryFile = options.getSourcePaths().size() == 1
+                                                && !sourceStatus.isDir();
+
+    if (solitaryFile) {
+      if (targetFS.isFile(target) || !targetFS.exists(target)) {
+        return sourceStatus.getPath();
+      } else {
+        return sourceStatus.getPath().getParent();
+      }
+    } else {
+      boolean specialHandling = (options.getSourcePaths().size() == 1 && !targetFS.exists(target)) ||
+          options.shouldSyncFolder() || options.shouldOverwrite();
+
+      return specialHandling && sourceStatus.isDir() ? sourceStatus.getPath() :
+          sourceStatus.getPath().getParent();
+    }
+  }
+
+  /**
+   * Provide an option to skip copy of a path. Allows for exclusion
+   * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}.
+   * @param path - Path being considered for copy while building the file listing
+   * @param options - Input options passed during DistCp invocation
+   * @return - True if the path should be considered for copy, false otherwise
+   */
+  protected boolean shouldCopy(Path path, DistCpOptions options) {
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getBytesToCopy() {
+    return totalBytesToCopy;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected long getNumberOfPaths() {
+    return totalPaths;
+  }
+
+  private Path makeQualified(Path path) throws IOException {
+    final FileSystem fs = path.getFileSystem(getConf());
+    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(getConf());
+    if (fs.exists(pathToListFile)) {
+      fs.delete(pathToListFile, false);
+    }
+    return SequenceFile.createWriter(fs, getConf(), pathToListFile,
+        Text.class, FileStatus.class, SequenceFile.CompressionType.NONE);
+  }
+
+  private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
+                                    FileStatus fileStatus) throws IOException {
+    return fileStatus.isDir() && getChildren(fileSystem, fileStatus).length > 0;
+  }
+
+  private static FileStatus[] getChildren(FileSystem fileSystem,
+                                         FileStatus parent) throws IOException {
+    return fileSystem.listStatus(parent.getPath());
+  }
+
+  private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
+                                         FileStatus sourceStatus,
+                                         Path sourcePathRoot,
+                                         boolean localFile,
+                                         DistCpOptions options)
+                                         throws IOException {
+    FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
+    Stack<FileStatus> pathStack = new Stack<FileStatus>();
+    pathStack.push(sourceStatus);
+
+    while (!pathStack.isEmpty()) {
+      for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Recording source-path: "
+                    + sourceStatus.getPath() + " for copy.");
+        writeToFileListing(fileListWriter, child, sourcePathRoot,
+             localFile, options);
+        if (isDirectoryAndNotEmpty(sourceFS, child)) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("Traversing non-empty source dir: "
+                       + sourceStatus.getPath());
+          pathStack.push(child);
+        }
+      }
+    }
+  }
+
+  private void writeToFileListing(SequenceFile.Writer fileListWriter,
+                                  FileStatus fileStatus,
+                                  Path sourcePathRoot,
+                                  boolean localFile,
+                                  DistCpOptions options) throws IOException {
+    if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDir())
+      return; // Skip the root-paths.
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
+        fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
+    }
+
+    FileStatus status = fileStatus;
+    if (localFile) {
+      status = getFileStatus(fileStatus);
+    }
+
+    if (!shouldCopy(fileStatus.getPath(), options)) {
+      return;
+    }
+
+    fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
+        fileStatus.getPath())), status);
+    fileListWriter.sync();
+
+    if (!fileStatus.isDir()) {
+      totalBytesToCopy += fileStatus.getLen();
+    }
+    totalPaths++;
+  }
+
+  private static final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);
+  private DataInputBuffer in = new DataInputBuffer();
+  
+  private FileStatus getFileStatus(FileStatus fileStatus) throws IOException {
+    FileStatus status = new FileStatus();
+
+    buffer.reset();
+    DataOutputStream out = new DataOutputStream(buffer);
+    fileStatus.write(out);
+
+    in.reset(buffer.toByteArray(), 0, buffer.size());
+    status.readFields(in);
+    return status;
+  }
+}

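A rough usage sketch for the listing builder above. The buildListing() entry point is assumed to live on the CopyListing base class (added elsewhere in this commit), and since the constructor shown here is protected, the snippet is assumed to sit in the org.apache.hadoop.tools.distcp2 package; the paths are placeholders:

    // Hypothetical sketch: write the copy listing for one source tree.
    // (The enclosing method is assumed to declare 'throws IOException'.)
    Configuration conf = new Configuration();
    Credentials credentials = new Credentials();
    List<Path> sources = Arrays.asList(new Path("hdfs://nn1:8020/src"));
    DistCpOptions options = new DistCpOptions(sources, new Path("hdfs://nn2:8020/dest"));

    SimpleCopyListing listing = new SimpleCopyListing(conf, credentials);
    // Drives validatePaths() and doBuildListing(), producing a SequenceFile of
    // (relative-path, FileStatus) entries at the given location.
    listing.buildListing(new Path("/tmp/distcp-meta/fileList.seq"), options);
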
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyCommitter.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.distcp2.GlobbedCopyListing;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+
+/**
+ * The CopyCommitter class is DistCp's OutputCommitter implementation. It is
+ * responsible for handling the completion/cleanup of the DistCp run.
+ * Specifically, it does the following:
+ *  1. Cleanup of the meta-folder (where DistCp maintains its file-list, etc.)
+ *  2. Preservation of user/group/replication-factor on any directories that
+ *     have been copied. (Files are taken care of in their map-tasks.)
+ *  3. Atomic-move of data from the temporary work-folder to the final path
+ *     (if atomic-commit was opted for).
+ *  4. Deletion of files from the target that are missing at source (if opted for).
+ *  5. Cleanup of any partially copied files, from previous, failed attempts.
+ */
+public class CopyCommitter extends FileOutputCommitter {
+  private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+
+  private final TaskAttemptContext taskAttemptContext;
+
+  /**
+   * Create an output committer
+   *
+   * @param outputPath the job's output path
+   * @param context    the task's context
+   * @throws IOException - Exception if any
+   */
+  public CopyCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.taskAttemptContext = context;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    Configuration conf = jobContext.getConfiguration();
+    super.commitJob(jobContext);
+
+    cleanupTempFiles(jobContext);
+
+    String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+    if (attributes != null && !attributes.isEmpty()) {
+      preserveFileAttributesForDirectories(conf);
+    }
+
+    try {
+      if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) {
+        deleteMissing(conf);
+      } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
+        commitData(conf);
+      }
+      taskAttemptContext.setStatus("Commit Successful");
+    }
+    finally {
+      cleanup(conf);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void abortJob(JobContext jobContext,
+                       JobStatus.State state) throws IOException {
+    try {
+      super.abortJob(jobContext, state);
+    } finally {
+      cleanupTempFiles(jobContext);
+      cleanup(jobContext.getConfiguration());
+    }
+  }
+
+  private void cleanupTempFiles(JobContext context) {
+    try {
+      Configuration conf = context.getConfiguration();
+
+      Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+      FileSystem targetFS = targetWorkPath.getFileSystem(conf);
+
+      String jobId = context.getJobID().toString();
+      deleteAttemptTempFiles(targetWorkPath, targetFS, jobId);
+      deleteAttemptTempFiles(targetWorkPath.getParent(), targetFS, jobId);
+    } catch (Throwable t) {
+      LOG.warn("Unable to cleanup temp files", t);
+    }
+  }
+
+  private void deleteAttemptTempFiles(Path targetWorkPath,
+                                      FileSystem targetFS,
+                                      String jobId) throws IOException {
+
+    FileStatus[] tempFiles = targetFS.globStatus(
+        new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*"));
+
+    if (tempFiles != null && tempFiles.length > 0) {
+      for (FileStatus file : tempFiles) {
+        LOG.info("Cleaning up " + file.getPath());
+        targetFS.delete(file.getPath(), false);
+      }
+    }
+  }
+
+  /**
+   * Cleanup meta folder and other temporary files
+   *
+   * @param conf - Job Configuration
+   */
+  private void cleanup(Configuration conf) {
+    Path metaFolder = new Path(conf.get(DistCpConstants.CONF_LABEL_META_FOLDER));
+    try {
+      FileSystem fs = metaFolder.getFileSystem(conf);
+      LOG.info("Cleaning up temporary work folder: " + metaFolder);
+      fs.delete(metaFolder, true);
+    } catch (IOException ignore) {
+      LOG.error("Exception encountered ", ignore);
+    }
+  }
+
+  // This method changes the target-directories' file-attributes (owner,
+  // user/group permissions, etc.) based on the corresponding source directories.
+  private void preserveFileAttributesForDirectories(Configuration conf) throws IOException {
+    String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
+    LOG.info("About to preserve attributes: " + attrSymbols);
+
+    EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols);
+
+    Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    FileSystem clusterFS = sourceListing.getFileSystem(conf);
+    SequenceFile.Reader sourceReader = new SequenceFile.Reader(
+        clusterFS, sourceListing, conf);
+    long totalLen = clusterFS.getFileStatus(sourceListing).getLen();
+
+    Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    long preservedEntries = 0;
+    try {
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+
+      // Iterate over every source path that was copied.
+      while (sourceReader.next(srcRelPath, srcFileStatus)) {
+        // File-attributes for files are set at the time of copy,
+        // in the map-task.
+        if (! srcFileStatus.isDir()) continue;
+
+        Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath);
+
+        // Skip the root folder.
+        // Status can't be preserved on root-folder. (E.g. multiple paths may
+        // be copied to a single target folder. Which source-attributes to use
+        // on the target is undefined.)
+        if (targetRoot.equals(targetFile)) continue;
+
+        FileSystem targetFS = targetFile.getFileSystem(conf);
+        DistCpUtils.preserve(targetFS, targetFile, srcFileStatus,  attributes);
+
+        taskAttemptContext.progress();
+        taskAttemptContext.setStatus("Preserving status on directory entries. [" +
+            sourceReader.getPosition() * 100 / totalLen + "%]");
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+    }
+    LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
+  }
+
+  // This method deletes "extra" files from the target, if they're not
+  // available at the source.
+  private void deleteMissing(Configuration conf) throws IOException {
+    LOG.info("-delete option is enabled. About to remove entries from " +
+        "target that are missing in source");
+
+    // Sort the source-file listing alphabetically.
+    Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+    FileSystem clusterFS = sourceListing.getFileSystem(conf);
+    Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+
+    // Similarly, create the listing of target-files. Sort alphabetically.
+    Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
+    CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+
+    List<Path> targets = new ArrayList<Path>(1);
+    Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    targets.add(targetFinalPath);
+    DistCpOptions options = new DistCpOptions(targets, new Path("/NONE"));
+
+    target.buildListing(targetListing, options);
+    Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+    long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
+
+    SequenceFile.Reader sourceReader = new SequenceFile.Reader(
+        clusterFS, sortedSourceListing, conf);
+    SequenceFile.Reader targetReader = new SequenceFile.Reader(
+        clusterFS, sortedTargetListing, conf);
+
+    // Walk both source and target file listings.
+    // Delete all from target that doesn't also exist on source.
+    long deletedEntries = 0;
+    try {
+      FileStatus srcFileStatus = new FileStatus();
+      Text srcRelPath = new Text();
+      FileStatus trgtFileStatus = new FileStatus();
+      Text trgtRelPath = new Text();
+
+      FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+      boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+      while (targetReader.next(trgtRelPath, trgtFileStatus)) {
+        // Skip sources that don't exist on target.
+        while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
+          srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
+        }
+
+        if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
+
+        // Target doesn't exist at source. Delete.
+        boolean result = (!targetFS.exists(trgtFileStatus.getPath()) ||
+            targetFS.delete(trgtFileStatus.getPath(), true));
+        if (result) {
+          LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
+          deletedEntries++;
+        } else {
+          throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+        }
+        taskAttemptContext.progress();
+        taskAttemptContext.setStatus("Deleting missing files from target. [" +
+            targetReader.getPosition() * 100 / totalLen + "%]");
+      }
+    } finally {
+      IOUtils.closeStream(sourceReader);
+      IOUtils.closeStream(targetReader);
+    }
+    LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+  }
+
+  private void commitData(Configuration conf) throws IOException {
+
+    Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    FileSystem targetFS = workDir.getFileSystem(conf);
+
+    LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir);
+    if (targetFS.exists(finalDir) && targetFS.exists(workDir)) {
+      LOG.error("Pre-existing final-path found at: " + finalDir);
+      throw new IOException("Target-path can't be committed to because it " +
+          "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". ");
+    }
+
+    boolean result = targetFS.rename(workDir, finalDir);
+    if (!result) {
+      LOG.warn("Rename failed. Perhaps data already moved. Verifying...");
+      result = targetFS.exists(finalDir) && !targetFS.exists(workDir);
+    }
+    if (result) {
+      LOG.info("Data committed successfully to " + finalDir);
+      taskAttemptContext.setStatus("Data committed successfully to " + finalDir);
+    } else {
+      LOG.error("Unable to commit data to " + finalDir);
+      throw new IOException("Atomic commit failed. Temporary data in " + workDir +
+        ", Unable to move to " + finalDir);
+    }
+  }
+}

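As an orientation aid, the committer above is driven entirely by job-configuration labels from DistCpConstants (referenced in the code). Below is a hedged sketch of the settings a DistCp driver would typically have in place before commitJob() runs; the values are examples only, and 'job' stands for the driver's Job instance:

    // Hypothetical configuration; the label constants are those used by CopyCommitter above.
    Configuration conf = job.getConfiguration();
    conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, "/dest/._distcp.tmp"); // where map tasks wrote
    conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, "/dest");             // final (atomic-commit) target
    conf.set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS, "ugp");                 // preserve user, group, permissions
    conf.setBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, true);            // enable the -delete pass
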
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyMapper.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyMapper.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyMapper.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptionSwitch;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Mapper class that executes the DistCp copy operation.
+ * Extends the o.a.h.mapreduce.Mapper<> class.
+ */
+public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
+
+  /**
+   * Hadoop counters for the DistCp CopyMapper.
+   * (These have been kept identical to the old DistCp,
+   * for backward compatibility.)
+   */
+  public static enum Counter {
+    COPY,         // Number of files received by the mapper for copy.
+    SKIP,         // Number of files skipped.
+    FAIL,         // Number of files that failed to be copied.
+    BYTESCOPIED,  // Number of bytes actually copied by the copy-mapper, total.
+    BYTESEXPECTED,// Number of bytes expected to be copied.
+    BYTESFAILED,  // Number of bytes that failed to be copied.
+    BYTESSKIPPED, // Number of bytes that were skipped from copy.
+  }
+
+  private static Log LOG = LogFactory.getLog(CopyMapper.class);
+
+  private Configuration conf;
+
+  private boolean syncFolders = false;
+  private boolean ignoreFailures = false;
+  private boolean skipCrc = false;
+  private boolean overWrite = false;
+  private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
+
+  private FileSystem targetFS = null;
+  private Path    targetWorkPath = null;
+
+  /**
+   * Implementation of the Mapper::setup() method. This extracts the DistCp
+   * options specified in the Job's configuration, to set up the Job.
+   * @param context Mapper's context.
+   * @throws IOException On IO failure.
+   * @throws InterruptedException If the job is interrupted.
+   */
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+    conf = context.getConfiguration();
+
+    syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
+    ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
+    skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
+    overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
+    preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
+        PRESERVE_STATUS.getConfigLabel()));
+
+    targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+    Path targetFinalPath = new Path(conf.get(
+            DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+    targetFS = targetFinalPath.getFileSystem(conf);
+
+    if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
+      overWrite = true; // When target is an existing file, overwrite it.
+    }
+
+    if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
+      initializeSSLConf(context);
+    }
+  }
+
+  /**
+   * Initialize the SSL configuration, if one is set in the conf
+   *
+   * @throws IOException - If any
+   */
+  private void initializeSSLConf(Context context) throws IOException {
+    LOG.info("Initializing SSL configuration");
+    
+    String workDir = ((JobConf)conf).getJobLocalDir() + "/work";
+    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
+
+    Configuration sslConfig = new Configuration(false);
+    String sslConfFileName = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
+    Path sslClient = findCacheFile(cacheFiles, sslConfFileName);
+    if (sslClient == null) {
+      LOG.warn("SSL Client config file not found. Was looking for " + sslConfFileName +
+          " in " + Arrays.toString(cacheFiles));
+      return;
+    }
+    sslConfig.addResource(sslClient);
+
+    String trustStoreFile = conf.get("ssl.client.truststore.location");
+    Path trustStorePath = findCacheFile(cacheFiles, trustStoreFile);
+    sslConfig.set("ssl.client.truststore.location", trustStorePath.toString());
+
+    String keyStoreFile = conf.get("ssl.client.keystore.location");
+    Path keyStorePath = findCacheFile(cacheFiles, keyStoreFile);
+    sslConfig.set("ssl.client.keystore.location", keyStorePath.toString());
+
+    try {
+      OutputStream out = new FileOutputStream(workDir + "/" + sslConfFileName);
+      try {
+        sslConfig.writeXml(out);
+      } finally {
+        out.close();
+      }
+      conf.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfFileName);
+    } catch (IOException e) {
+      LOG.warn("Unable to write out the ssl configuration. " +
+          "Will fall back to default ssl-client.xml in class path, if there is one", e);
+    }
+  }
+
+  /**
+   * Find entry from distributed cache
+   *
+   * @param cacheFiles - All localized cache files
+   * @param fileName - fileName to search
+   * @return Path of the filename if found, else null
+   */
+  private Path findCacheFile(Path[] cacheFiles, String fileName) {
+    if (cacheFiles != null && cacheFiles.length > 0) {
+      for (Path file : cacheFiles) {
+        if (file.getName().equals(fileName)) {
+          return file;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Implementation of the Mapper<>::map(). Does the copy.
+   * @param relPath The file's path, relative to the source (and target) root.
+   * @param sourceFileStatus The source file's FileStatus.
+   * @throws IOException
+   */
+  @Override
+  public void map(Text relPath, FileStatus sourceFileStatus, Context context)
+          throws IOException, InterruptedException {
+    Path sourcePath = sourceFileStatus.getPath();
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
+
+    Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
+                          targetFS.getWorkingDirectory()) + relPath.toString());
+
+    EnumSet<DistCpOptions.FileAttribute> fileAttributes
+            = getFileAttributeSettings(context);
+
+    final String description = "Copying " + sourcePath + " to " + target;
+    context.setStatus(description);
+
+    LOG.info(description);
+
+    try {
+      FileStatus sourceCurrStatus;
+      FileSystem sourceFS;
+      try {
+        sourceFS = sourcePath.getFileSystem(conf);
+        sourceCurrStatus = sourceFS.getFileStatus(sourcePath);
+      } catch (FileNotFoundException e) {
+        throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
+      }
+
+      FileStatus targetStatus = null;
+
+      try {
+        targetStatus = targetFS.getFileStatus(target);
+      } catch (FileNotFoundException ignore) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Path could not be found: " + target, ignore);
+      }
+
+      if (targetStatus != null && (targetStatus.isDir() != sourceCurrStatus.isDir())) {
+        throw new IOException("Can't replace " + target + ". Target is " +
+            getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
+      }
+
+      if (sourceCurrStatus.isDir()) {
+        createTargetDirsWithRetry(description, target, context);
+        return;
+      }
+
+      if (skipFile(sourceFS, sourceCurrStatus, target)) {
+        LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+                 + " to " + target);
+        updateSkipCounters(context, sourceCurrStatus);
+        context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
+      }
+      else {
+        copyFileWithRetry(description, sourceCurrStatus, target, context,
+                          fileAttributes);
+      }
+
+      DistCpUtils.preserve(target.getFileSystem(conf), target,
+                           sourceCurrStatus, fileAttributes);
+
+    } catch (IOException exception) {
+      handleFailures(exception, sourceFileStatus, target, context);
+    }
+  }
+
+  private String getFileType(FileStatus fileStatus) {
+    return fileStatus == null ? "N/A" : (fileStatus.isDir() ? "dir" : "file");
+  }
+
+  private static EnumSet<DistCpOptions.FileAttribute>
+          getFileAttributeSettings(Mapper.Context context) {
+    String attributeString = context.getConfiguration().get(
+            DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel());
+    return DistCpUtils.unpackAttributes(attributeString);
+  }
+
+  private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
+               Path target, Context context,
+               EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
+
+    long bytesCopied;
+    try {
+      bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description)
+                       .execute(sourceFileStatus, target, context, fileAttributes);
+    } catch (Exception e) {
+      context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
+      throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
+          " --> " + target, e);
+    }
+    incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
+    incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
+    incrementCounter(context, Counter.COPY, 1);
+  }
+
+  private void createTargetDirsWithRetry(String description,
+                   Path target, Context context) throws IOException {
+    try {
+      new RetriableDirectoryCreateCommand(description).execute(target, context);
+    } catch (Exception e) {
+      throw new IOException("mkdir failed for " + target, e);
+    }
+    incrementCounter(context, Counter.COPY, 1);
+  }
+
+  private static void updateSkipCounters(Context context,
+                                         FileStatus sourceFile) {
+    incrementCounter(context, Counter.SKIP, 1);
+    incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen());
+
+  }
+
+  private void handleFailures(IOException exception,
+                                     FileStatus sourceFileStatus, Path target,
+                                     Context context) throws IOException, InterruptedException {
+    LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
+                target, exception);
+
+    if (ignoreFailures && exception.getCause() instanceof
+            RetriableFileCopyCommand.CopyReadException) {
+      incrementCounter(context, Counter.FAIL, 1);
+      incrementCounter(context, Counter.BYTESFAILED, sourceFileStatus.getLen());
+      context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " + 
+          StringUtils.stringifyException(exception)));
+    }
+    else
+      throw exception;
+  }
+
+  private static void incrementCounter(Context context, Counter counter,
+                                       long value) {
+    context.getCounter(counter).increment(value);
+  }
+
+  private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
+                                          throws IOException {
+    return     targetFS.exists(target)
+            && !overWrite
+            && !mustUpdate(sourceFS, source, target);
+  }
+
+  private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
+                                    throws IOException {
+    final FileStatus targetFileStatus = targetFS.getFileStatus(target);
+
+    return     syncFolders
+            && (
+                   targetFileStatus.getLen() != source.getLen()
+                || (!skipCrc &&
+                       !DistCpUtils.checksumsAreEqual(sourceFS,
+                                          source.getPath(), targetFS, target))
+                || (source.getBlockSize() != targetFileStatus.getBlockSize() &&
+                      preserve.contains(FileAttribute.BLOCKSIZE))
+               );
+  }
+}

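For context, a hedged sketch of how this mapper is typically wired into the DistCp copy job. The driver and the listing-based input format are not part of this file, so UniformSizeInputFormat and the overall wiring below are assumptions for illustration:

    // Hypothetical job wiring for the copy phase (map-only job).
    // (The enclosing method is assumed to declare 'throws IOException'.)
    Job job = new Job(conf, "distcp");
    job.setMapperClass(CopyMapper.class);
    job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(UniformSizeInputFormat.class);   // reads the copy listing (assumed class)
    job.setOutputFormatClass(CopyOutputFormat.class);        // supplies the CopyCommitter shown earlier
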
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyOutputFormat.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyOutputFormat.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/CopyOutputFormat.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+
+import java.io.IOException;
+
+/**
+ * The CopyOutputFormat is the Hadoop OutputFormat used in DistCp.
+ * It sets up the Job's Configuration (in the Job-Context) with the settings
+ * for the work-directory, final commit-directory, etc. It also sets the right
+ * output-committer.
+ * @param <K>
+ * @param <V>
+ */
+public class CopyOutputFormat<K, V> extends TextOutputFormat<K, V> {
+
+  /**
+   * Setter for the working directory for DistCp (where files will be copied
+   * before they are moved to the final commit-directory.)
+   * @param job The Job on whose configuration the working-directory is to be set.
+   * @param workingDirectory The path to use as the working directory.
+   */
+  public static void setWorkingDirectory(Job job, Path workingDirectory) {
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH,
+        workingDirectory.toString());
+  }
+
+  /**
+   * Setter for the final directory for DistCp (where files copied will be
+   * moved, atomically.)
+   * @param job The Job on whose configuration the working-directory is to be set.
+   * @param commitDirectory The path to use for final commit.
+   */
+  public static void setCommitDirectory(Job job, Path commitDirectory) {
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH,
+        commitDirectory.toString());
+  }
+
+  /**
+   * Getter for the working directory.
+   * @param job The Job from whose configuration the working-directory is to
+   * be retrieved.
+   * @return The working-directory Path.
+   */
+  public static Path getWorkingDirectory(Job job) {
+    return getWorkingDirectory(job.getConfiguration());
+  }
+
+  private static Path getWorkingDirectory(Configuration conf) {
+    String workingDirectory = conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH);
+    if (workingDirectory == null || workingDirectory.isEmpty()) {
+      return null;
+    } else {
+      return new Path(workingDirectory);
+    }
+  }
+
+  /**
+   * Getter for the final commit-directory.
+   * @param job The Job from whose configuration the commit-directory is to be
+   * retrieved.
+   * @return The commit-directory Path.
+   */
+  public static Path getCommitDirectory(Job job) {
+    return getCommitDirectory(job.getConfiguration());
+  }
+
+  private static Path getCommitDirectory(Configuration conf) {
+    String commitDirectory = conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH);
+    if (commitDirectory == null || commitDirectory.isEmpty()) {
+      return null;
+    } else {
+      return new Path(commitDirectory);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+    return new CopyCommitter(getOutputPath(context), context);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    if (getCommitDirectory(conf) == null) {
+      throw new IllegalStateException("Commit directory not configured");
+    }
+
+    Path workingPath = getWorkingDirectory(conf);
+    if (workingPath == null) {
+      throw new IllegalStateException("Working directory not configured");
+    }
+
+    // get delegation token for outDir's file system
+    TokenCache.obtainTokensForNamenodes(context.getCredentials(),
+                                        new Path[] {workingPath}, conf);
+  }
+}

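A brief usage sketch for the setters above; the paths are placeholders, and in the real driver they would be derived from DistCpOptions rather than hard-coded:

    // Hypothetical driver snippet configuring the output format.
    Job job = new Job(conf, "distcp");
    job.setOutputFormatClass(CopyOutputFormat.class);
    CopyOutputFormat.setCommitDirectory(job, new Path("hdfs://nn2:8020/dest"));
    CopyOutputFormat.setWorkingDirectory(job, new Path("hdfs://nn2:8020/dest/._distcp.tmp"));
    // checkOutputSpecs() fails fast if either directory is left unset.
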
Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableDirectoryCreateCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableDirectoryCreateCommand.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableDirectoryCreateCommand.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableDirectoryCreateCommand.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import org.apache.hadoop.tools.distcp2.util.RetriableCommand;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * This class extends Retriable command to implement the creation of directories
+ * with retries on failure.
+ */
+public class RetriableDirectoryCreateCommand extends RetriableCommand {
+
+  /**
+   * Constructor, taking a description of the action.
+   * @param description Verbose description of the directory-creation operation.
+   */
+  public RetriableDirectoryCreateCommand(String description) {
+    super(description);
+  }
+
+  /**
+   * Implementation of RetriableCommand::doExecute().
+   * This implements the actual mkdirs() functionality.
+   * @param arguments Argument-list to the command.
+   * @return Boolean: true, if the directory was created successfully.
+   * @throws Exception IOException, on failure to create the directory.
+   */
+  @Override
+  protected Object doExecute(Object... arguments) throws Exception {
+    assert arguments.length == 2 : "Unexpected argument list.";
+    Path target = (Path)arguments[0];
+    Mapper.Context context = (Mapper.Context)arguments[1];
+
+    FileSystem targetFS = target.getFileSystem(context.getConfiguration());
+    return targetFS.mkdirs(target);
+  }
+}
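
For orientation, a sketch of how a mapper might invoke this command. It assumes RetriableCommand (not part of this hunk) exposes a public execute(Object...) entry point that wraps doExecute() with the retry logic; the path and description strings are illustrative.

    // Hedged usage sketch. Argument order mirrors doExecute() above:
    // arguments[0] = target Path, arguments[1] = Mapper.Context.
    Path targetDir = new Path("/user/hadoop/target/new-dir");   // example path
    boolean created = (Boolean) new RetriableDirectoryCreateCommand(
        "Creating " + targetDir).execute(targetDir, context);
    if (!created) {
      throw new IOException("mkdirs returned false for " + targetDir);
    }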

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableFileCopyCommand.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableFileCopyCommand.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/RetriableFileCopyCommand.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import org.apache.hadoop.tools.distcp2.util.RetriableCommand;
+import org.apache.hadoop.tools.distcp2.util.ThrottledInputStream;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.*;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.EnumSet;
+
+/**
+ * This class extends RetriableCommand to implement the copy of files,
+ * with retries on failure.
+ */
+public class RetriableFileCopyCommand extends RetriableCommand {
+
+  private static final Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
+  private static final int BUFFER_SIZE = 8 * 1024;
+  private boolean skipCrc = false;
+  
+  /**
+   * Constructor, taking a description of the action.
+   * @param description Verbose description of the copy operation.
+   */
+  public RetriableFileCopyCommand(String description) {
+    super(description);
+  }
+ 
+  /**
+   * Create a RetriableFileCopyCommand.
+   *
+   * @param skipCrc Whether to skip the crc check.
+   * @param description A verbose description of the copy operation.
+   */
+  public RetriableFileCopyCommand(boolean skipCrc, String description) {
+    this(description);
+    this.skipCrc = skipCrc;
+  }
+
+  /**
+   * Implementation of RetriableCommand::doExecute().
+   * This is the actual copy-implementation.
+   * @param arguments Argument-list to the command.
+   * @return Number of bytes copied.
+   * @throws Exception: CopyReadException, if there are read-failures. All other
+   *         failures are IOExceptions.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Object doExecute(Object... arguments) throws Exception {
+    assert arguments.length == 4 : "Unexpected argument list.";
+    FileStatus source = (FileStatus)arguments[0];
+    assert !source.isDir() : "Unexpected file-status. Expected file.";
+    Path target = (Path)arguments[1];
+    Mapper.Context context = (Mapper.Context)arguments[2];
+    EnumSet<FileAttribute> fileAttributes
+            = (EnumSet<FileAttribute>)arguments[3];
+    return doCopy(source, target, context, fileAttributes);
+  }
+
+  private long doCopy(FileStatus sourceFileStatus, Path target,
+                      Mapper.Context context,
+                      EnumSet<FileAttribute> fileAttributes)
+          throws IOException {
+
+    Path tmpTargetPath = getTmpFile(target, context);
+    final Configuration configuration = context.getConfiguration();
+    FileSystem targetFS = target.getFileSystem(configuration);
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
+        LOG.debug("Tmp-file path: " + tmpTargetPath);
+      }
+      FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem(
+              configuration);
+      long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
+                                     context, fileAttributes);
+
+      compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead);
+      // At this point, source and target lengths are the same. If the length
+      // is 0, skip the checksum comparison.
+      if ((bytesRead != 0) && (!skipCrc)) {
+        compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath);
+      }
+      promoteTmpToTarget(tmpTargetPath, target, targetFS);
+      return bytesRead;
+
+    } finally {
+      if (targetFS.exists(tmpTargetPath))
+        targetFS.delete(tmpTargetPath, false);
+    }
+  }
+
+  private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
+                             FileStatus sourceFileStatus, Mapper.Context context,
+                             EnumSet<FileAttribute> fileAttributes)
+                             throws IOException {
+    OutputStream outStream = new BufferedOutputStream(targetFS.create(
+            tmpTargetPath, true, BUFFER_SIZE,
+            getReplicationFactor(fileAttributes, sourceFileStatus, targetFS),
+            getBlockSize(fileAttributes, sourceFileStatus, targetFS), context));
+    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
+  }
+
+  private void compareFileLengths(FileStatus sourceFileStatus, Path target,
+                                  Configuration configuration, long bytesRead)
+                                  throws IOException {
+    final Path sourcePath = sourceFileStatus.getPath();
+    FileSystem fs = sourcePath.getFileSystem(configuration);
+    if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
+      throw new IOException("Mismatch in length of source:" + sourcePath
+                + " and target:" + target);
+  }
+
+  private void compareCheckSums(FileSystem sourceFS, Path source,
+                                FileSystem targetFS, Path target)
+                                throws IOException {
+    if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target))
+      throw new IOException("Check-sum mismatch between "
+                              + source + " and " + target);
+
+  }
+
+  // Fail if: the target file exists but cannot be deleted,
+  // the target's parent folder does not exist and cannot be created,
+  // or the rename of the tmp-file to the target fails.
+  private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs)
+                                  throws IOException {
+    if ((fs.exists(target) && !fs.delete(target, false))
+        || (!fs.exists(target.getParent()) && !fs.mkdirs(target.getParent()))
+        || !fs.rename(tmpTarget, target)) {
+      throw new IOException("Failed to promote tmp-file:" + tmpTarget
+                              + " to: " + target);
+    }
+  }
+
+  private Path getTmpFile(Path target, Mapper.Context context) {
+    Path targetWorkPath = new Path(context.getConfiguration().
+        get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
+
+    Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath;
+    // Build the tmp-file path once, and log the same instance that is returned.
+    Path tmpFile = new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
+    LOG.info("Creating temp file: " + tmpFile);
+    return tmpFile;
+  }
+
+  private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
+                         int bufferSize, Mapper.Context context)
+      throws IOException {
+    Path source = sourceFileStatus.getPath();
+    byte buf[] = new byte[bufferSize];
+    ThrottledInputStream inStream = null;
+    long totalBytesRead = 0;
+
+    try {
+      inStream = getInputStream(source, context.getConfiguration());
+      int bytesRead = readBytes(inStream, buf);
+      while (bytesRead >= 0) {
+        totalBytesRead += bytesRead;
+        outStream.write(buf, 0, bytesRead);
+        updateContextStatus(totalBytesRead, context, sourceFileStatus);
+        // Use readBytes() here too, so that mid-stream read failures are also
+        // wrapped as CopyReadException (and may be skipped, if so configured).
+        bytesRead = readBytes(inStream, buf);
+      }
+    } finally {
+      IOUtils.cleanup(LOG, outStream, inStream);
+    }
+
+    return totalBytesRead;
+  }
+
+  private void updateContextStatus(long totalBytesRead, Mapper.Context context,
+                                   FileStatus sourceFileStatus) {
+    StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
+                .format(totalBytesRead * 100.0f / sourceFileStatus.getLen()));
+    message.append("% ")
+            .append(description).append(" [")
+            .append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
+            .append('/')
+            .append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen()))
+            .append(']');
+    context.setStatus(message.toString());
+  }
+
+  private static int readBytes(InputStream inStream, byte buf[])
+          throws IOException {
+    try {
+      return inStream.read(buf);
+    }
+    catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static ThrottledInputStream getInputStream(Path path, Configuration conf)
+          throws IOException {
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+              DistCpConstants.DEFAULT_BANDWIDTH_MB);
+      return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
+              bandwidthMB * 1024 * 1024);
+    }
+    catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static short getReplicationFactor(
+          EnumSet<FileAttribute> fileAttributes,
+          FileStatus sourceFile, FileSystem targetFS) {
+    return fileAttributes.contains(FileAttribute.REPLICATION)?
+            sourceFile.getReplication() : targetFS.getDefaultReplication();
+  }
+
+  private static long getBlockSize(
+          EnumSet<FileAttribute> fileAttributes,
+          FileStatus sourceFile, FileSystem targetFS) {
+    return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
+            sourceFile.getBlockSize() : targetFS.getDefaultBlockSize();
+  }
+
+  /**
+   * Special subclass of IOException. This is used to distinguish read-operation
+   * failures from other kinds of IOExceptions.
+   * The failure to read from source is dealt with specially, in the CopyMapper.
+   * Such failures may be skipped if the DistCpOptions indicate so.
+   * Write failures are intolerable, and amount to CopyMapper failure.  
+   */
+  public static class CopyReadException extends IOException {
+    public CopyReadException(Throwable rootCause) {
+      super(rootCause);
+    }
+  }
+}
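
Similarly, a sketch of how a mapper might drive a single file copy through this command, under the same assumption that RetriableCommand.execute(Object...) retries doExecute(); the paths and the chosen attributes are illustrative.

    // Hedged usage sketch. Argument order mirrors doExecute() above:
    // source FileStatus, target Path, Mapper.Context, attributes to preserve.
    FileStatus sourceStatus = sourceFS.getFileStatus(sourcePath);
    EnumSet<FileAttribute> toPreserve =
        EnumSet.of(FileAttribute.REPLICATION, FileAttribute.BLOCKSIZE);
    long bytesCopied = (Long) new RetriableFileCopyCommand(false /* skipCrc */,
        "Copying " + sourcePath).execute(sourceStatus, targetPath, context, toPreserve);
    // bytesCopied matches the source length; otherwise doExecute() would have
    // thrown from compareFileLengths().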

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/UniformSizeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/UniformSizeInputFormat.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/UniformSizeInputFormat.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/UniformSizeInputFormat.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * UniformSizeInputFormat extends the InputFormat<> class, to produce
+ * input-splits for DistCp.
+ * It looks at the copy-listing and groups the contents into input-splits such
+ * that the total number of bytes to be copied for each input-split is
+ * roughly uniform.
+ */
+public class UniformSizeInputFormat extends InputFormat<Text, FileStatus> {
+  private static final Log LOG
+                = LogFactory.getLog(UniformSizeInputFormat.class);
+
+  /**
+   * Implementation of InputFormat::getSplits(). Returns a list of InputSplits,
+   * such that the number of bytes to be copied by each split is
+   * approximately equal.
+   * @param context JobContext for the job.
+   * @return The list of uniformly-distributed input-splits.
+   * @throws IOException: On failure.
+   * @throws InterruptedException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+                      throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    int numSplits = DistCpUtils.getNumMapTasks(configuration);
+
+    if (numSplits == 0) return new ArrayList<InputSplit>();
+
+    return getSplits(configuration, numSplits,
+                     DistCpUtils.getLong(configuration,
+                          DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
+  }
+
+  private List<InputSplit> getSplits(Configuration configuration, int numSplits,
+                                     long totalSizeBytes) throws IOException {
+    List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+    long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);
+
+    FileStatus srcFileStatus = new FileStatus();
+    Text srcRelPath = new Text();
+    long currentSplitSize = 0;
+    long lastSplitStart = 0;
+    long lastPosition = 0;
+
+    final Path listingFilePath = getListingFilePath(configuration);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Average bytes per map: " + nBytesPerSplit +
+          ", Number of maps: " + numSplits + ", total size: " + totalSizeBytes);
+    }
+    SequenceFile.Reader reader = null;
+    try {
+      reader = getListingFileReader(configuration);
+      while (reader.next(srcRelPath, srcFileStatus)) {
+        // If adding the current file would push this split past the per-split
+        // byte limit, close the current split and start a new one with this file.
+        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) {
+          FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
+              lastPosition - lastSplitStart, null);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug ("Creating split : " + split + ", bytes in split: " + currentSplitSize);
+          }
+          splits.add(split);
+          lastSplitStart = lastPosition;
+          currentSplitSize = 0;
+        }
+        currentSplitSize += srcFileStatus.getLen();
+        lastPosition = reader.getPosition();
+      }
+      if (lastPosition > lastSplitStart) {
+        FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
+            lastPosition - lastSplitStart, null);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Creating split : " + split + ", bytes in split: " + currentSplitSize);
+        }
+        splits.add(split);
+      }
+
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+
+    return splits;
+  }
+
+  private static Path getListingFilePath(Configuration configuration) {
+    final String listingFilePathString =
+            configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+
+    assert !listingFilePathString.equals("")
+              : "Couldn't find listing file. Invalid input.";
+    return new Path(listingFilePathString);
+  }
+
+  private SequenceFile.Reader getListingFileReader(Configuration configuration) {
+
+    final Path listingFilePath = getListingFilePath(configuration);
+    try {
+      final FileSystem fileSystem = listingFilePath.getFileSystem(configuration);
+      if (!fileSystem.exists(listingFilePath))
+        throw new IllegalArgumentException("Listing file doesn't exist at: "
+                                           + listingFilePath);
+
+      return new SequenceFile.Reader(fileSystem, listingFilePath, configuration);
+    }
+    catch (IOException exception) {
+      LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
+      throw new IllegalArgumentException("Couldn't find listing-file at: "
+                                         + listingFilePath, exception);
+    }
+  }
+
+  /**
+   * Implementation of InputFormat::createRecordReader().
+   * @param split The split for which the RecordReader is sought.
+   * @param context The context of the current task-attempt.
+   * @return A SequenceFileRecordReader instance (since the copy-listing is a
+   * simple sequence-file).
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public RecordReader<Text, FileStatus> createRecordReader(InputSplit split,
+                                                     TaskAttemptContext context)
+                                      throws IOException, InterruptedException {
+    return new SequenceFileRecordReader<Text, FileStatus>();
+  }
+}
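
To make the splitting arithmetic concrete: with, say, ten 100 MB files in the listing and four requested maps, nBytesPerSplit is ceil(1000 MB / 4) = 250 MB, so the loop above closes a split after every second file and ends up emitting five splits -- the split count can slightly exceed the requested map count. A minimal driver-side sketch follows; the listing path and byte total are illustrative, and the number of maps is whatever DistCpUtils.getNumMapTasks() reads from the configuration.

    // Hedged driver-side sketch; literal values are examples only.
    Configuration conf = job.getConfiguration();
    conf.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH,
        "/tmp/distcp-listing.seq");                            // example listing
    conf.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED,
        10L * 100 * 1024 * 1024);                              // ten 100 MB files
    job.setInputFormatClass(UniformSizeInputFormat.class);
    // getSplits() then reads the listing sequence-file and groups its records
    // so that each split covers roughly totalBytes / numMaps bytes.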

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputChunk.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputChunk.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputChunk.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/mapred/lib/DynamicInputChunk.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.distcp2.mapred.lib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+import java.io.IOException;
+
+/**
+ * The DynamicInputChunk represents a single chunk of work, when used in
+ * conjunction with the DynamicInputFormat and the DynamicRecordReader.
+ * The records in the DynamicInputFormat's input-file are split across various
+ * DynamicInputChunks. Each one is claimed and processed in an iteration of
+ * a dynamic-mapper. When a mapper has exhausted its DynamicInputChunk, it may
+ * claim another and process it, so faster mappers consume more chunks, until
+ * there are none left.
+ */
+class DynamicInputChunk<K, V> {
+  private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
+
+  private static Configuration configuration;
+  private static Path chunkRootPath;
+  private static String chunkFilePrefix;
+  private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
+  private static FileSystem fs;
+
+  private Path chunkFilePath;
+  private SequenceFileRecordReader<K, V> reader;
+  private SequenceFile.Writer writer;
+
+  private static void initializeChunkInvariants(Configuration config)
+                                                  throws IOException {
+    configuration = config;
+    Path listingFilePath = new Path(getListingFilePath(configuration));
+    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
+    fs = chunkRootPath.getFileSystem(configuration);
+    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
+  }
+
+  private static String getListingFilePath(Configuration configuration) {
+    final String listingFileString = configuration.get(
+            DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
+    assert !listingFileString.equals("") : "Listing file not found.";
+    return listingFileString;
+  }
+
+  private static boolean areInvariantsInitialized() {
+    return chunkRootPath != null;
+  }
+
+  private DynamicInputChunk(String chunkId, Configuration configuration)
+                                                      throws IOException {
+    if (!areInvariantsInitialized())
+      initializeChunkInvariants(configuration);
+
+    chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+    openForWrite();
+  }
+
+
+  private void openForWrite() throws IOException {
+    writer = SequenceFile.createWriter(
+            chunkFilePath.getFileSystem(configuration), configuration,
+            chunkFilePath, Text.class, FileStatus.class,
+            SequenceFile.CompressionType.NONE);
+
+  }
+
+  /**
+   * Factory method to create chunk-files for writing to.
+   * (For instance, when the DynamicInputFormat splits the input-file into
+   * chunks.)
+   * @param chunkId String to identify the chunk.
+   * @param configuration Configuration, describing the location of the listing-
+   * file, file-system for the map-job, etc.
+   * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
+   * incorporating the chunk-id.
+   * @throws IOException Exception on failure to create the chunk.
+   */
+  public static DynamicInputChunk createChunkForWrite(String chunkId,
+                          Configuration configuration) throws IOException {
+    return new DynamicInputChunk(chunkId, configuration);
+  }
+
+  /**
+   * Method to write records into a chunk.
+   * @param key Key from the listing file.
+   * @param value Corresponding value from the listing file.
+   * @throws IOException Exception on failure to write to the file.
+   */
+  public void write(Text key, FileStatus value) throws IOException {
+    writer.append(key, value);
+  }
+
+  /**
+   * Closes streams opened to the chunk-file.
+   */
+  public void close() {
+    IOUtils.cleanup(LOG, reader, writer);
+  }
+
+  /**
+   * Reassigns the chunk to a specified Map-Task, for consumption.
+   * @param taskId The Map-Task to which the chunk is to be reassigned.
+   * @throws IOException Exception on failure to reassign.
+   */
+  public void assignTo(TaskID taskId) throws IOException {
+    Path newPath = new Path(chunkRootPath, taskId.toString());
+    if (!fs.rename(chunkFilePath, newPath)) {
+      LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
+    }
+  }
+
+  private DynamicInputChunk(Path chunkFilePath,
+                            TaskAttemptContext taskAttemptContext)
+                                   throws IOException, InterruptedException {
+    if (!areInvariantsInitialized())
+      initializeChunkInvariants(taskAttemptContext.getConfiguration());
+
+    this.chunkFilePath = chunkFilePath;
+    openForRead(taskAttemptContext);
+  }
+
+  private void openForRead(TaskAttemptContext taskAttemptContext)
+          throws IOException, InterruptedException {
+    reader = new SequenceFileRecordReader<K, V>();
+    reader.initialize(new FileSplit(chunkFilePath, 0,
+            DistCpUtils.getFileSize(chunkFilePath, configuration), null),
+            taskAttemptContext);
+  }
+
+  /**
+   * Factory method that
+   * 1. acquires a chunk for the specified map-task attempt
+   * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
+   * @param taskAttemptContext The attempt-context for the map task that's
+   * trying to acquire a chunk.
+   * @return The acquired dynamic-chunk. The chunk-file is renamed to the
+   * attempt-id (from the attempt-context.)
+   * @throws IOException Exception on failure.
+   * @throws InterruptedException Exception on failure.
+   */
+  public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
+                                      throws IOException, InterruptedException {
+    if (!areInvariantsInitialized())
+        initializeChunkInvariants(taskAttemptContext.getConfiguration());
+
+    String taskId
+            = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
+    Path acquiredFilePath = new Path(chunkRootPath, taskId);
+
+    if (fs.exists(acquiredFilePath)) {
+      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
+      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
+    }
+
+    for (FileStatus chunkFile : getListOfChunkFiles()) {
+      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
+        LOG.info(taskId + " acquired " + chunkFile.getPath());
+        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
+      }
+      else
+        LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
+    }
+
+    return null;
+  }
+
+  /**
+   * Method to be called to relinquish an acquired chunk. All streams open to
+   * the chunk are closed, and the chunk-file is deleted.
+   * @throws IOException Exception thrown on failure to release (i.e. delete)
+   * the chunk file.
+   */
+  public void release() throws IOException {
+    close();
+    if (!fs.delete(chunkFilePath, false)) {
+      LOG.error("Unable to release chunk at path: " + chunkFilePath);
+      throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+    }
+  }
+
+  static FileStatus [] getListOfChunkFiles() throws IOException {
+    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
+    FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
+    numChunksLeft = chunkFiles.length;
+    return chunkFiles;
+  }
+
+  /**
+   * Getter for the chunk-file's path, on HDFS.
+   * @return The qualified path to the chunk-file.
+   */
+  public Path getPath() {
+    return chunkFilePath;
+  }
+
+  /**
+   * Getter for the record-reader, opened to the chunk-file.
+   * @return Opened Sequence-file reader.
+   */
+  public SequenceFileRecordReader<K,V> getReader() {
+    assert reader != null : "Reader un-initialized!";
+    return reader;
+  }
+
+  /**
+   * Getter for the number of chunk-files left in the chunk-file directory.
+   * Useful to determine how many chunks (and hence, records) are left to be
+   * processed.
+   * @return Before the first scan of the directory, the number returned is -1.
+   * Otherwise, the number of chunk-files seen from the last scan is returned.
+   */
+  public static int getNumChunksLeft() {
+    return numChunksLeft;
+  }
+}
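
Putting the pieces together, the intended life-cycle appears to be: the input format writes chunk files with createChunkForWrite()/write(), and each map task then claims chunks with acquire() and disposes of them with release(). A sketch based only on the methods above; the chunk-id, record, and variable names are illustrative.

    // Hedged life-cycle sketch for DynamicInputChunk.
    // Writer side (e.g. while splitting the copy-listing into chunks):
    DynamicInputChunk chunk =
        DynamicInputChunk.createChunkForWrite("000001", conf);
    chunk.write(new Text("/rel/path/file1"), srcFileStatus);   // one listing record
    chunk.close();

    // Reader side (inside a map task looking for more work):
    DynamicInputChunk acquired = DynamicInputChunk.acquire(taskAttemptContext);
    if (acquired != null) {
      SequenceFileRecordReader reader = acquired.getReader();
      // ... consume (Text, FileStatus) records through the reader ...
      acquired.release();   // closes streams and deletes the chunk file
    }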


