hadoop-common-commits mailing list archives

From su...@apache.org
Subject [09/50] [abbrv] hadoop git commit: HADOOP-14267. Make DistCpOptions immutable. Contributed by Mingliang Liu
Date Mon, 03 Apr 2017 20:58:03 GMT
HADOOP-14267. Make DistCpOptions immutable. Contributed by Mingliang Liu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/26172a94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/26172a94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/26172a94

Branch: refs/heads/YARN-2915
Commit: 26172a94d6431e70d7fe15d66be9a7e195f79f60
Parents: 73835c7
Author: Mingliang Liu <liuml07@apache.org>
Authored: Thu Jun 23 00:21:49 2016 -0700
Committer: Mingliang Liu <liuml07@apache.org>
Committed: Fri Mar 31 20:04:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/tools/CopyListing.java    |  38 +-
 .../java/org/apache/hadoop/tools/DistCp.java    | 106 +--
 .../org/apache/hadoop/tools/DistCpContext.java  | 198 ++++
 .../apache/hadoop/tools/DistCpOptionSwitch.java |   2 +-
 .../org/apache/hadoop/tools/DistCpOptions.java  | 925 +++++++++----------
 .../org/apache/hadoop/tools/DistCpSync.java     |  42 +-
 .../hadoop/tools/FileBasedCopyListing.java      |  12 +-
 .../apache/hadoop/tools/GlobbedCopyListing.java |  17 +-
 .../org/apache/hadoop/tools/OptionsParser.java  | 299 ++----
 .../apache/hadoop/tools/SimpleCopyListing.java  | 115 +--
 .../hadoop/tools/mapred/CopyCommitter.java      |  15 +-
 .../apache/hadoop/tools/util/DistCpUtils.java   |   8 +-
 .../apache/hadoop/tools/TestCopyListing.java    |  51 +-
 .../apache/hadoop/tools/TestDistCpOptions.java  | 500 ++++++++++
 .../org/apache/hadoop/tools/TestDistCpSync.java |  68 +-
 .../hadoop/tools/TestDistCpSyncReverseBase.java |  44 +-
 .../apache/hadoop/tools/TestDistCpViewFs.java   |  10 +-
 .../hadoop/tools/TestFileBasedCopyListing.java  |   9 +-
 .../hadoop/tools/TestGlobbedCopyListing.java    |  11 +-
 .../apache/hadoop/tools/TestIntegration.java    |  20 +-
 .../apache/hadoop/tools/TestOptionsParser.java  |  81 +-
 .../contract/AbstractContractDistCpTest.java    |   6 +-
 .../hadoop/tools/mapred/TestCopyCommitter.java  |  34 +-
 .../mapred/TestUniformSizeInputFormat.java      |  15 +-
 .../mapred/lib/TestDynamicInputFormat.java      |  17 +-
 25 files changed, 1574 insertions(+), 1069 deletions(-)
----------------------------------------------------------------------
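
To make the shape of the change easier to follow, here is a minimal sketch
(hypothetical paths and values only) of the calling pattern this patch
introduces: options are assembled through the new DistCpOptions.Builder and
frozen by build(), while mutable runtime state moves into the new
DistCpContext.

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpContext;
    import org.apache.hadoop.tools.DistCpOptions;

    public class ImmutableOptionsSketch {
      public static void main(String[] args) {
        // build() validates the option combinations before returning the
        // immutable DistCpOptions instance.
        DistCpOptions options = new DistCpOptions.Builder(
                Arrays.asList(new Path("hdfs://nn1/src")),    // placeholder
                new Path("hdfs://nn2/dst"))                   // placeholder
            .withSyncFolder(true)   // corresponds to -update
            .maxMaps(20)
            .build();

        // Runtime-mutable state (resolved source paths, target existence,
        // raw xattr preservation) now lives in DistCpContext.
        DistCpContext context = new DistCpContext(options);
        context.setTargetPathExists(true);
      }
    }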


http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index 9ebf9d2..908b558 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -77,41 +77,41 @@ public abstract class CopyListing extends Configured {
    * TARGET IS DIR        : Key-"/file1", Value-FileStatus(/tmp/file1)  
    *
    * @param pathToListFile - Output file where the listing would be stored
-   * @param options - Input options to distcp
+   * @param distCpContext - distcp context associated with input options
    * @throws IOException - Exception if any
    */
   public final void buildListing(Path pathToListFile,
-                                 DistCpOptions options) throws IOException {
-    validatePaths(options);
-    doBuildListing(pathToListFile, options);
+      DistCpContext distCpContext) throws IOException {
+    validatePaths(distCpContext);
+    doBuildListing(pathToListFile, distCpContext);
     Configuration config = getConf();
 
     config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
     config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
     config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
 
-    validateFinalListing(pathToListFile, options);
+    validateFinalListing(pathToListFile, distCpContext);
     LOG.info("Number of paths in the copy list: " + this.getNumberOfPaths());
   }
 
   /**
    * Validate input and output paths
    *
-   * @param options - Input options
+   * @param distCpContext - Distcp context
    * @throws InvalidInputException If inputs are invalid
    * @throws IOException any Exception with FS
    */
-  protected abstract void validatePaths(DistCpOptions options)
+  protected abstract void validatePaths(DistCpContext distCpContext)
       throws IOException, InvalidInputException;
 
   /**
    * The interface to be implemented by sub-classes, to create the source/target file listing.
    * @param pathToListFile Path on HDFS where the listing file is written.
-   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @param distCpContext - Distcp context
    * @throws IOException Thrown on failure to create the listing file.
    */
   protected abstract void doBuildListing(Path pathToListFile,
-                                         DistCpOptions options) throws IOException;
+      DistCpContext distCpContext) throws IOException;
 
   /**
    * Return the total bytes that distCp should copy for the source paths
@@ -135,17 +135,17 @@ public abstract class CopyListing extends Configured {
    * If preserving XAttrs, checks that file system can support XAttrs.
    *
    * @param pathToListFile - path listing build by doBuildListing
-   * @param options - Input options to distcp
+   * @param context - Distcp context with associated input options
    * @throws IOException - Any issues while checking for duplicates and throws
    * @throws DuplicateFileException - if there are duplicates
    */
-  private void validateFinalListing(Path pathToListFile, DistCpOptions options)
+  private void validateFinalListing(Path pathToListFile, DistCpContext context)
       throws DuplicateFileException, IOException {
 
     Configuration config = getConf();
     FileSystem fs = pathToListFile.getFileSystem(config);
 
-    final boolean splitLargeFile = options.splitLargeFile();
+    final boolean splitLargeFile = context.splitLargeFile();
 
     // When splitLargeFile is enabled, we don't randomize the copylist
     // earlier, so we don't do the sorting here. For a file that has
@@ -188,7 +188,7 @@ public abstract class CopyListing extends Configured {
           }
         }
         reader.getCurrentValue(lastFileStatus);
-        if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
+        if (context.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
           FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
           URI lastFsUri = lastFs.getUri();
           if (!aclSupportCheckFsSet.contains(lastFsUri)) {
@@ -196,7 +196,7 @@ public abstract class CopyListing extends Configured {
             aclSupportCheckFsSet.add(lastFsUri);
           }
         }
-        if (options.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
+        if (context.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
           FileSystem lastFs = lastFileStatus.getPath().getFileSystem(config);
           URI lastFsUri = lastFs.getUri();
           if (!xAttrSupportCheckFsSet.contains(lastFsUri)) {
@@ -210,7 +210,7 @@ public abstract class CopyListing extends Configured {
           lastChunkOffset = lastFileStatus.getChunkOffset();
           lastChunkLength = lastFileStatus.getChunkLength();
         }
-        if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
+        if (context.shouldUseDiff() && LOG.isDebugEnabled()) {
           LOG.debug("Copy list entry " + idx + ": " +
                   lastFileStatus.getPath().toUri().getPath());
           idx++;
@@ -253,14 +253,12 @@ public abstract class CopyListing extends Configured {
    * Public Factory method with which the appropriate CopyListing implementation may be retrieved.
    * @param configuration The input configuration.
    * @param credentials Credentials object on which the FS delegation tokens are cached
-   * @param options The input Options, to help choose the appropriate CopyListing Implementation.
+   * @param context Distcp context with associated input options
    * @return An instance of the appropriate CopyListing implementation.
    * @throws java.io.IOException - Exception if any
    */
   public static CopyListing getCopyListing(Configuration configuration,
-                                           Credentials credentials,
-                                           DistCpOptions options)
-      throws IOException {
+      Credentials credentials, DistCpContext context) throws IOException {
     String copyListingClassName = configuration.get(DistCpConstants.
         CONF_LABEL_COPY_LISTING_CLASS, "");
     Class<? extends CopyListing> copyListingClass;
@@ -270,7 +268,7 @@ public abstract class CopyListing extends Configured {
             CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
             CopyListing.class);
       } else {
-        if (options.getSourceFileListing() == null) {
+        if (context.getSourceFileListing() == null) {
             copyListingClass = GlobbedCopyListing.class;
         } else {
             copyListingClass = FileBasedCopyListing.class;
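
For reference, a short hypothetical sketch of driving the reworked CopyListing
API, which now receives a DistCpContext in place of the raw DistCpOptions
(configuration, credentials and paths below are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.tools.CopyListing;
    import org.apache.hadoop.tools.DistCpContext;
    import org.apache.hadoop.tools.DistCpOptions;

    public class CopyListingSketch {
      public static void main(String[] args) throws Exception {
        DistCpOptions options = new DistCpOptions.Builder(
            new Path("hdfs://nn1/srcFileList"),   // placeholder listing file
            new Path("hdfs://nn2/dst"))           // placeholder target
            .build();
        DistCpContext context = new DistCpContext(options);

        // Both the factory and the listing build consume the context now.
        CopyListing listing = CopyListing.getCopyListing(
            new Configuration(), new Credentials(), context);
        listing.buildListing(new Path("hdfs://nn2/tmp/fileList.seq"), context);
      }
    }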

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
index 8c2fa24..df9c328 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.tools;
 import java.io.IOException;
 import java.util.Random;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -35,7 +36,6 @@ import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.CopyListing.*;
 import org.apache.hadoop.tools.mapred.CopyMapper;
 import org.apache.hadoop.tools.mapred.CopyOutputFormat;
@@ -66,7 +66,9 @@ public class DistCp extends Configured implements Tool {
 
   static final Log LOG = LogFactory.getLog(DistCp.class);
 
-  private DistCpOptions inputOptions;
+  @VisibleForTesting
+  DistCpContext context;
+
   private Path metaFolder;
 
   private static final String PREFIX = "_distcp";
@@ -79,15 +81,14 @@ public class DistCp extends Configured implements Tool {
   private FileSystem jobFS;
 
   private void prepareFileListing(Job job) throws Exception {
-    if (inputOptions.shouldUseSnapshotDiff()) {
+    if (context.shouldUseSnapshotDiff()) {
       // When "-diff" or "-rdiff" is passed, do sync() first, then
       // create copyListing based on snapshot diff.
-      DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
+      DistCpSync distCpSync = new DistCpSync(context, getConf());
       if (distCpSync.sync()) {
         createInputFileListingWithDiff(job, distCpSync);
       } else {
-        throw new Exception("DistCp sync failed, input options: "
-            + inputOptions);
+        throw new Exception("DistCp sync failed, input options: " + context);
       }
     } else {
       // When no "-diff" or "-rdiff" is passed, create copyListing
@@ -99,16 +100,19 @@ public class DistCp extends Configured implements Tool {
   /**
    * Public Constructor. Creates DistCp object with specified input-parameters.
    * (E.g. source-paths, target-location, etc.)
-   * @param inputOptions Options (indicating source-paths, target-location.)
-   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
+   * @param configuration configuration against which the Copy-mapper must run
+   * @param inputOptions Immutable options
    * @throws Exception
    */
-  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
+  public DistCp(Configuration configuration, DistCpOptions inputOptions)
+      throws Exception {
     Configuration config = new Configuration(configuration);
     config.addResource(DISTCP_DEFAULT_XML);
     config.addResource(DISTCP_SITE_XML);
     setConf(config);
-    this.inputOptions = inputOptions;
+    if (inputOptions != null) {
+      this.context = new DistCpContext(inputOptions);
+    }
     this.metaFolder   = createMetaFolderPath();
   }
 
@@ -134,10 +138,10 @@ public class DistCp extends Configured implements Tool {
     }
     
     try {
-      inputOptions = (OptionsParser.parse(argv));
-      setOptionsForSplitLargeFile();
+      context = new DistCpContext(OptionsParser.parse(argv));
+      checkSplitLargeFile();
       setTargetPathExists();
-      LOG.info("Input Options: " + inputOptions);
+      LOG.info("Input Options: " + context);
     } catch (Throwable e) {
       LOG.error("Invalid arguments: ", e);
       System.err.println("Invalid arguments: " + e.getMessage());
@@ -173,9 +177,11 @@ public class DistCp extends Configured implements Tool {
    * @throws Exception
    */
   public Job execute() throws Exception {
+    Preconditions.checkState(context != null,
+        "The DistCpContext should have been created before running DistCp!");
     Job job = createAndSubmitJob();
 
-    if (inputOptions.shouldBlock()) {
+    if (context.shouldBlock()) {
       waitForJobCompletion(job);
     }
     return job;
@@ -186,7 +192,7 @@ public class DistCp extends Configured implements Tool {
    * @return The mapreduce job object that has been submitted
    */
   public Job createAndSubmitJob() throws Exception {
-    assert inputOptions != null;
+    assert context != null;
     assert getConf() != null;
     Job job = null;
     try {
@@ -230,53 +236,36 @@ public class DistCp extends Configured implements Tool {
    * for the benefit of CopyCommitter
    */
   private void setTargetPathExists() throws IOException {
-    Path target = inputOptions.getTargetPath();
+    Path target = context.getTargetPath();
     FileSystem targetFS = target.getFileSystem(getConf());
     boolean targetExists = targetFS.exists(target);
-    inputOptions.setTargetPathExists(targetExists);
+    context.setTargetPathExists(targetExists);
     getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, 
         targetExists);
   }
 
   /**
-   * Check if concat is supported by fs.
-   * Throws UnsupportedOperationException if not.
+   * Check splitting large files is supported and populate configs.
    */
-  private void checkConcatSupport(FileSystem fs) {
+  private void checkSplitLargeFile() throws IOException {
+    if (!context.splitLargeFile()) {
+      return;
+    }
+
+    final Path target = context.getTargetPath();
+    final FileSystem targetFS = target.getFileSystem(getConf());
     try {
       Path[] src = null;
       Path tgt = null;
-      fs.concat(tgt, src);
+      targetFS.concat(tgt, src);
     } catch (UnsupportedOperationException use) {
       throw new UnsupportedOperationException(
           DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() +
-          " is not supported since the target file system doesn't" +
-          " support concat.", use);
+              " is not supported since the target file system doesn't" +
+              " support concat.", use);
     } catch (Exception e) {
       // Ignore other exception
     }
-  }
-
-  /**
-   * Set up needed options for splitting large files.
-   */
-  private void setOptionsForSplitLargeFile() throws IOException {
-    if (!inputOptions.splitLargeFile()) {
-      return;
-    }
-    Path target = inputOptions.getTargetPath();
-    FileSystem targetFS = target.getFileSystem(getConf());
-    checkConcatSupport(targetFS);
-
-    LOG.info("Enabling preserving blocksize since "
-        + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed.");
-    inputOptions.preserve(FileAttribute.BLOCKSIZE);
-
-    LOG.info("Set " +
-        DistCpOptionSwitch.APPEND.getSwitch()
-        + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
-        + " is passed.");
-    inputOptions.setAppend(false);
 
     LOG.info("Set " +
         DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES
@@ -286,7 +275,6 @@ public class DistCp extends Configured implements Tool {
         DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
   }
 
-
   /**
    * Create Job object for submitting it, with all the configuration
    *
@@ -300,7 +288,7 @@ public class DistCp extends Configured implements Tool {
       jobName += ": " + userChosenName;
     Job job = Job.getInstance(getConf());
     job.setJobName(jobName);
-    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
+    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), context));
     job.setJarByClass(CopyMapper.class);
     configureOutputFormat(job);
 
@@ -311,9 +299,9 @@ public class DistCp extends Configured implements Tool {
     job.setOutputFormatClass(CopyOutputFormat.class);
     job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
     job.getConfiguration().set(JobContext.NUM_MAPS,
-                  String.valueOf(inputOptions.getMaxMaps()));
+                  String.valueOf(context.getMaxMaps()));
 
-    inputOptions.appendToConf(job.getConfiguration());
+    context.appendToConf(job.getConfiguration());
     return job;
   }
 
@@ -325,18 +313,20 @@ public class DistCp extends Configured implements Tool {
    */
   private void configureOutputFormat(Job job) throws IOException {
     final Configuration configuration = job.getConfiguration();
-    Path targetPath = inputOptions.getTargetPath();
+    Path targetPath = context.getTargetPath();
     FileSystem targetFS = targetPath.getFileSystem(configuration);
     targetPath = targetPath.makeQualified(targetFS.getUri(),
                                           targetFS.getWorkingDirectory());
-    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.ACL)) {
+    if (context.shouldPreserve(
+        DistCpOptions.FileAttribute.ACL)) {
       DistCpUtils.checkFileSystemAclSupport(targetFS);
     }
-    if (inputOptions.shouldPreserve(DistCpOptions.FileAttribute.XATTR)) {
+    if (context.shouldPreserve(
+        DistCpOptions.FileAttribute.XATTR)) {
       DistCpUtils.checkFileSystemXAttrSupport(targetFS);
     }
-    if (inputOptions.shouldAtomicCommit()) {
-      Path workDir = inputOptions.getAtomicWorkPath();
+    if (context.shouldAtomicCommit()) {
+      Path workDir = context.getAtomicWorkPath();
       if (workDir == null) {
         workDir = targetPath.getParent();
       }
@@ -353,7 +343,7 @@ public class DistCp extends Configured implements Tool {
     }
     CopyOutputFormat.setCommitDirectory(job, targetPath);
 
-    Path logPath = inputOptions.getLogPath();
+    Path logPath = context.getLogPath();
     if (logPath == null) {
       logPath = new Path(metaFolder, "_logs");
     } else {
@@ -374,8 +364,8 @@ public class DistCp extends Configured implements Tool {
   protected Path createInputFileListing(Job job) throws IOException {
     Path fileListingPath = getFileListingPath();
     CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
-        job.getCredentials(), inputOptions);
-    copyListing.buildListing(fileListingPath, inputOptions);
+        job.getCredentials(), context);
+    copyListing.buildListing(fileListingPath, context);
     return fileListingPath;
   }
 
@@ -391,7 +381,7 @@ public class DistCp extends Configured implements Tool {
     Path fileListingPath = getFileListingPath();
     CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
         job.getCredentials(), distCpSync);
-    copyListing.buildListing(fileListingPath, inputOptions);
+    copyListing.buildListing(fileListingPath, context);
     return fileListingPath;
   }
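
As a usage note (again a sketch with placeholder paths, not part of the
patch), the embedded entry point is unchanged: callers still hand an immutable
DistCpOptions to the constructor, which wraps it in the internal DistCpContext
that execute() checks for:

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class EmbeddedDistCpSketch {
      public static void main(String[] args) throws Exception {
        DistCpOptions options = new DistCpOptions.Builder(
                Collections.singletonList(new Path("hdfs://nn1/src")),
                new Path("hdfs://nn2/dst"))
            .withBlocking(true)   // execute() waits for job completion
            .build();

        // The constructor wraps the options in a DistCpContext; execute()
        // fails fast (Preconditions.checkState) if that never happened.
        Job job = new DistCp(new Configuration(), options).execute();
        System.out.println("DistCp finished: " + job.getJobID());
      }
    }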
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
new file mode 100644
index 0000000..c34005e
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the context of the distcp at runtime.
+ *
+ * It has the immutable {@link DistCpOptions} and mutable runtime status.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DistCpContext {
+  private final DistCpOptions options;
+
+  /** The source paths can be set at runtime via snapshots. */
+  private List<Path> sourcePaths;
+
+  /** This is a derived field, it's initialized in the beginning of distcp. */
+  private boolean targetPathExists = true;
+
+  /** Indicate that raw.* xattrs should be preserved if true. */
+  private boolean preserveRawXattrs = false;
+
+  public DistCpContext(DistCpOptions options) {
+    this.options = options;
+    this.sourcePaths = options.getSourcePaths();
+  }
+
+  public void setSourcePaths(List<Path> sourcePaths) {
+    this.sourcePaths = sourcePaths;
+  }
+
+  /**
+   * @return the sourcePaths. Please note this method does not directly delegate
+   * to the {@link #options}.
+   */
+  public List<Path> getSourcePaths() {
+    return sourcePaths;
+  }
+
+  public Path getSourceFileListing() {
+    return options.getSourceFileListing();
+  }
+
+  public Path getTargetPath() {
+    return options.getTargetPath();
+  }
+
+  public boolean shouldAtomicCommit() {
+    return options.shouldAtomicCommit();
+  }
+
+  public boolean shouldSyncFolder() {
+    return options.shouldSyncFolder();
+  }
+
+  public boolean shouldDeleteMissing() {
+    return options.shouldDeleteMissing();
+  }
+
+  public boolean shouldIgnoreFailures() {
+    return options.shouldIgnoreFailures();
+  }
+
+  public boolean shouldOverwrite() {
+    return options.shouldOverwrite();
+  }
+
+  public boolean shouldAppend() {
+    return options.shouldAppend();
+  }
+
+  public boolean shouldSkipCRC() {
+    return options.shouldSkipCRC();
+  }
+
+  public boolean shouldBlock() {
+    return options.shouldBlock();
+  }
+
+  public boolean shouldUseDiff() {
+    return options.shouldUseDiff();
+  }
+
+  public boolean shouldUseRdiff() {
+    return options.shouldUseRdiff();
+  }
+
+  public boolean shouldUseSnapshotDiff() {
+    return options.shouldUseSnapshotDiff();
+  }
+
+  public String getFromSnapshot() {
+    return options.getFromSnapshot();
+  }
+
+  public String getToSnapshot() {
+    return options.getToSnapshot();
+  }
+
+  public final String getFiltersFile() {
+    return options.getFiltersFile();
+  }
+
+  public int getNumListstatusThreads() {
+    return options.getNumListstatusThreads();
+  }
+
+  public int getMaxMaps() {
+    return options.getMaxMaps();
+  }
+
+  public float getMapBandwidth() {
+    return options.getMapBandwidth();
+  }
+
+  public Set<FileAttribute> getPreserveAttributes() {
+    return options.getPreserveAttributes();
+  }
+
+  public boolean shouldPreserve(FileAttribute attribute) {
+    return options.shouldPreserve(attribute);
+  }
+
+  public boolean shouldPreserveRawXattrs() {
+    return preserveRawXattrs;
+  }
+
+  public void setPreserveRawXattrs(boolean preserveRawXattrs) {
+    this.preserveRawXattrs = preserveRawXattrs;
+  }
+
+  public Path getAtomicWorkPath() {
+    return options.getAtomicWorkPath();
+  }
+
+  public Path getLogPath() {
+    return options.getLogPath();
+  }
+
+  public String getCopyStrategy() {
+    return options.getCopyStrategy();
+  }
+
+  public int getBlocksPerChunk() {
+    return options.getBlocksPerChunk();
+  }
+
+  public final boolean splitLargeFile() {
+    return options.getBlocksPerChunk() > 0;
+  }
+
+  public void setTargetPathExists(boolean targetPathExists) {
+    this.targetPathExists = targetPathExists;
+  }
+
+  public boolean isTargetPathExists() {
+    return targetPathExists;
+  }
+
+  public void appendToConf(Configuration conf) {
+    options.appendToConf(conf);
+  }
+
+  @Override
+  public String toString() {
+    return options.toString() +
+        ", sourcePaths=" + sourcePaths +
+        ", targetPathExists=" + targetPathExists +
+        ", preserveRawXattrs" + preserveRawXattrs;
+  }
+
+}
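
In short, the accessors above delegate to the immutable DistCpOptions, and
only the snapshot-resolved source paths, the target-path existence flag and
the raw-xattr flag remain settable at runtime. A tiny sketch with hypothetical
values:

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpContext;
    import org.apache.hadoop.tools.DistCpOptions;

    public class ContextStateSketch {
      public static void main(String[] args) {
        DistCpOptions options = new DistCpOptions.Builder(
            Arrays.asList(new Path("/src")), new Path("/dst")).build();
        DistCpContext context = new DistCpContext(options);

        // Read-only view delegated to the immutable options.
        System.out.println("max maps: " + context.getMaxMaps());

        // Runtime-only mutable state kept on the context.
        context.setSourcePaths(Arrays.asList(new Path("/src/.snapshot/s2")));
        context.setTargetPathExists(false);
        context.setPreserveRawXattrs(true);
      }
    }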

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index ced9b54..81abb7d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -81,7 +81,7 @@ public enum DistCpOptionSwitch {
   NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
       new Option("numListstatusThreads", true, "Number of threads to " +
           "use for building file listing (max " +
-          DistCpOptions.maxNumListstatusThreads + ").")),
+          DistCpOptions.MAX_NUM_LISTSTATUS_THREADS + ").")),
   /**
    * Max number of maps to use during copy. DistCp will split work
    * as equally as possible among these maps

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 9822d83..97ae0c4 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -18,43 +18,88 @@
 
 package org.apache.hadoop.tools;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.util.DistCpUtils;
 
+import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * The Options class encapsulates all DistCp options.
- * These may be set from command-line (via the OptionsParser)
- * or may be set manually.
+ *
+ * When you add a new option, please:
+ *  - Add the field along with javadoc in DistCpOptions and its Builder
+ *  - Add setter method in the {@link Builder} class
+ *
+ * This class is immutable.
  */
-public class DistCpOptions {
-
-  private boolean atomicCommit = false;
-  private boolean syncFolder = false;
-  private boolean deleteMissing = false;
-  private boolean ignoreFailures = false;
-  private boolean overwrite = false;
-  private boolean append = false;
-  private boolean skipCRC = false;
-  private boolean blocking = true;
+public final class DistCpOptions {
+  private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
+  public static final int MAX_NUM_LISTSTATUS_THREADS = 40;
+
+  /** File path (hdfs:// or file://) that contains the list of actual files to
+   * copy.
+   */
+  private final Path sourceFileListing;
+
+  /** List of source-paths (including wildcards) to be copied to target. */
+  private final List<Path> sourcePaths;
+
+  /** Destination path for the dist-copy. */
+  private final Path targetPath;
+
+  /** Whether data need to be committed automatically. */
+  private final boolean atomicCommit;
+
+  /** the work path for atomic commit. If null, the work
+   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath). */
+  private final Path atomicWorkPath;
+
+  /** Whether source and target folder contents be sync'ed up. */
+  private final boolean syncFolder;
+
+  /** Whether files only present in target should be deleted. */
+  private boolean deleteMissing;
+
+  /** Whether failures during copy be ignored. */
+  private final boolean ignoreFailures;
+
+  /** Whether files should always be overwritten on target. */
+  private final boolean overwrite;
+
+  /** Whether we want to append new data to target files. This is valid only
+   * with update option and CRC is not skipped. */
+  private final boolean append;
+
+  /** Whether checksum comparison should be skipped while determining if source
+   * and destination files are identical. */
+  private final boolean skipCRC;
+
+  /** Whether to run blocking or non-blocking. */
+  private final boolean blocking;
+
   // When "-diff s1 s2 src tgt" is passed, apply forward snapshot diff (from s1
   // to s2) of source cluster to the target cluster to sync target cluster with
   // the source cluster. Referred to as "Fdiff" in the code.
   // It's required that s2 is newer than s1.
-  private boolean useDiff = false;
+  private final boolean useDiff;
 
   // When "-rdiff s2 s1 src tgt" is passed, apply reversed snapshot diff (from
   // s2 to s1) of target cluster to the target cluster, so to make target
   // cluster go back to s1. Referred to as "Rdiff" in the code.
   // It's required that s2 is newer than s1, and src and tgt have exact same
   // content at their s1, if src is not the same as tgt.
-  private boolean useRdiff = false;
+  private final boolean useRdiff;
 
   // For both -diff and -rdiff, given the example command line switches, two
   // steps are taken:
@@ -66,44 +111,53 @@ public class DistCpOptions {
   //          could be the tgt itself (HDFS-9820).
   //
 
-  public static final int maxNumListstatusThreads = 40;
-  private int numListstatusThreads = 0;  // Indicates that flag is not set.
-  private int maxMaps = DistCpConstants.DEFAULT_MAPS;
-  private float mapBandwidth = 0;  // Indicates that we should use the default.
-
-  private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+  private final String fromSnapshot;
+  private final String toSnapshot;
 
-  private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
+  /** The path to a file containing a list of paths to filter out of copy. */
+  private final String filtersFile;
 
-  private boolean preserveRawXattrs;
+  /** Path where output logs are stored. If not specified, it will use the
+   * default value JobStagingDir/_logs and delete upon job completion. */
+  private final Path logPath;
 
-  private Path atomicWorkPath;
+  /** Set the copy strategy to use. Should map to a strategy implementation
+   * in distp-default.xml. */
+  private final String copyStrategy;
 
-  private Path logPath;
+  /** per map bandwidth in MB. */
+  private final float mapBandwidth;
 
-  private Path sourceFileListing;
-  private List<Path> sourcePaths;
+  /** The number of threads to use for listStatus. We allow max
+   * {@link #MAX_NUM_LISTSTATUS_THREADS} threads. Setting numThreads to zero
+   * signify we should use the value from conf properties. */
+  private final int numListstatusThreads;
 
-  private String fromSnapshot;
-  private String toSnapshot;
+  /** The max number of maps to use for copy. */
+  private final int maxMaps;
 
-  private Path targetPath;
-
-  /**
-   * The path to a file containing a list of paths to filter out of the copy.
-   */
-  private String filtersFile;
-
-  // targetPathExist is a derived field, it's initialized in the
-  // beginning of distcp.
-  private boolean targetPathExists = true;
+  /** File attributes that need to be preserved. */
+  private final EnumSet<FileAttribute> preserveStatus;
 
   // Size of chunk in number of blocks when splitting large file into chunks
   // to copy in parallel. Default is 0 and file are not splitted.
-  private int blocksPerChunk = 0;
+  private final int blocksPerChunk;
 
-  public static enum FileAttribute{
-    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES;
+  /**
+   * File attributes for preserve.
+   *
+   * Each enum entry uses the first char as its symbol.
+   */
+  public enum FileAttribute {
+    REPLICATION,    // R
+    BLOCKSIZE,      // B
+    USER,           // U
+    GROUP,          // G
+    PERMISSION,     // P
+    CHECKSUMTYPE,   // C
+    ACL,            // A
+    XATTR,          // X
+    TIMES;          // T
 
     public static FileAttribute getAttribute(char symbol) {
       for (FileAttribute attribute : values()) {
@@ -115,187 +169,86 @@ public class DistCpOptions {
     }
   }
 
-  /**
-   * Constructor, to initialize source/target paths.
-   * @param sourcePaths List of source-paths (including wildcards)
-   *                     to be copied to target.
-   * @param targetPath Destination path for the dist-copy.
-   */
-  public DistCpOptions(List<Path> sourcePaths, Path targetPath) {
-    assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths";
-    assert targetPath != null : "Invalid Target path";
+  private DistCpOptions(Builder builder) {
+    this.sourceFileListing = builder.sourceFileListing;
+    this.sourcePaths = builder.sourcePaths;
+    this.targetPath = builder.targetPath;
+
+    this.atomicCommit = builder.atomicCommit;
+    this.atomicWorkPath = builder.atomicWorkPath;
+    this.syncFolder = builder.syncFolder;
+    this.deleteMissing = builder.deleteMissing;
+    this.ignoreFailures = builder.ignoreFailures;
+    this.overwrite = builder.overwrite;
+    this.append = builder.append;
+    this.skipCRC = builder.skipCRC;
+    this.blocking = builder.blocking;
+
+    this.useDiff = builder.useDiff;
+    this.useRdiff = builder.useRdiff;
+    this.fromSnapshot = builder.fromSnapshot;
+    this.toSnapshot = builder.toSnapshot;
 
-    this.sourcePaths = sourcePaths;
-    this.targetPath = targetPath;
+    this.filtersFile = builder.filtersFile;
+    this.logPath = builder.logPath;
+    this.copyStrategy = builder.copyStrategy;
+
+    this.mapBandwidth = builder.mapBandwidth;
+    this.numListstatusThreads = builder.numListstatusThreads;
+    this.maxMaps = builder.maxMaps;
+
+    this.preserveStatus = builder.preserveStatus;
+
+    this.blocksPerChunk = builder.blocksPerChunk;
   }
 
-  /**
-   * Constructor, to initialize source/target paths.
-   * @param sourceFileListing File containing list of source paths
-   * @param targetPath Destination path for the dist-copy.
-   */
-  public DistCpOptions(Path sourceFileListing, Path targetPath) {
-    assert sourceFileListing != null : "Invalid source paths";
-    assert targetPath != null : "Invalid Target path";
+  public Path getSourceFileListing() {
+    return sourceFileListing;
+  }
 
-    this.sourceFileListing = sourceFileListing;
-    this.targetPath = targetPath;
+  public List<Path> getSourcePaths() {
+    return sourcePaths == null ?
+        null : Collections.unmodifiableList(sourcePaths);
   }
 
-  /**
-   * Copy constructor.
-   * @param that DistCpOptions being copied from.
-   */
-  public DistCpOptions(DistCpOptions that) {
-    if (this != that && that != null) {
-      this.atomicCommit = that.atomicCommit;
-      this.syncFolder = that.syncFolder;
-      this.deleteMissing = that.deleteMissing;
-      this.ignoreFailures = that.ignoreFailures;
-      this.overwrite = that.overwrite;
-      this.skipCRC = that.skipCRC;
-      this.blocking = that.blocking;
-      this.useDiff = that.useDiff;
-      this.useRdiff = that.useRdiff;
-      this.numListstatusThreads = that.numListstatusThreads;
-      this.maxMaps = that.maxMaps;
-      this.mapBandwidth = that.mapBandwidth;
-      this.copyStrategy = that.copyStrategy;
-      this.preserveStatus = that.preserveStatus;
-      this.preserveRawXattrs = that.preserveRawXattrs;
-      this.atomicWorkPath = that.getAtomicWorkPath();
-      this.logPath = that.getLogPath();
-      this.sourceFileListing = that.getSourceFileListing();
-      this.sourcePaths = that.getSourcePaths();
-      this.targetPath = that.getTargetPath();
-      this.targetPathExists = that.getTargetPathExists();
-      this.filtersFile = that.getFiltersFile();
-      this.blocksPerChunk = that.blocksPerChunk;
-    }
+  public Path getTargetPath() {
+    return targetPath;
   }
 
-  /**
-   * Should the data be committed atomically?
-   *
-   * @return true if data should be committed automically. false otherwise
-   */
   public boolean shouldAtomicCommit() {
     return atomicCommit;
   }
 
-  /**
-   * Set if data need to be committed automatically
-   *
-   * @param atomicCommit - boolean switch
-   */
-  public void setAtomicCommit(boolean atomicCommit) {
-    this.atomicCommit = atomicCommit;
+  public Path getAtomicWorkPath() {
+    return atomicWorkPath;
   }
 
-  /**
-   * Should the data be sync'ed between source and target paths?
-   *
-   * @return true if data should be sync'ed up. false otherwise
-   */
   public boolean shouldSyncFolder() {
     return syncFolder;
   }
 
-  /**
-   * Set if source and target folder contents be sync'ed up
-   *
-   * @param syncFolder - boolean switch
-   */
-  public void setSyncFolder(boolean syncFolder) {
-    this.syncFolder = syncFolder;
-  }
-
-  /**
-   * Should target files missing in source should be deleted?
-   *
-   * @return true if zoombie target files to be removed. false otherwise
-   */
   public boolean shouldDeleteMissing() {
     return deleteMissing;
   }
 
-  /**
-   * Set if files only present in target should be deleted
-   *
-   * @param deleteMissing - boolean switch
-   */
-  public void setDeleteMissing(boolean deleteMissing) {
-    this.deleteMissing = deleteMissing;
-  }
-
-  /**
-   * Should failures be logged and ignored during copy?
-   *
-   * @return true if failures are to be logged and ignored. false otherwise
-   */
   public boolean shouldIgnoreFailures() {
     return ignoreFailures;
   }
 
-  /**
-   * Set if failures during copy be ignored
-   *
-   * @param ignoreFailures - boolean switch
-   */
-  public void setIgnoreFailures(boolean ignoreFailures) {
-    this.ignoreFailures = ignoreFailures;
-  }
-
-  /**
-   * Should DistCp be running in blocking mode
-   *
-   * @return true if should run in blocking, false otherwise
-   */
-  public boolean shouldBlock() {
-    return blocking;
-  }
-
-  /**
-   * Set if Disctp should run blocking or non-blocking
-   *
-   * @param blocking - boolean switch
-   */
-  public void setBlocking(boolean blocking) {
-    this.blocking = blocking;
-  }
-
-  /**
-   * Should files be overwritten always?
-   *
-   * @return true if files in target that may exist before distcp, should always
-   *         be overwritten. false otherwise
-   */
   public boolean shouldOverwrite() {
     return overwrite;
   }
 
-  /**
-   * Set if files should always be overwritten on target
-   *
-   * @param overwrite - boolean switch
-   */
-  public void setOverwrite(boolean overwrite) {
-    this.overwrite = overwrite;
-  }
-
-  /**
-   * @return whether we can append new data to target files
-   */
   public boolean shouldAppend() {
     return append;
   }
 
-  /**
-   * Set if we want to append new data to target files. This is valid only with
-   * update option and CRC is not skipped.
-   */
-  public void setAppend(boolean append) {
-    this.append = append;
+  public boolean shouldSkipCRC() {
+    return skipCRC;
+  }
+
+  public boolean shouldBlock() {
+    return blocking;
   }
 
   public boolean shouldUseDiff() {
@@ -318,104 +271,34 @@ public class DistCpOptions {
     return this.toSnapshot;
   }
 
-  public void setUseDiff(String fromSS, String toSS) {
-    this.useDiff = true;
-    this.fromSnapshot = fromSS;
-    this.toSnapshot = toSS;
-  }
-
-  public void setUseRdiff(String fromSS, String toSS) {
-    this.useRdiff = true;
-    this.fromSnapshot = fromSS;
-    this.toSnapshot = toSS;
+  public String getFiltersFile() {
+    return filtersFile;
   }
 
-  /**
-   * Should CRC/checksum check be skipped while checking files are identical
-   *
-   * @return true if checksum check should be skipped while checking files are
-   *         identical. false otherwise
-   */
-  public boolean shouldSkipCRC() {
-    return skipCRC;
+  public Path getLogPath() {
+    return logPath;
   }
 
-  /**
-   * Set if checksum comparison should be skipped while determining if
-   * source and destination files are identical
-   *
-   * @param skipCRC - boolean switch
-   */
-  public void setSkipCRC(boolean skipCRC) {
-    this.skipCRC = skipCRC;
+  public String getCopyStrategy() {
+    return copyStrategy;
   }
 
-  /** Get the number of threads to use for listStatus
-   *
-   * @return Number of threads to do listStatus
-   */
   public int getNumListstatusThreads() {
     return numListstatusThreads;
   }
 
-  /** Set the number of threads to use for listStatus. We allow max 40
-   *  threads. Setting numThreads to zero signify we should use the value
-   *  from conf properties.
-   *
-   * @param numThreads - Number of threads
-   */
-  public void setNumListstatusThreads(int numThreads) {
-    if (numThreads > maxNumListstatusThreads) {
-      this.numListstatusThreads = maxNumListstatusThreads;
-    } else if (numThreads > 0) {
-      this.numListstatusThreads = numThreads;
-    } else {
-      this.numListstatusThreads = 0;
-    }
-  }
-
-  /** Get the max number of maps to use for this copy
-   *
-   * @return Max number of maps
-   */
   public int getMaxMaps() {
     return maxMaps;
   }
 
-  /**
-   * Set the max number of maps to use for copy
-   *
-   * @param maxMaps - Number of maps
-   */
-  public void setMaxMaps(int maxMaps) {
-    this.maxMaps = Math.max(maxMaps, 1);
-  }
-
-  /** Get the map bandwidth in MB
-   *
-   * @return Bandwidth in MB
-   */
   public float getMapBandwidth() {
     return mapBandwidth;
   }
 
-  /**
-   * Set per map bandwidth
-   *
-   * @param mapBandwidth - per map bandwidth
-   */
-  public void setMapBandwidth(float mapBandwidth) {
-    assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
-    this.mapBandwidth = mapBandwidth;
-  }
-
-  /**
-   * Returns an iterator with the list of file attributes to preserve
-   *
-   * @return iterator of file attributes to preserve
-   */
-  public Iterator<FileAttribute> preserveAttributes() {
-    return preserveStatus.iterator();
+  public Set<FileAttribute> getPreserveAttributes() {
+    return (preserveStatus == null)
+        ? null
+        : Collections.unmodifiableSet(preserveStatus);
   }
 
   /**
@@ -428,230 +311,10 @@ public class DistCpOptions {
     return preserveStatus.contains(attribute);
   }
 
-  /**
-   * Add file attributes that need to be preserved. This method may be
-   * called multiple times to add attributes.
-   *
-   * @param fileAttribute - Attribute to add, one at a time
-   */
-  public void preserve(FileAttribute fileAttribute) {
-    for (FileAttribute attribute : preserveStatus) {
-      if (attribute.equals(fileAttribute)) {
-        return;
-      }
-    }
-    preserveStatus.add(fileAttribute);
-  }
-
-  /**
-   * Return true if raw.* xattrs should be preserved.
-   * @return true if raw.* xattrs should be preserved.
-   */
-  public boolean shouldPreserveRawXattrs() {
-    return preserveRawXattrs;
-  }
-
-  /**
-   * Indicate that raw.* xattrs should be preserved
-   */
-  public void preserveRawXattrs() {
-    preserveRawXattrs = true;
-  }
-
-  /** Get work path for atomic commit. If null, the work
-   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
-   *
-   * @return Atomic work path on the target cluster. Null if not set
-   */
-  public Path getAtomicWorkPath() {
-    return atomicWorkPath;
-  }
-
-  /**
-   * Set the work path for atomic commit
-   *
-   * @param atomicWorkPath - Path on the target cluster
-   */
-  public void setAtomicWorkPath(Path atomicWorkPath) {
-    this.atomicWorkPath = atomicWorkPath;
-  }
-
-  /** Get output directory for writing distcp logs. Otherwise logs
-   * are temporarily written to JobStagingDir/_logs and deleted
-   * upon job completion
-   *
-   * @return Log output path on the cluster where distcp job is run
-   */
-  public Path getLogPath() {
-    return logPath;
-  }
-
-  /**
-   * Set the log path where distcp output logs are stored
-   * Uses JobStagingDir/_logs by default
-   *
-   * @param logPath - Path where logs will be saved
-   */
-  public void setLogPath(Path logPath) {
-    this.logPath = logPath;
-  }
-
-  /**
-   * Get the copy strategy to use. Uses appropriate input format
-   *
-   * @return copy strategy to use
-   */
-  public String getCopyStrategy() {
-    return copyStrategy;
-  }
-
-  /**
-   * Set the copy strategy to use. Should map to a strategy implementation
-   * in distp-default.xml
-   *
-   * @param copyStrategy - copy Strategy to use
-   */
-  public void setCopyStrategy(String copyStrategy) {
-    this.copyStrategy = copyStrategy;
-  }
-
-  /**
-   * File path (hdfs:// or file://) that contains the list of actual
-   * files to copy
-   *
-   * @return - Source listing file path
-   */
-  public Path getSourceFileListing() {
-    return sourceFileListing;
-  }
-
-  /**
-   * Getter for sourcePaths.
-   * @return List of source-paths.
-   */
-  public List<Path> getSourcePaths() {
-    return sourcePaths;
-  }
-
-  /**
-   * Setter for sourcePaths.
-   * @param sourcePaths The new list of source-paths.
-   */
-  public void setSourcePaths(List<Path> sourcePaths) {
-    assert sourcePaths != null && sourcePaths.size() != 0;
-    this.sourcePaths = sourcePaths;
-  }
-
-  /**
-   * Getter for the targetPath.
-   * @return The target-path.
-   */
-  public Path getTargetPath() {
-    return targetPath;
-  }
-
-  /**
-   * Getter for the targetPathExists.
-   * @return The target-path.
-   */
-  public boolean getTargetPathExists() {
-    return targetPathExists;
-  }
-  
-  /**
-   * Set targetPathExists.
-   * @param targetPathExists Whether the target path of distcp exists.
-   */
-  public boolean setTargetPathExists(boolean targetPathExists) {
-    return this.targetPathExists = targetPathExists;
-  }
-
-  /**
-   * File path that contains the list of patterns
-   * for paths to be filtered from the file copy.
-   * @return - Filter  file path.
-   */
-  public final String getFiltersFile() {
-    return filtersFile;
-  }
-
-  /**
-   * Set filtersFile.
-   * @param filtersFilename The path to a list of patterns to exclude from copy.
-   */
-  public final void setFiltersFile(String filtersFilename) {
-    this.filtersFile = filtersFilename;
-  }
-
-  public final void setBlocksPerChunk(int csize) {
-    this.blocksPerChunk = csize;
-  }
-
-  public final int getBlocksPerChunk() {
+  public int getBlocksPerChunk() {
     return blocksPerChunk;
   }
 
-  public final boolean splitLargeFile() {
-    return blocksPerChunk > 0;
-  }
-
-  void validate() {
-    if ((useDiff || useRdiff) && deleteMissing) {
-      // -delete and -diff/-rdiff are mutually exclusive. For backward
-      // compatibility, we ignore the -delete option here, instead of throwing
-      // an IllegalArgumentException. See HDFS-10397 for more discussion.
-      OptionsParser.LOG.warn(
-          "-delete and -diff/-rdiff are mutually exclusive. " +
-          "The -delete option will be ignored.");
-      setDeleteMissing(false);
-    }
-
-    if (syncFolder && atomicCommit) {
-      throw new IllegalArgumentException("Atomic commit can't be used with " +
-          "sync folder or overwrite options");
-    }
-
-    if (deleteMissing && !(overwrite || syncFolder)) {
-      throw new IllegalArgumentException("Delete missing is applicable " +
-          "only with update or overwrite options");
-    }
-
-    if (overwrite && syncFolder) {
-      throw new IllegalArgumentException("Overwrite and update options are " +
-          "mutually exclusive");
-    }
-
-    if (!syncFolder && skipCRC) {
-      throw new IllegalArgumentException("Skip CRC is valid only with update options");
-    }
-
-    if (!syncFolder && append) {
-      throw new IllegalArgumentException(
-          "Append is valid only with update options");
-    }
-    if (skipCRC && append) {
-      throw new IllegalArgumentException(
-          "Append is disallowed when skipping CRC");
-    }
-    if (!syncFolder && (useDiff || useRdiff)) {
-      throw new IllegalArgumentException(
-          "-diff/-rdiff is valid only with -update option");
-    }
-
-    if (useDiff || useRdiff) {
-      if (StringUtils.isBlank(fromSnapshot) ||
-          StringUtils.isBlank(toSnapshot)) {
-        throw new IllegalArgumentException(
-            "Must provide both the starting and ending " +
-            "snapshot names for -diff/-rdiff");
-      }
-    }
-    if (useDiff && useRdiff) {
-      throw new IllegalArgumentException(
-          "-diff and -rdiff are mutually exclusive");
-    }
-  }
-
   /**
    * Add options to configuration. These will be used in the Mapper/committer
    *
@@ -715,20 +378,292 @@ public class DistCpOptions {
         ", mapBandwidth=" + mapBandwidth +
         ", copyStrategy='" + copyStrategy + '\'' +
         ", preserveStatus=" + preserveStatus +
-        ", preserveRawXattrs=" + preserveRawXattrs +
         ", atomicWorkPath=" + atomicWorkPath +
         ", logPath=" + logPath +
         ", sourceFileListing=" + sourceFileListing +
         ", sourcePaths=" + sourcePaths +
         ", targetPath=" + targetPath +
-        ", targetPathExists=" + targetPathExists +
         ", filtersFile='" + filtersFile + '\'' +
         ", blocksPerChunk=" + blocksPerChunk +
         '}';
   }
 
-  @Override
-  protected DistCpOptions clone() throws CloneNotSupportedException {
-    return (DistCpOptions) super.clone();
+  /**
+   * The builder of the {@link DistCpOptions}.
+   *
+   * This is designed to be the only public interface to create a
+   * {@link DistCpOptions} object for users. It follows a simple Builder design
+   * pattern.
+   */
+  public static class Builder {
+    private Path sourceFileListing;
+    private List<Path> sourcePaths;
+    private Path targetPath;
+
+    private boolean atomicCommit = false;
+    private Path atomicWorkPath;
+    private boolean syncFolder = false;
+    private boolean deleteMissing = false;
+    private boolean ignoreFailures = false;
+    private boolean overwrite = false;
+    private boolean append = false;
+    private boolean skipCRC = false;
+    private boolean blocking = true;
+
+    private boolean useDiff = false;
+    private boolean useRdiff = false;
+    private String fromSnapshot;
+    private String toSnapshot;
+
+    private String filtersFile;
+
+    private Path logPath;
+    private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+
+    private int numListstatusThreads = 0;  // 0 indicates that flag is not set.
+    private int maxMaps = DistCpConstants.DEFAULT_MAPS;
+    private float mapBandwidth = 0; // 0 indicates we should use the default
+
+    private EnumSet<FileAttribute> preserveStatus =
+        EnumSet.noneOf(FileAttribute.class);
+
+    private int blocksPerChunk = 0;
+
+    public Builder(List<Path> sourcePaths, Path targetPath) {
+      Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
+          "Source paths should not be null or empty!");
+      Preconditions.checkArgument(targetPath != null,
+          "Target path should not be null!");
+      this.sourcePaths = sourcePaths;
+      this.targetPath = targetPath;
+    }
+
+    public Builder(Path sourceFileListing, Path targetPath) {
+      Preconditions.checkArgument(sourceFileListing != null,
+          "Source file listing should not be null!");
+      Preconditions.checkArgument(targetPath != null,
+          "Target path should not be null!");
+
+      this.sourceFileListing = sourceFileListing;
+      this.targetPath = targetPath;
+    }
+
+    /**
+     * This is the single entry point for constructing DistCpOptions objects.
+     *
+     * Before a new DistCpOptions object is returned, it will set the dependent
+     * options, validate the option combinations. After constructing, the
+     * DistCpOptions instance is immutable.
+     */
+    public DistCpOptions build() {
+      setOptionsForSplitLargeFile();
+
+      validate();
+
+      return new DistCpOptions(this);
+    }
+
+    /**
+     * Override options for split large files.
+     */
+    private void setOptionsForSplitLargeFile() {
+      if (blocksPerChunk <= 0) {
+        return;
+      }
+
+      LOG.info("Enabling preserving blocksize since "
+          + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed.");
+      preserve(FileAttribute.BLOCKSIZE);
+
+      LOG.info("Set " + DistCpOptionSwitch.APPEND.getSwitch()
+          + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()
+          + " is passed.");
+      this.append = false;
+    }
+
+    private void validate() {
+      if ((useDiff || useRdiff) && deleteMissing) {
+        // -delete and -diff/-rdiff are mutually exclusive.
+        throw new IllegalArgumentException("-delete and -diff/-rdiff are "
+            + "mutually exclusive. The -delete option will be ignored.");
+      }
+
+      if (!atomicCommit && atomicWorkPath != null) {
+        throw new IllegalArgumentException(
+            "-tmp work-path can only be specified along with -atomic");
+      }
+
+      if (syncFolder && atomicCommit) {
+        throw new IllegalArgumentException("Atomic commit can't be used with "
+            + "sync folder or overwrite options");
+      }
+
+      if (deleteMissing && !(overwrite || syncFolder)) {
+        throw new IllegalArgumentException("Delete missing is applicable "
+            + "only with update or overwrite options");
+      }
+
+      if (overwrite && syncFolder) {
+        throw new IllegalArgumentException("Overwrite and update options are "
+            + "mutually exclusive");
+      }
+
+      if (!syncFolder && skipCRC) {
+        throw new IllegalArgumentException(
+            "Skip CRC is valid only with update options");
+      }
+
+      if (!syncFolder && append) {
+        throw new IllegalArgumentException(
+            "Append is valid only with update options");
+      }
+      if (skipCRC && append) {
+        throw new IllegalArgumentException(
+            "Append is disallowed when skipping CRC");
+      }
+      if (!syncFolder && (useDiff || useRdiff)) {
+        throw new IllegalArgumentException(
+            "-diff/-rdiff is valid only with -update option");
+      }
+
+      if (useDiff || useRdiff) {
+        if (StringUtils.isBlank(fromSnapshot) ||
+            StringUtils.isBlank(toSnapshot)) {
+          throw new IllegalArgumentException(
+              "Must provide both the starting and ending " +
+                  "snapshot names for -diff/-rdiff");
+        }
+      }
+      if (useDiff && useRdiff) {
+        throw new IllegalArgumentException(
+            "-diff and -rdiff are mutually exclusive");
+      }
+    }
+
+    @VisibleForTesting
+    Builder withSourcePaths(List<Path> newSourcePaths) {
+      this.sourcePaths = newSourcePaths;
+      return this;
+    }
+
+    public Builder withAtomicCommit(boolean newAtomicCommit) {
+      this.atomicCommit = newAtomicCommit;
+      return this;
+    }
+
+    public Builder withAtomicWorkPath(Path newAtomicWorkPath) {
+      this.atomicWorkPath = newAtomicWorkPath;
+      return this;
+    }
+
+    public Builder withSyncFolder(boolean newSyncFolder) {
+      this.syncFolder = newSyncFolder;
+      return this;
+    }
+
+    public Builder withDeleteMissing(boolean newDeleteMissing) {
+      this.deleteMissing = newDeleteMissing;
+      return this;
+    }
+
+    public Builder withIgnoreFailures(boolean newIgnoreFailures) {
+      this.ignoreFailures = newIgnoreFailures;
+      return this;
+    }
+
+    public Builder withOverwrite(boolean newOverwrite) {
+      this.overwrite = newOverwrite;
+      return this;
+    }
+
+    public Builder withAppend(boolean newAppend) {
+      this.append = newAppend;
+      return this;
+    }
+
+    public Builder withCRC(boolean newSkipCRC) {
+      this.skipCRC = newSkipCRC;
+      return this;
+    }
+
+    public Builder withBlocking(boolean newBlocking) {
+      this.blocking = newBlocking;
+      return this;
+    }
+
+    public Builder withUseDiff(String newFromSnapshot, String newToSnapshot) {
+      this.useDiff = true;
+      this.fromSnapshot = newFromSnapshot;
+      this.toSnapshot = newToSnapshot;
+      return this;
+    }
+
+    public Builder withUseRdiff(String newFromSnapshot, String newToSnapshot) {
+      this.useRdiff = true;
+      this.fromSnapshot = newFromSnapshot;
+      this.toSnapshot = newToSnapshot;
+      return this;
+    }
+
+    public Builder withFiltersFile(String newFiltersFile) {
+      this.filtersFile = newFiltersFile;
+      return this;
+    }
+
+    public Builder withLogPath(Path newLogPath) {
+      this.logPath = newLogPath;
+      return this;
+    }
+
+    public Builder withCopyStrategy(String newCopyStrategy) {
+      this.copyStrategy = newCopyStrategy;
+      return this;
+    }
+
+    public Builder withMapBandwidth(float newMapBandwidth) {
+      Preconditions.checkArgument(newMapBandwidth > 0,
+          "Bandwidth " + newMapBandwidth + " is invalid (should be > 0)");
+      this.mapBandwidth = newMapBandwidth;
+      return this;
+    }
+
+    public Builder withNumListstatusThreads(int newNumListstatusThreads) {
+      if (newNumListstatusThreads > MAX_NUM_LISTSTATUS_THREADS) {
+        this.numListstatusThreads = MAX_NUM_LISTSTATUS_THREADS;
+      } else if (newNumListstatusThreads > 0) {
+        this.numListstatusThreads = newNumListstatusThreads;
+      } else {
+        this.numListstatusThreads = 0;
+      }
+      return this;
+    }
+
+    public Builder maxMaps(int newMaxMaps) {
+      this.maxMaps = Math.max(newMaxMaps, 1);
+      return this;
+    }
+
+    public Builder preserve(String attributes) {
+      if (attributes == null || attributes.isEmpty()) {
+        preserveStatus = EnumSet.allOf(FileAttribute.class);
+      } else {
+        for (int index = 0; index < attributes.length(); index++) {
+          preserveStatus.add(FileAttribute.
+              getAttribute(attributes.charAt(index)));
+        }
+      }
+      return this;
+    }
+
+    public Builder preserve(FileAttribute attribute) {
+      preserveStatus.add(attribute);
+      return this;
+    }
+
+    public Builder withBlocksPerChunk(int newBlocksPerChunk) {
+      this.blocksPerChunk = newBlocksPerChunk;
+      return this;
+    }
   }
+
 }
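
For reference, a minimal sketch of how client code might use the new Builder (the paths and preserved attributes below are illustrative only; the withXxx, preserve and build methods are the ones added above):

    import java.util.Arrays;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpOptionsBuilderExample {
      public static void main(String[] args) {
        // Build immutable options: -update plus preservation of user/group/permission.
        DistCpOptions options = new DistCpOptions.Builder(
                Arrays.asList(new Path("hdfs://nn1/src")),  // hypothetical source path
                new Path("hdfs://nn2/dst"))                 // hypothetical target path
            .withSyncFolder(true)   // equivalent of -update
            .preserve("ugp")        // -p with user, group, permission
            .build();               // sets dependent options, then validate()
        System.out.println(options);
      }
    }

Invalid combinations now fail fast inside build(): for example, combining withSyncFolder(true) with withOverwrite(true) trips the "Overwrite and update options are mutually exclusive" check in validate().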

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
index bcae96a..a78320b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java
@@ -48,7 +48,7 @@ import java.util.HashSet;
  * source.s1
  */
 class DistCpSync {
-  private DistCpOptions inputOptions;
+  private DistCpContext context;
   private Configuration conf;
   // diffMap maps snapshot diff op type to a list of diff ops.
   // It's initially created based on the snapshot diff. Then the individual
@@ -58,13 +58,13 @@ class DistCpSync {
   private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
   private DiffInfo[] renameDiffs;
 
-  DistCpSync(DistCpOptions options, Configuration conf) {
-    this.inputOptions = options;
+  DistCpSync(DistCpContext context, Configuration conf) {
+    this.context = context;
     this.conf = conf;
   }
 
   private boolean isRdiff() {
-    return inputOptions.shouldUseRdiff();
+    return context.shouldUseRdiff();
   }
 
   /**
@@ -77,14 +77,14 @@ class DistCpSync {
    *  default distcp if the third condition isn't met.
    */
   private boolean preSyncCheck() throws IOException {
-    List<Path> sourcePaths = inputOptions.getSourcePaths();
+    List<Path> sourcePaths = context.getSourcePaths();
     if (sourcePaths.size() != 1) {
       // we only support one source dir which must be a snapshottable directory
       throw new IllegalArgumentException(sourcePaths.size()
           + " source paths are provided");
     }
     final Path sourceDir = sourcePaths.get(0);
-    final Path targetDir = inputOptions.getTargetPath();
+    final Path targetDir = context.getTargetPath();
 
     final FileSystem srcFs = sourceDir.getFileSystem(conf);
     final FileSystem tgtFs = targetDir.getFileSystem(conf);
@@ -104,13 +104,15 @@ class DistCpSync {
     // make sure targetFS has no change between from and the current states
     if (!checkNoChange(targetFs, targetDir)) {
       // set the source path using the snapshot path
-      inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
-          inputOptions.getToSnapshot())));
+      context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
+          context.getToSnapshot())));
       return false;
     }
 
-    final String from = getSnapshotName(inputOptions.getFromSnapshot());
-    final String to = getSnapshotName(inputOptions.getToSnapshot());
+    final String from = getSnapshotName(
+        context.getFromSnapshot());
+    final String to = getSnapshotName(
+        context.getToSnapshot());
 
     try {
       final FileStatus fromSnapshotStat =
@@ -152,9 +154,9 @@ class DistCpSync {
       return false;
     }
 
-    List<Path> sourcePaths = inputOptions.getSourcePaths();
+    List<Path> sourcePaths = context.getSourcePaths();
     final Path sourceDir = sourcePaths.get(0);
-    final Path targetDir = inputOptions.getTargetPath();
+    final Path targetDir = context.getTargetPath();
     final FileSystem tfs = targetDir.getFileSystem(conf);
     final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
 
@@ -175,8 +177,8 @@ class DistCpSync {
       deleteTargetTmpDir(targetFs, tmpDir);
       // TODO: since we have tmp directory, we can support "undo" with failures
       // set the source path using the snapshot path
-      inputOptions.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
-          inputOptions.getToSnapshot())));
+      context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir,
+          context.getToSnapshot())));
     }
   }
 
@@ -187,13 +189,13 @@ class DistCpSync {
    */
   private boolean getAllDiffs() throws IOException {
     Path ssDir = isRdiff()?
-        inputOptions.getTargetPath() : inputOptions.getSourcePaths().get(0);
+        context.getTargetPath() : context.getSourcePaths().get(0);
 
     try {
       DistributedFileSystem fs =
           (DistributedFileSystem) ssDir.getFileSystem(conf);
-      final String from = getSnapshotName(inputOptions.getFromSnapshot());
-      final String to = getSnapshotName(inputOptions.getToSnapshot());
+      final String from = getSnapshotName(context.getFromSnapshot());
+      final String to = getSnapshotName(context.getToSnapshot());
       SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir,
           from, to);
       this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
@@ -273,19 +275,19 @@ class DistCpSync {
    */
   private boolean checkNoChange(DistributedFileSystem fs, Path path) {
     try {
-      final String from = getSnapshotName(inputOptions.getFromSnapshot());
+      final String from = getSnapshotName(context.getFromSnapshot());
       SnapshotDiffReport targetDiff =
           fs.getSnapshotDiffReport(path, from, "");
       if (!targetDiff.getDiffList().isEmpty()) {
         DistCp.LOG.warn("The target has been modified since snapshot "
-            + inputOptions.getFromSnapshot());
+            + context.getFromSnapshot());
         return false;
       } else {
         return true;
       }
     } catch (IOException e) {
       DistCp.LOG.warn("Failed to compute snapshot diff on " + path
-          + " at snapshot " + inputOptions.getFromSnapshot(), e);
+          + " at snapshot " + context.getFromSnapshot(), e);
     }
     return false;
   }
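
DistCpContext.java (added elsewhere in this commit) is not reproduced in this message. As a rough, non-authoritative sketch of the pattern it follows, the context wraps the immutable DistCpOptions and keeps the one piece of mutable state the sync and listing code still needs -- the resolved source paths -- behind delegating getters like those used above:

    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCpOptions;

    // Illustrative sketch only -- not the actual DistCpContext from this commit.
    class DistCpContextSketch {
      private final DistCpOptions options;
      // The only state DistCpSync and the copy listings still mutate.
      private List<Path> sourcePaths;

      DistCpContextSketch(DistCpOptions options) {
        this.options = options;
        this.sourcePaths = options.getSourcePaths();
      }

      List<Path> getSourcePaths() { return sourcePaths; }
      void setSourcePaths(List<Path> paths) { this.sourcePaths = paths; }

      // Read-only accessors delegate to the immutable options.
      Path getTargetPath() { return options.getTargetPath(); }
      boolean shouldUseRdiff() { return options.shouldUseRdiff(); }
      String getFromSnapshot() { return options.getFromSnapshot(); }
      String getToSnapshot() { return options.getToSnapshot(); }
    }

This keeps setSourcePaths() available to DistCpSync and the copy listings without reintroducing mutability into DistCpOptions itself.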

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
index 2bc343e..c356edd 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/FileBasedCopyListing.java
@@ -52,7 +52,7 @@ public class FileBasedCopyListing extends CopyListing {
 
   /** {@inheritDoc} */
   @Override
-  protected void validatePaths(DistCpOptions options)
+  protected void validatePaths(DistCpContext context)
       throws IOException, InvalidInputException {
   }
 
@@ -60,14 +60,14 @@ public class FileBasedCopyListing extends CopyListing {
    * Implementation of CopyListing::buildListing().
    *   Iterates over all source paths mentioned in the input-file.
    * @param pathToListFile Path on HDFS where the listing file is written.
-   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @param context Distcp context with associated input options.
    * @throws IOException
    */
   @Override
-  public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
-    DistCpOptions newOption = new DistCpOptions(options);
-    newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
-    globbedListing.buildListing(pathToListFile, newOption);
+  public void doBuildListing(Path pathToListFile, DistCpContext context)
+      throws IOException {
+    context.setSourcePaths(fetchFileList(context.getSourceFileListing()));
+    globbedListing.buildListing(pathToListFile, context);
   }
 
   private List<Path> fetchFileList(Path sourceListing) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
index 27330b7..63c6f43 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/GlobbedCopyListing.java
@@ -51,7 +51,7 @@ public class GlobbedCopyListing extends CopyListing {
 
   /** {@inheritDoc} */
   @Override
-  protected void validatePaths(DistCpOptions options)
+  protected void validatePaths(DistCpContext context)
       throws IOException, InvalidInputException {
   }
 
@@ -60,19 +60,19 @@ public class GlobbedCopyListing extends CopyListing {
    * Creates the copy listing by "globbing" all source-paths.
    * @param pathToListingFile The location at which the copy-listing file
    *                           is to be created.
-   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @param context The distcp context with associated input options.
    * @throws IOException
    */
   @Override
-  public void doBuildListing(Path pathToListingFile,
-                             DistCpOptions options) throws IOException {
+  public void doBuildListing(Path pathToListingFile, DistCpContext context)
+      throws IOException {
 
     List<Path> globbedPaths = new ArrayList<Path>();
-    if (options.getSourcePaths().isEmpty()) {
+    if (context.getSourcePaths().isEmpty()) {
       throw new InvalidInputException("Nothing to process. Source paths::EMPTY");  
     }
 
-    for (Path p : options.getSourcePaths()) {
+    for (Path p : context.getSourcePaths()) {
       FileSystem fs = p.getFileSystem(getConf());
       FileStatus[] inputs = fs.globStatus(p);
 
@@ -85,9 +85,8 @@ public class GlobbedCopyListing extends CopyListing {
       }
     }
 
-    DistCpOptions optionsGlobbed = new DistCpOptions(options);
-    optionsGlobbed.setSourcePaths(globbedPaths);
-    simpleListing.buildListing(pathToListingFile, optionsGlobbed);
+    context.setSourcePaths(globbedPaths);
+    simpleListing.buildListing(pathToListingFile, context);
   }
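
With the context in place, a caller builds a listing by handing the same DistCpContext to the CopyListing implementation instead of cloning an options object. A small sketch, assuming the listing constructor takes (Configuration, Credentials) and that buildListing keeps the (Path, DistCpContext) shape shown in this diff:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.tools.DistCpContext;
    import org.apache.hadoop.tools.GlobbedCopyListing;

    public class CopyListingContextExample {
      public static void buildListing(Configuration conf, Credentials creds,
          Path listingPath, DistCpContext context) throws IOException {
        // Globs context.getSourcePaths(), rewrites them via setSourcePaths(),
        // then writes the sequence-file listing to listingPath.
        new GlobbedCopyListing(conf, creds).buildListing(listingPath, context);
      }
    }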
 
   /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/26172a94/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 8881264..21ff0f8 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 
 import com.google.common.base.Preconditions;
 
@@ -95,253 +94,126 @@ public class OptionsParser {
         Arrays.toString(args), e);
     }
 
-    DistCpOptions option = parseSourceAndTargetPaths(command);
-
-    option.setIgnoreFailures(
-        command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()));
-
-    option.setAtomicCommit(
-        command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch()));
-
-    option.setSyncFolder(
-        command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()));
-
-    option.setOverwrite(
-        command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch()));
-
-    option.setAppend(
-        command.hasOption(DistCpOptionSwitch.APPEND.getSwitch()));
-
-    option.setDeleteMissing(
-        command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()));
-
-    option.setSkipCRC(
-        command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch()));
-
-    if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch()) &&
-        option.shouldAtomicCommit()) {
-      String workPath = getVal(command, DistCpOptionSwitch.WORK_PATH.getSwitch());
-      if (workPath != null && !workPath.isEmpty()) {
-        option.setAtomicWorkPath(new Path(workPath));
-      }
-    } else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
-      throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
-    }
-
-    if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
-      option.setLogPath(new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
-    }
-
-
-    if (command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) {
-      option.setBlocking(false);
-    }
-
-    parseBandwidth(command, option);
-
-    parseNumListStatusThreads(command, option);
-
-    parseMaxMaps(command, option);
-
-    if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
-      option.setCopyStrategy(
-            getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
-    }
-
-    parsePreserveStatus(command, option);
+    DistCpOptions.Builder builder = parseSourceAndTargetPaths(command);
+    builder
+        .withAtomicCommit(
+            command.hasOption(DistCpOptionSwitch.ATOMIC_COMMIT.getSwitch()))
+        .withSyncFolder(
+            command.hasOption(DistCpOptionSwitch.SYNC_FOLDERS.getSwitch()))
+        .withDeleteMissing(
+            command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch()))
+        .withIgnoreFailures(
+            command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch()))
+        .withOverwrite(
+            command.hasOption(DistCpOptionSwitch.OVERWRITE.getSwitch()))
+        .withAppend(
+            command.hasOption(DistCpOptionSwitch.APPEND.getSwitch()))
+        .withCRC(
+            command.hasOption(DistCpOptionSwitch.SKIP_CRC.getSwitch()))
+        .withBlocking(
+            !command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch()));
 
     if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
       String[] snapshots = getVals(command,
           DistCpOptionSwitch.DIFF.getSwitch());
       checkSnapshotsArgs(snapshots);
-      option.setUseDiff(snapshots[0], snapshots[1]);
+      builder.withUseDiff(snapshots[0], snapshots[1]);
     }
     if (command.hasOption(DistCpOptionSwitch.RDIFF.getSwitch())) {
       String[] snapshots = getVals(command,
           DistCpOptionSwitch.RDIFF.getSwitch());
       checkSnapshotsArgs(snapshots);
-      option.setUseRdiff(snapshots[0], snapshots[1]);
+      builder.withUseRdiff(snapshots[0], snapshots[1]);
     }
 
-    parseFileLimit(command);
-
-    parseSizeLimit(command);
-
     if (command.hasOption(DistCpOptionSwitch.FILTERS.getSwitch())) {
-      option.setFiltersFile(getVal(command,
-          DistCpOptionSwitch.FILTERS.getSwitch()));
+      builder.withFiltersFile(
+          getVal(command, DistCpOptionSwitch.FILTERS.getSwitch()));
     }
 
-    parseBlocksPerChunk(command, option);
-
-    option.validate();
-
-    return option;
-  }
-
-
-  /**
-   * A helper method to parse chunk size in number of blocks.
-   * Used when breaking large file into chunks to copy in parallel.
-   *
-   * @param command command line arguments
-   */
-  private static void parseBlocksPerChunk(CommandLine command,
-      DistCpOptions option) {
-    boolean hasOption =
-        command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch());
-    LOG.info("parseChunkSize: " +
-        DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption);
-    if (hasOption) {
-      String chunkSizeString = getVal(command,
-          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
-      try {
-        int csize = Integer.parseInt(chunkSizeString);
-        if (csize < 0) {
-          csize = 0;
-        }
-        LOG.info("Set distcp blocksPerChunk to " + csize);
-        option.setBlocksPerChunk(csize);
-      }
-      catch (NumberFormatException e) {
-        throw new IllegalArgumentException("blocksPerChunk is invalid: "
-            + chunkSizeString, e);
-      }
+    if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
+      builder.withLogPath(
+          new Path(getVal(command, DistCpOptionSwitch.LOG_PATH.getSwitch())));
     }
-  }
 
-  /**
-   * parseSizeLimit is a helper method for parsing the deprecated
-   * argument SIZE_LIMIT.
-   *
-   * @param command command line arguments
-   */
-  private static void parseSizeLimit(CommandLine command) {
-    if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
-      String sizeLimitString = getVal(command,
-                              DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
-      try {
-        Long.parseLong(sizeLimitString);
-      }
-      catch (NumberFormatException e) {
-        throw new IllegalArgumentException("Size-limit is invalid: "
-                                            + sizeLimitString, e);
+    if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
+      final String workPath = getVal(command,
+          DistCpOptionSwitch.WORK_PATH.getSwitch());
+      if (workPath != null && !workPath.isEmpty()) {
+        builder.withAtomicWorkPath(new Path(workPath));
       }
-      LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
-              " option. Ignoring.");
     }
-  }
 
-  /**
-   * parseFileLimit is a helper method for parsing the deprecated
-   * argument FILE_LIMIT.
-   *
-   * @param command command line arguments
-   */
-  private static void parseFileLimit(CommandLine command) {
-    if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
-      String fileLimitString = getVal(command,
-                              DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
+    if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
       try {
-        Integer.parseInt(fileLimitString);
+        final Float mapBandwidth = Float.parseFloat(
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()));
+        builder.withMapBandwidth(mapBandwidth);
       } catch (NumberFormatException e) {
-        throw new IllegalArgumentException("File-limit is invalid: "
-                                            + fileLimitString, e);
+        throw new IllegalArgumentException("Bandwidth specified is invalid: " +
+            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
       }
-      LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
-          " option. Ignoring.");
     }
-  }
 
-  /**
-   * parsePreserveStatus is a helper method for parsing PRESERVE_STATUS.
-   *
-   * @param command command line arguments
-   * @param option  parsed distcp options
-   */
-  private static void parsePreserveStatus(CommandLine command,
-                                          DistCpOptions option) {
-    if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
-      String attributes =
-          getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
-      if (attributes == null || attributes.isEmpty()) {
-        for (FileAttribute attribute : FileAttribute.values()) {
-          option.preserve(attribute);
-        }
-      } else {
-        for (int index = 0; index < attributes.length(); index++) {
-          option.preserve(FileAttribute.
-              getAttribute(attributes.charAt(index)));
-        }
+    if (command.hasOption(
+        DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
+      try {
+        final Integer numThreads = Integer.parseInt(getVal(command,
+            DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()));
+        builder.withNumListstatusThreads(numThreads);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Number of liststatus threads is invalid: " + getVal(command,
+                DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
       }
     }
-  }
 
-  /**
-   * parseMaxMaps is a helper method for parsing MAX_MAPS.
-   *
-   * @param command command line arguments
-   * @param option  parsed distcp options
-   */
-  private static void parseMaxMaps(CommandLine command,
-                                   DistCpOptions option) {
     if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
       try {
-        Integer maps = Integer.parseInt(
-            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
-        option.setMaxMaps(maps);
+        final Integer maps = Integer.parseInt(
+            getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()));
+        builder.maxMaps(maps);
       } catch (NumberFormatException e) {
         throw new IllegalArgumentException("Number of maps is invalid: " +
             getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
       }
     }
-  }
 
-  /**
-   * parseNumListStatusThreads is a helper method for parsing
-   * NUM_LISTSTATUS_THREADS.
-   *
-   * @param command command line arguments
-   * @param option  parsed distcp options
-   */
-  private static void parseNumListStatusThreads(CommandLine command,
-                                                DistCpOptions option) {
-    if (command.hasOption(
-        DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
-      try {
-        Integer numThreads = Integer.parseInt(getVal(command,
-              DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
-        option.setNumListstatusThreads(numThreads);
-      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException(
-            "Number of liststatus threads is invalid: " + getVal(command,
-                DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
-      }
+    if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
+      builder.withCopyStrategy(
+            getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
     }
-  }
 
-  /**
-   * parseBandwidth is a helper method for parsing BANDWIDTH.
-   *
-   * @param command command line arguments
-   * @param option  parsed distcp options
-   */
-  private static void parseBandwidth(CommandLine command,
-                                     DistCpOptions option) {
-    if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
+    if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
+      builder.preserve(
+          getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()));
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
+      LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
+          " option. Ignoring.");
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
+      LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
+          " option. Ignoring.");
+    }
+
+    if (command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch())) {
+      final String chunkSizeStr = getVal(command,
+          DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim());
       try {
-        Float mapBandwidth = Float.parseFloat(
-            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
-        if (mapBandwidth <= 0) {
-          throw new IllegalArgumentException("Bandwidth specified is not " +
-              "positive: " + mapBandwidth);
-        }
-        option.setMapBandwidth(mapBandwidth);
+        int csize = Integer.parseInt(chunkSizeStr);
+        csize = csize > 0 ? csize : 0;
+        LOG.info("Set distcp blocksPerChunk to " + csize);
+        builder.withBlocksPerChunk(csize);
       } catch (NumberFormatException e) {
-        throw new IllegalArgumentException("Bandwidth specified is invalid: " +
-            getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
+        throw new IllegalArgumentException("blocksPerChunk is invalid: "
+            + chunkSizeStr, e);
       }
     }
+
+    return builder.build();
   }
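
Taken together, parse() now routes every recognized switch into a DistCpOptions.Builder and returns the immutable product of build(). A minimal usage sketch, assuming the public parse(String[]) entry point keeps its existing shape (the command-line values are illustrative):

    import org.apache.hadoop.tools.DistCpOptions;
    import org.apache.hadoop.tools.OptionsParser;

    public class OptionsParserExample {
      public static void main(String[] args) {
        // Illustrative arguments: -update plus -p for user/group/permission.
        DistCpOptions options = OptionsParser.parse(new String[] {
            "-update", "-pugp", "hdfs://nn1/src", "hdfs://nn2/dst"});
        // No setters remain on DistCpOptions; any per-run mutation now happens
        // on the DistCpContext layer introduced by this commit.
        System.out.println(options);
      }
    }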
 
   /**
@@ -351,9 +223,8 @@ public class OptionsParser {
    * @param command command line arguments
    * @return        DistCpOptions
    */
-  private static DistCpOptions parseSourceAndTargetPaths(
+  private static DistCpOptions.Builder parseSourceAndTargetPaths(
       CommandLine command) {
-    DistCpOptions option;
     Path targetPath;
     List<Path> sourcePaths = new ArrayList<Path>();
 
@@ -378,20 +249,22 @@ public class OptionsParser {
         throw new IllegalArgumentException("Both source file listing and " +
             "source paths present");
       }
-      option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
-          SOURCE_FILE_LISTING.getSwitch())), targetPath);
+      return new DistCpOptions.Builder(new Path(getVal(command,
+          DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())), targetPath);
     } else {
       if (sourcePaths.isEmpty()) {
         throw new IllegalArgumentException("Neither source file listing nor " +
             "source paths present");
       }
-      option = new DistCpOptions(sourcePaths, targetPath);
+      return new DistCpOptions.Builder(sourcePaths, targetPath);
     }
-    return option;
   }
 
   private static String getVal(CommandLine command, String swtch) {
-    String optionValue = command.getOptionValue(swtch);
+    if (swtch == null) {
+      return null;
+    }
+    String optionValue = command.getOptionValue(swtch.trim());
     if (optionValue == null) {
       return null;
     } else {

