hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject svn commit: r1495297 [43/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Date Fri, 21 Jun 2013 06:37:39 GMT
Modified: hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestHiRamJobWithBlackListTT.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestHiRamJobWithBlackListTT.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestHiRamJobWithBlackListTT.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestHiRamJobWithBlackListTT.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapred;
 
 import java.io.File;

Modified: hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.Log;

Modified: hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.Log;

Modified: hadoop/common/branches/branch-1-win/src/test/test-patch.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/test-patch.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/test-patch.properties (original)
+++ hadoop/common/branches/branch-1-win/src/test/test-patch.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 OK_RELEASEAUDIT_WARNINGS=0
 OK_FINDBUGS_WARNINGS=211
 OK_JAVADOC_WARNINGS=9

Modified: hadoop/common/branches/branch-1-win/src/test/testjar/UserNamePermission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/testjar/UserNamePermission.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/testjar/UserNamePermission.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/testjar/UserNamePermission.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package testjar;
 
 import java.io.IOException;

Added: hadoop/common/branches/branch-1-win/src/tools/distcp-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/distcp-default.xml?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/distcp-default.xml (added)
+++ hadoop/common/branches/branch-1-win/src/tools/distcp-default.xml Fri Jun 21 06:37:27 2013
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly. Anything that needs to be overridden
+     should be done so through -D switches or a customized conf. -->
+
+<configuration>
+
+    <property>
+        <name>distcp.dynamic.strategy.impl</name>
+        <value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
+        <description>Implementation of dynamic input format</description>
+    </property>
+
+    <property>
+        <name>distcp.static.strategy.impl</name>
+        <value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
+        <description>Implementation of static input format</description>
+    </property>
+
+    <property>
+        <name>mapred.job.map.memory.mb</name>
+        <value>1024</value>
+    </property>
+
+    <property>
+        <name>mapred.job.reduce.memory.mb</name>
+        <value>1024</value>
+    </property>
+
+    <property>
+        <name>mapred.reducer.new-api</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.class</name>
+        <value>org.apache.hadoop.mapreduce.Reducer</value>
+    </property>
+
+</configuration>

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp.java Fri Jun 21 06:37:27 2013
@@ -979,6 +979,15 @@ public class DistCp implements Tool {
 
     jobconf.setMapperClass(CopyFilesMapper.class);
     jobconf.setNumReduceTasks(0);
+    // Propagate delegation related props to DistCp job
+    String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+    if (tokenFile != null) {
+      LOG
+        .info("Setting env property for mapreduce.job.credentials.binary to: "
+          + tokenFile);
+      jobconf.set("mapreduce.job.credentials.binary", tokenFile);
+    }
+
     return jobconf;
   }
 

Modified: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for distcp counters
 
 CounterGroupName=       distcp

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/CopyListing.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+/**
+ * The CopyListing abstraction is responsible for how the list of
+ * sources and targets is constructed, for DistCp's copy function.
+ * The copy-listing should be a SequenceFile<Text, FileStatus>,
+ * located at the path specified to buildListing(),
+ * each entry being a pair of (Source relative path, source file status),
+ * all the paths being fully qualified.
+ */
+public abstract class CopyListing extends Configured {
+
+  private Credentials credentials;
+
+  /**
+   * Build listing function creates the input listing that distcp uses to
+   * perform the copy.
+   *
+   * The build listing is a sequence file that has relative path of a file in the key
+   * and the file status information of the source file in the value
+   *
+   * For instance if the source path is /tmp/data and the traversed path is
+   * /tmp/data/dir1/dir2/file1, then the sequence file would contain
+   *
+   * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+   *
+   * File would also contain directory entries. Meaning, if /tmp/data/dir1/dir2/file1
+   * is the only file under /tmp/data, the resulting sequence file would contain the
+   * following entries
+   *
+   * key: /dir1 and value: FileStatus(/tmp/data/dir1)
+   * key: /dir1/dir2 and value: FileStatus(/tmp/data/dir1/dir2)
+   * key: /dir1/dir2/file1 and value: FileStatus(/tmp/data/dir1/dir2/file1)
+   *
+   * Cases requiring special handling:
+   * If source path is a file (/tmp/file1), contents of the file will be as follows
+   *
+   * TARGET DOES NOT EXIST: Key-"", Value-FileStatus(/tmp/file1)
+   * TARGET IS FILE       : Key-"", Value-FileStatus(/tmp/file1)
+   * TARGET IS DIR        : Key-"/file1", Value-FileStatus(/tmp/file1)  
+   *
+   * @param pathToListFile - Output file where the listing would be stored
+   * @param options - Input options to distcp
+   * @throws IOException - Exception if any
+   */
+  public final void buildListing(Path pathToListFile,
+                                 DistCpOptions options) throws IOException {
+    validatePaths(options);
+    doBuildListing(pathToListFile, options);
+    Configuration config = getConf();
+
+    config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, pathToListFile.toString());
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, getBytesToCopy());
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
+
+    checkForDuplicates(pathToListFile);
+  }
+
+  /**
+   * Validate input and output paths
+   *
+   * @param options - Input options
+   * @throws InvalidInputException: If inputs are invalid
+   * @throws IOException: any Exception with FS 
+   */
+  protected abstract void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException;
+
+  /**
+   * The interface to be implemented by sub-classes, to create the source/target file listing.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException: Thrown on failure to create the listing file.
+   */
+  protected abstract void doBuildListing(Path pathToListFile,
+                                         DistCpOptions options) throws IOException;
+
+  /**
+   * Return the total bytes that distCp should copy for the source paths
+   * This doesn't consider whether file is same should be skipped during copy
+   *
+   * @return total bytes to copy
+   */
+  protected abstract long getBytesToCopy();
+
+  /**
+   * Return the total number of paths to distcp, includes directories as well
+   * This doesn't consider whether file/dir is already present and should be skipped during copy
+   *
+   * @return Total number of paths to distcp
+   */
+  protected abstract long getNumberOfPaths();
+
+  /**
+   * Validate the final resulting path listing to see if there are any duplicate entries
+   *
+   * @param pathToListFile - path listing build by doBuildListing
+   * @throws IOException - Any issues while checking for duplicates and throws
+   * @throws DuplicateFileException - if there are duplicates
+   */
+  private void checkForDuplicates(Path pathToListFile)
+      throws DuplicateFileException, IOException {
+
+    Configuration config = getConf();
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile);
+
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, sortedList, config);
+    try {
+      Text lastKey = new Text("*"); //source relative path can never hold *
+      FileStatus lastFileStatus = new FileStatus();
+
+      Text currentKey = new Text();
+      while (reader.next(currentKey)) {
+        if (currentKey.equals(lastKey)) {
+          FileStatus currentFileStatus = new FileStatus();
+          reader.getCurrentValue(currentFileStatus);
+          throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " +
+              currentFileStatus.getPath() + " would cause duplicates. Aborting");
+        }
+        reader.getCurrentValue(lastFileStatus);
+        lastKey.set(currentKey);
+      }
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+
+  /**
+   * Protected constructor, to initialize configuration.
+   * @param configuration The input configuration,
+   *                        with which the source/target FileSystems may be accessed.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached.If null
+   * delegation token caching is skipped
+   */
+  protected CopyListing(Configuration configuration, Credentials credentials) {
+    setConf(configuration);
+    setCredentials(credentials);
+  }
+
+  /**
+   * set Credentials store, on which FS delegatin token will be cached
+   * @param credentials - Credentials object
+   */
+  protected void setCredentials(Credentials credentials) {
+    this.credentials = credentials;
+  }
+
+  /**
+   * get credentials to update the delegation tokens for accessed FS objects
+   * @return Credentials object
+   */
+  protected Credentials getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Public Factory method with which the appropriate CopyListing implementation may be retrieved.
+   * @param configuration The input configuration.
+   * @param credentials Credentials object on which the FS delegation tokens are cached
+   * @param options The input Options, to help choose the appropriate CopyListing Implementation.
+   * @return An instance of the appropriate CopyListing implementation.
+   * @throws java.io.IOException - Exception if any
+   */
+  public static CopyListing getCopyListing(Configuration configuration,
+                                           Credentials credentials,
+                                           DistCpOptions options)
+      throws IOException {
+
+    String copyListingClassName = configuration.get(DistCpConstants.
+        CONF_LABEL_COPY_LISTING_CLASS, "");
+    Class<? extends CopyListing> copyListingClass;
+    try {
+      if (! copyListingClassName.isEmpty()) {
+        copyListingClass = configuration.getClass(DistCpConstants.
+            CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
+            CopyListing.class);
+      } else {
+        if (options.getSourceFileListing() == null) {
+            copyListingClass = GlobbedCopyListing.class;
+        } else {
+            copyListingClass = FileBasedCopyListing.class;
+        }
+      }
+      copyListingClassName = copyListingClass.getName();
+      Constructor<? extends CopyListing> constructor = copyListingClass.
+          getDeclaredConstructor(Configuration.class, Credentials.class);
+      return constructor.newInstance(configuration, credentials);
+    } catch (Exception e) {
+      throw new IOException("Unable to instantiate " + copyListingClassName, e);
+    }
+  }
+
+  static class DuplicateFileException extends RuntimeException {
+    public DuplicateFileException(String message) {
+      super(message);
+    }
+  }
+
+  static class InvalidInputException extends RuntimeException {
+    public InvalidInputException(String message) {
+      super(message);
+    }
+  }
+}

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCp.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.tools.distcp2.CopyListing.DuplicateFileException;
+import org.apache.hadoop.tools.distcp2.CopyListing.InvalidInputException;
+import org.apache.hadoop.tools.distcp2.mapred.CopyMapper;
+import org.apache.hadoop.tools.distcp2.mapred.CopyOutputFormat;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * DistCp is the main driver-class for DistCpV2.
+ * For command-line use, DistCp::main() orchestrates the parsing of command-line
+ * parameters and the launch of the DistCp job.
+ * For programmatic use, a DistCp object can be constructed by specifying
+ * options (in a DistCpOptions object), and DistCp::execute() may be used to
+ * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
+ * behaviour.
+ */
+public class DistCp extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(DistCp.class);
+
+  // Options for the current run; replaced by OptionsParser output in run().
+  private DistCpOptions inputOptions;
+  // Staging-area folder holding the copy listing and (by default) job logs.
+  private Path metaFolder;
+
+  // Name prefix for the meta folder created under the job staging directory.
+  private static final String PREFIX = "_distcp";
+  // Name prefix for the temporary work directory used for atomic commits.
+  private static final String WIP_PREFIX = "._WIP_";
+  private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
+  // Shared RNG; randomizes meta-folder and atomic work-path names.
+  public static final Random rand = new Random();
+
+  // Set once the MR job is submitted; gates meta-folder cleanup on failure.
+  private boolean submitted;
+  // FileSystem hosting metaFolder; cached so cleanup() can delete it.
+  private FileSystem jobFS;
+
+  /**
+   * Public Constructor. Creates DistCp object with specified input-parameters.
+   * (E.g. source-paths, target-location, etc.)
+   * @param inputOptions Options (indicating source-paths, target-location.)
+   * @param configuration The Hadoop configuration against which the Copy-mapper must run.
+   * @throws Exception on failure.
+   */
+  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
+    final JobConf config = new JobConf(configuration);
+    config.addResource(DISTCP_DEFAULT_XML);
+    setConf(config);
+    this.inputOptions = inputOptions;
+    this.metaFolder   = createMetaFolderPath();
+  }
+
+  /**
+   * To be used with the ToolRunner. Not for public consumption.
+   */
+  private DistCp() {}
+
+  /**
+   * Implementation of Tool::run(). Orchestrates the copy of source file(s)
+   * to target location, by:
+   *  1. Creating a list of files to be copied to target.
+   *  2. Launching a Map-only job to copy the files. (Delegates to execute().)
+   * @param argv List of arguments passed to DistCp, from the ToolRunner.
+   * @return On success, it returns 0. Else, -1.
+   */
+  public int run(String[] argv) {
+    if (argv.length < 1) {
+      OptionsParser.usage();
+      return DistCpConstants.INVALID_ARGUMENT;
+    }
+    
+    try {
+      inputOptions = (OptionsParser.parse(argv));
+
+      LOG.info("Input Options: " + inputOptions);
+    } catch (Throwable e) {
+      // Any parse failure (including Errors) is reported as a usage problem.
+      LOG.error("Invalid arguments: ", e);
+      System.err.println("Invalid arguments: " + e.getMessage());
+      OptionsParser.usage();      
+      return DistCpConstants.INVALID_ARGUMENT;
+    }
+    
+    try {
+      execute();
+    } catch (InvalidInputException e) {
+      LOG.error("Invalid input: ", e);
+      return DistCpConstants.INVALID_ARGUMENT;
+    } catch (DuplicateFileException e) {
+      LOG.error("Duplicate files in input path: ", e);
+      return DistCpConstants.DUPLICATE_INPUT;
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      return DistCpConstants.UNKNOWN_ERROR;
+    }
+    return DistCpConstants.SUCCESS;
+  }
+
+  /**
+   * Implements the core-execution. Creates the file-list for copy,
+   * and launches the Hadoop-job, to do the copy.
+   * @return Job handle
+   * @throws Exception on failure.
+   */
+  public Job execute() throws Exception {
+    assert inputOptions != null;
+    assert getConf() != null;
+
+    Job job = null;
+    try {
+      synchronized(this) {
+        // Fresh meta folder per execution; remember its FS for cleanup().
+        metaFolder = createMetaFolderPath();
+        jobFS = metaFolder.getFileSystem(getConf());
+  
+        job = createJob();
+      }
+      createInputFileListing(job);
+
+      job.submit();
+      submitted = true;
+    } finally {
+      // If submission never happened, remove the partially-built meta folder.
+      if (!submitted) {
+        cleanup();
+      }
+    }
+
+    String jobID = job.getJobID().toString();
+    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
+    
+    LOG.info("DistCp job-id: " + jobID);
+    // Wait for completion unless asynchronous execution was requested.
+    // NOTE(review): assumes shouldBlock() is the inverse of the -async
+    // switch — confirm against OptionsParser.
+    if (inputOptions.shouldBlock()) {
+      job.waitForCompletion(true);
+    }
+    return job;
+  }
+
+  /**
+   * Create Job object for submitting it, with all the configuration
+   *
+   * @return Reference to job object.
+   * @throws IOException - Exception if any
+   */
+  private Job createJob() throws IOException {
+    String jobName = "distcp";
+    Job job = Job.getInstance(getConf());
+    String userChosenName = job.getJobName();
+    if (userChosenName != null)
+      jobName += ": " + userChosenName;
+    job.setJobName(jobName);
+    // Input format is selected by the configured copy strategy.
+    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
+    job.setJarByClass(CopyMapper.class);
+    configureOutputFormat(job);
+
+    // Map-only job: CopyMapper performs the actual copy; no reducers.
+    job.setMapperClass(CopyMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputFormatClass(CopyOutputFormat.class);
+
+    // Speculative copies of the same file would conflict at the target.
+    job.setSpeculativeExecution(false);
+    ((JobConf)job.getConfiguration()).setNumMapTasks(inputOptions.getMaxMaps());
+
+    if (inputOptions.getSslConfigurationFile() != null) {
+      setupSSLConfig(job);
+    }
+
+    inputOptions.appendToConf(job.getConfiguration());
+    return job;
+  }
+
+  /**
+   * Setup ssl configuration on the job configuration to enable hsftp access
+   * from map job. Also copy the ssl configuration file to Distributed cache
+   *
+   * @param job - Reference to job's handle
+   * @throws java.io.IOException - Exception if unable to locate ssl config file
+   */
+  private void setupSSLConfig(Job job) throws IOException  {
+    Configuration configuration = job.getConfiguration();
+    Path sslConfigPath = new Path(configuration.
+        getResource(inputOptions.getSslConfigurationFile()).toString());
+
+    addSSLFilesToDistCache(job, sslConfigPath);
+    // Both labels point at the cached file by bare name; mappers resolve it
+    // from the distributed cache's working directory.
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
+  }
+
+  /**
+   * Add SSL files to distributed cache. Trust store, key store and ssl config xml
+   *
+   * @param job - Job handle
+   * @param sslConfigPath - ssl Configuration file specified through options
+   * @throws IOException - If any
+   */
+  private void addSSLFilesToDistCache(Job job,
+                                      Path sslConfigPath) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    FileSystem localFS = FileSystem.getLocal(configuration);
+
+    Configuration sslConf = new Configuration(false);
+    sslConf.addResource(sslConfigPath);
+
+    // Ship the trust store, and rewrite its conf entry to the bare file
+    // name (the cached copy's name on the task node).
+    Path localStorePath = getLocalStorePath(sslConf,
+                            DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
+    addCacheFile(job, localStorePath.makeQualified(localFS.getUri(),
+                                      localFS.getWorkingDirectory()).toUri());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
+                      localStorePath.getName());
+
+    // Same for the key store.
+    localStorePath = getLocalStorePath(sslConf,
+                             DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
+    addCacheFile(job, localStorePath.makeQualified(localFS.getUri(),
+                                      localFS.getWorkingDirectory()).toUri());
+    configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
+                                      localStorePath.getName());
+
+    // Finally, ship the ssl config xml itself.
+    addCacheFile(job, sslConfigPath.makeQualified(localFS.getUri(),
+                                      localFS.getWorkingDirectory()).toUri());
+
+  }
+
+  private static void addCacheFile(Job job, URI uri) {
+    DistributedCache.addCacheFile(uri, job.getConfiguration());
+  }
+
+  /**
+   * Get Local Trust store/key store path
+   *
+   * @param sslConf - Config from SSL Client xml
+   * @param storeKey - Key for either trust store or key store
+   * @return - Path where the store is present
+   * @throws IOException -If any
+   */
+  private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
+    if (sslConf.get(storeKey) != null) {
+      return new Path(sslConf.get(storeKey));
+    } else {
+      throw new IOException("Store for " + storeKey + " is not set in " +
+          inputOptions.getSslConfigurationFile());
+    }
+  }
+
+  /**
+   * Setup output format appropriately
+   *
+   * @param job - Job handle
+   * @throws IOException - Exception if any
+   */
+  private void configureOutputFormat(Job job) throws IOException {
+    final Configuration configuration = job.getConfiguration();
+    Path targetPath = inputOptions.getTargetPath();
+    FileSystem targetFS = targetPath.getFileSystem(configuration);
+    targetPath = targetPath.makeQualified(targetFS.getUri(),
+                                          targetFS.getWorkingDirectory());
+
+    if (inputOptions.shouldAtomicCommit()) {
+      // Atomic commit: copy into a randomized work dir on the same
+      // FileSystem, to be moved to the target on commit.
+      Path workDir = inputOptions.getAtomicWorkPath();
+      if (workDir == null) {
+        workDir = targetPath.getParent();
+      }
+      workDir = new Path(workDir, WIP_PREFIX + targetPath.getName()
+                                + rand.nextInt());
+      FileSystem workFS = workDir.getFileSystem(configuration);
+      if (!DistCpUtils.compareFs(targetFS, workFS)) {
+        throw new IllegalArgumentException("Work path " + workDir +
+            " and target path " + targetPath + " are in different file system");
+      }
+      CopyOutputFormat.setWorkingDirectory(job, workDir);
+    } else {
+      CopyOutputFormat.setWorkingDirectory(job, targetPath);
+    }
+    CopyOutputFormat.setCommitDirectory(job, targetPath);
+
+    // Default the job log location to the per-run meta folder.
+    Path logPath = inputOptions.getLogPath();
+    if (logPath == null) {
+      logPath = new Path(metaFolder, "_logs");
+    } else {
+      LOG.info("DistCp job log path: " + logPath);
+    }
+    CopyOutputFormat.setOutputPath(job, logPath);
+  }
+
+  /**
+   * Create input listing by invoking an appropriate copy listing
+   * implementation. Also add delegation tokens for each path
+   * to job's credential store
+   *
+   * @param job - Handle to job
+   * @return Returns the path where the copy listing is created
+   * @throws IOException - If any
+   */
+  protected Path createInputFileListing(Job job) throws IOException {
+    Path fileListingPath = getFileListingPath();
+    CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(),
+        job.getCredentials(), inputOptions);
+    copyListing.buildListing(fileListingPath, inputOptions);
+    return fileListingPath;
+  }
+
+  /**
+   * Get default name of the copy listing file. Use the meta folder
+   * to create the copy listing file
+   *
+   * @return - Path where the copy listing file has to be saved
+   * @throws IOException - Exception if any
+   */
+  protected Path getFileListingPath() throws IOException {
+    String fileListPathStr = metaFolder + "/fileList.seq";
+    Path path = new Path(fileListPathStr);
+    return new Path(path.toUri().normalize().toString());
+  }
+
+  /**
+   * Create a default working folder for the job, under the
+   * job staging directory
+   *
+   * @return Returns the working folder information
+   * @throws Exception - Exception if any
+   */
+  private Path createMetaFolderPath() throws Exception {
+    final JobConf configuration = (JobConf)getConf();
+    Path stagingDir = JobSubmissionFiles.getStagingDir(
+        new JobClient(configuration), configuration);
+    // Random suffix keeps concurrent DistCp runs from colliding.
+    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
+    if (LOG.isDebugEnabled())
+      LOG.debug("Meta folder location: " + metaFolderPath);
+    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
+    return metaFolderPath;
+  }
+
+  /**
+   * Main function of the DistCp program. Parses the input arguments (via OptionsParser),
+   * and invokes the DistCp::run() method, via the ToolRunner.
+   * @param argv Command-line arguments sent to DistCp.
+   */
+  public static void main(String argv[]) {
+    try {
+      DistCp distCp = new DistCp();
+      // Shutdown hook deletes the meta folder if the job never got submitted.
+      Cleanup CLEANUP = new Cleanup(distCp);
+
+      Runtime.getRuntime().addShutdownHook(CLEANUP);
+      System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
+    }
+    catch (Exception e) {
+      LOG.error("Couldn't complete DistCp operation: ", e);
+      System.exit(DistCpConstants.UNKNOWN_ERROR);
+    }
+  }
+
+  /**
+   * Loads properties from distcp-default.xml into configuration
+   * object
+   * @return Configuration which includes properties from distcp-default.xml
+   */
+  private static JobConf getDefaultConf() {
+    JobConf config = new JobConf();
+    config.addResource(DISTCP_DEFAULT_XML);
+    return config;
+  }
+
+  /**
+   * Deletes the meta folder (and resets it to null). Safe to call more
+   * than once; failures are logged, not rethrown.
+   */
+  private synchronized void cleanup() {
+    try {
+      if (metaFolder == null) return;
+
+      jobFS.delete(metaFolder, true);
+      metaFolder = null;
+    } catch (IOException e) {
+      LOG.error("Unable to cleanup meta folder: " + metaFolder, e);
+    }
+  }
+
+  private boolean isSubmitted() {
+    return submitted;
+  }
+
+  /**
+   * Shutdown-hook thread: cleans up the meta folder if the process exits
+   * before the job was submitted.
+   */
+  private static class Cleanup extends Thread {
+    private final DistCp distCp;
+
+    public Cleanup(DistCp distCp) {
+      this.distCp = distCp;
+    }
+
+    @Override
+    public void run() {
+      if (distCp.isSubmitted()) return;
+
+      distCp.cleanup();
+    }
+  }
+}

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpConstants.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,107 @@
+package org.apache.hadoop.tools.distcp2;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility class to hold commonly used constants.
+ */
+public class DistCpConstants {
+
+  /* Default number of maps to use for DistCp */
+  public static final int DEFAULT_MAPS = 20;
+
+  /* Default bandwidth if none specified */
+  public static final int DEFAULT_BANDWIDTH_MB = 100;
+
+  /* Default strategy for copying. Implementation looked up
+     from distcp-default.xml
+   */
+  public static final String UNIFORMSIZE = "uniformsize";
+
+  /**
+   *  Constants mapping to command line switches/input options
+   */
+  public static final String CONF_LABEL_ATOMIC_COPY = "distcp.atomic.copy";
+  public static final String CONF_LABEL_WORK_PATH = "distcp.work.path";
+  public static final String CONF_LABEL_LOG_PATH = "distcp.log.path";
+  public static final String CONF_LABEL_IGNORE_FAILURES = "distcp.ignore.failures";
+  public static final String CONF_LABEL_PRESERVE_STATUS = "distcp.preserve.status";
+  public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
+  public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+  public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+  public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
+  public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
+  public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
+  public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
+  public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
+  public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+
+  /* Total bytes to be copied. Updated by copylisting. Unfiltered count */
+  public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected";
+
+  /* Total number of paths to copy, includes directories. Unfiltered count */
+  public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";
+
+  /* SSL keystore resource */
+  public static final String CONF_LABEL_SSL_KEYSTORE = "dfs.https.client.keystore.resource";
+
+  /* If input is given via -f <source listing>, the file containing the source paths */
+  public static final String CONF_LABEL_LISTING_FILE_PATH = "distcp.listing.file.path";
+
+  /* Directory where the mapreduce job will write to. If not atomic commit, then same
+    as CONF_LABEL_TARGET_FINAL_PATH
+   */
+  public static final String CONF_LABEL_TARGET_WORK_PATH = "distcp.target.work.path";
+
+  /* Directory where the final data will be committed to. If not atomic commit, then same
+    as CONF_LABEL_TARGET_WORK_PATH
+   */
+  public static final String CONF_LABEL_TARGET_FINAL_PATH = "distcp.target.final.path";
+
+  /**
+   * DistCp job id for consumers of the DistCp job
+   */
+  public static final String CONF_LABEL_DISTCP_JOB_ID = "distcp.job.id";
+
+  /* Meta folder where the job's intermediate data is kept */
+  public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
+
+  /* DistCp CopyListing class override param */
+  public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
+
+  /**
+   * Conf label for SSL Trust-store location.
+   */
+  public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
+      = "ssl.client.truststore.location";
+
+  /**
+   * Conf label for SSL Key-store location.
+   */
+  public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
+      = "ssl.client.keystore.location";
+
+  /**
+   * Constants for DistCp return code to shell / consumer of ToolRunner's run
+   */
+  public static final int SUCCESS = 0;
+  public static final int INVALID_ARGUMENT = -1;
+  public static final int DUPLICATE_INPUT = -2;
+  public static final int UNKNOWN_ERROR = -999;
+}

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptionSwitch.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Enumeration mapping configuration keys to distcp command line
+ * options.
+ */
+public enum DistCpOptionSwitch {
+
+  /**
+   * Ignores any failures during copy, and continues with rest.
+   * Logs failures in a file
+   */
+  IGNORE_FAILURES(DistCpConstants.CONF_LABEL_IGNORE_FAILURES,
+      new Option("i", false, "Ignore failures during copy")),
+
+  /**
+   * Preserves status of file/path in the target.
+   * Default behavior with -p, is to preserve replication,
+   * block size, user, group and permission on the target file
+   *
+   * If any of the optional switches are present among rbugp, then
+   * only the corresponding file attribute is preserved
+   *
+   */
+  PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
+      // Space before "(replication" keeps the rendered help text readable.
+      new Option("p", true, "preserve status (rbugp) " +
+          "(replication, block-size, user, group, permission)")),
+
+  /**
+   * Update target location by copying only files that are missing
+   * in the target. This can be used to periodically sync two folders
+   * across source and target. Typically used with DELETE_MISSING
+   * Incompatible with ATOMIC_COMMIT
+   */
+  SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
+      // Trailing space avoids the help text rendering as "missingfiles".
+      new Option("update", false, "Update target, copying only missing " +
+          "files or directories")),
+
+  /**
+   * Deletes missing files in target that are missing from source
+   * This allows the target to be in sync with the source contents
+   * Typically used in conjunction with SYNC_FOLDERS
+   * Incompatible with ATOMIC_COMMIT
+   */
+  DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
+      new Option("delete", false, "Delete from target, " +
+          "files missing in source")),
+
+  /**
+   * Configuration file to use with hftps:// for securely copying
+   * files across clusters. Typically the configuration file contains
+   * truststore/keystore information such as location, password and type
+   */
+  SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
+      new Option("mapredSslConf", true, "Configuration for ssl config file" +
+          ", to use with hftps://")),
+
+  /**
+   * Max number of maps to use during copy. DistCp will split work
+   * as equally as possible among these maps
+   */
+  MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
+      new Option("m", true, "Max number of concurrent maps to use for copy")),
+
+  /**
+   * Source file listing can be provided to DistCp in a file.
+   * This allows DistCp to copy random list of files from source
+   * and copy them to target
+   */
+  SOURCE_FILE_LISTING(DistCpConstants.CONF_LABEL_SOURCE_LISTING,
+      new Option("f", true, "List of files that need to be copied")),
+
+  /**
+   * Copy all the source files and commit them atomically to the target
+   * This is typically useful in cases where there is a process
+   * polling for availability of a file/dir. This option is incompatible
+   * with SYNC_FOLDERS & DELETE_MISSING
+   */
+  ATOMIC_COMMIT(DistCpConstants.CONF_LABEL_ATOMIC_COPY,
+      new Option("atomic", false, "Commit all changes or none")),
+
+  /**
+   * Work path to be used only in conjunction with Atomic commit
+   */
+  WORK_PATH(DistCpConstants.CONF_LABEL_WORK_PATH,
+      new Option("tmp", true, "Intermediate work path to be used for atomic commit")),
+
+  /**
+   * Log path where distcp output logs are written to
+   */
+  LOG_PATH(DistCpConstants.CONF_LABEL_LOG_PATH,
+      new Option("log", true, "Folder on DFS where distcp execution logs are saved")),
+
+  /**
+   * Copy strategy to use. This could be dynamic or uniform size etc.
+   * DistCp would use an appropriate input format based on this.
+   */
+  COPY_STRATEGY(DistCpConstants.CONF_LABEL_COPY_STRATEGY,
+      new Option("strategy", true, "Copy strategy to use. Default is " +
+          "dividing work based on file sizes")),
+
+  /**
+   * Skip CRC checks between source and target, when determining what
+   * files need to be copied.
+   */
+  SKIP_CRC(DistCpConstants.CONF_LABEL_SKIP_CRC,
+      new Option("skipcrccheck", false, "Whether to skip CRC checks between " +
+          "source and target paths.")),
+
+  /**
+   * Overwrite target-files unconditionally.
+   */
+  OVERWRITE(DistCpConstants.CONF_LABEL_OVERWRITE,
+      new Option("overwrite", false, "Choose to overwrite target files " +
+          "unconditionally, even if they exist.")),
+
+  /**
+   * Should DistCp execution be blocking.
+   * NOTE(review): the switch itself is named "async"; its presence
+   * presumably disables blocking — confirm against OptionsParser.
+   */
+  BLOCKING("",
+      new Option("async", false, "Should distcp execution be blocking")),
+
+  FILE_LIMIT("",
+      new Option("filelimit", true, "(Deprecated!) Limit number of files " +
+              "copied to <= n")),
+
+  SIZE_LIMIT("",
+      new Option("sizelimit", true, "(Deprecated!) Limit number of files " +
+              "copied to <= n bytes")),
+
+  /**
+   * Specify bandwidth per map in MB
+   */
+  BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+      new Option("bandwidth", true, "Specify bandwidth per map in MB"));
+
+  // Hadoop configuration key this switch maps to ("" if none).
+  private final String confLabel;
+  // Commons-CLI option describing the command-line switch.
+  private final Option option;
+
+  DistCpOptionSwitch(String confLabel, Option option) {
+    this.confLabel = confLabel;
+    this.option = option;
+  }
+
+  /**
+   * Get Configuration label for the option
+   * @return configuration label name
+   */
+  public String getConfigLabel() {
+    return confLabel;
+  }
+
+  /**
+   * Get CLI Option corresponding to the distcp option
+   * @return option
+   */
+  public Option getOption() {
+    return option;
+  }
+
+  /**
+   * Get Switch symbol
+   * @return switch symbol char
+   */
+  public String getSwitch() {
+    return option.getOpt();
+  }
+
+  @Override
+  public String toString() {
+    return  super.name() + " {" +
+        "confLabel='" + confLabel + '\'' +
+        ", option=" + option + '}';
+  }
+
+  /**
+   * Helper function to add an option to hadoop configuration object
+   * @param conf - Configuration object to include the option
+   * @param option - Option to add
+   * @param value - Value
+   */
+  public static void addToConf(Configuration conf,
+                               DistCpOptionSwitch option,
+                               String value) {
+    conf.set(option.getConfigLabel(), value);
+  }
+
+  /**
+   * Helper function to set an option to hadoop configuration object
+   * @param conf - Configuration object to include the option
+   * @param option - Option to add
+   */
+  public static void addToConf(Configuration conf,
+                               DistCpOptionSwitch option) {
+    conf.set(option.getConfigLabel(), "true");
+  }
+}

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/DistCpOptions.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.util.DistCpUtils;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * The Options class encapsulates all DistCp options.
+ * These may be set from command-line (via the OptionsParser)
+ * or may be set manually.
+ */
+public class DistCpOptions {
+
+  // Boolean switches. Cross-option constraints (e.g. overwrite vs. update)
+  // are enforced in validate(), which the setters call before assignment.
+  private boolean atomicCommit = false;
+  private boolean syncFolder = false;
+  private boolean deleteMissing = false;
+  private boolean ignoreFailures = false;
+  private boolean overwrite = false;
+  private boolean skipCRC = false;
+  private boolean blocking = true;
+
+  private int maxMaps = DistCpConstants.DEFAULT_MAPS;
+  private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+
+  private String sslConfigurationFile;
+
+  private String copyStrategy = DistCpConstants.UNIFORMSIZE;
+
+  // File attributes to preserve on copied files; empty by default.
+  private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
+
+  private Path atomicWorkPath;
+
+  private Path logPath;
+
+  // Exactly one of sourceFileListing / sourcePaths is set by the
+  // constructors; the other remains null.
+  private Path sourceFileListing;
+  private List<Path> sourcePaths;
+
+  private Path targetPath;
+
+  /**
+   * File attributes that DistCp can preserve on the target. Each attribute
+   * is addressed on the command line by the (upper-case) first letter of
+   * its name, e.g. 'R' for REPLICATION.
+   */
+  public static enum FileAttribute{
+    REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+
+    /**
+     * Look up an attribute by its one-letter symbol (case-insensitive).
+     * @param symbol first letter of the attribute name
+     * @return the matching FileAttribute
+     * @throws NoSuchElementException if no attribute matches the symbol
+     */
+    public static FileAttribute getAttribute(char symbol) {
+      for (FileAttribute attribute : values()) {
+        if (attribute.name().charAt(0) == Character.toUpperCase(symbol)) {
+          return attribute;
+        }
+      }
+      throw new NoSuchElementException("No attribute for " + symbol);
+    }
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * Note: preconditions are checked with asserts, which are only active
+   * when the JVM runs with -ea.
+   * @param sourcePaths List of source-paths (including wildcards)
+   *                     to be copied to target.
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(List<Path> sourcePaths, Path targetPath) {
+    assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourcePaths = sourcePaths;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Constructor, to initialize source/target paths.
+   * @param sourceFileListing File containing list of source paths
+   * @param targetPath Destination path for the dist-copy.
+   */
+  public DistCpOptions(Path sourceFileListing, Path targetPath) {
+    assert sourceFileListing != null : "Invalid source paths";
+    assert targetPath != null : "Invalid Target path";
+
+    this.sourceFileListing = sourceFileListing;
+    this.targetPath = targetPath;
+  }
+
+  /**
+   * Copy constructor. Copying from null (or from itself) is a silent no-op,
+   * leaving all fields at their defaults.
+   * @param that DistCpOptions being copied from.
+   */
+  public DistCpOptions(DistCpOptions that) {
+    if (this != that && that != null) {
+      this.atomicCommit = that.atomicCommit;
+      this.syncFolder = that.syncFolder;
+      this.deleteMissing = that.deleteMissing;
+      this.ignoreFailures = that.ignoreFailures;
+      this.overwrite = that.overwrite;
+      this.skipCRC = that.skipCRC;
+      this.blocking = that.blocking;
+      this.maxMaps = that.maxMaps;
+      this.mapBandwidth = that.mapBandwidth;
+      this.sslConfigurationFile = that.getSslConfigurationFile();
+      this.copyStrategy = that.copyStrategy;
+      // NOTE(review): this shares the mutable EnumSet with 'that' -- a
+      // later preserve() on either instance is visible to both. Consider
+      // EnumSet.copyOf(that.preserveStatus) for an independent copy.
+      this.preserveStatus = that.preserveStatus;
+      this.atomicWorkPath = that.getAtomicWorkPath();
+      this.logPath = that.getLogPath();
+      this.sourceFileListing = that.getSourceFileListing();
+      // NOTE(review): sourcePaths is also copied by reference; callers
+      // use setSourcePaths with a fresh list (see GlobbedCopyListing).
+      this.sourcePaths = that.getSourcePaths();
+      this.targetPath = that.getTargetPath();
+    }
+  }
+
+  /**
+   * Should the data be committed atomically?
+   *
+   * @return true if data should be committed atomically. false otherwise
+   */
+  public boolean shouldAtomicCommit() {
+    return atomicCommit;
+  }
+
+  /**
+   * Set if data need to be committed automatically
+   *
+   * @param atomicCommit - boolean switch
+   * @throws IllegalArgumentException if incompatible with options already set
+   */
+  public void setAtomicCommit(boolean atomicCommit) {
+    validate(DistCpOptionSwitch.ATOMIC_COMMIT, atomicCommit);
+    this.atomicCommit = atomicCommit;
+  }
+
+  /**
+   * Should the data be sync'ed between source and target paths?
+   *
+   * @return true if data should be sync'ed up. false otherwise
+   */
+  public boolean shouldSyncFolder() {
+    return syncFolder;
+  }
+
+  /**
+   * Set if source and target folder contents be sync'ed up
+   *
+   * @param syncFolder - boolean switch
+   * @throws IllegalArgumentException if incompatible with options already set
+   */
+  public void setSyncFolder(boolean syncFolder) {
+    validate(DistCpOptionSwitch.SYNC_FOLDERS, syncFolder);
+    this.syncFolder = syncFolder;
+  }
+
+  /**
+   * Should target files missing in source be deleted?
+   *
+   * @return true if stale target files are to be removed. false otherwise
+   */
+  public boolean shouldDeleteMissing() {
+    return deleteMissing;
+  }
+
+  /**
+   * Set if files only present in target should be deleted
+   *
+   * @param deleteMissing - boolean switch
+   * @throws IllegalArgumentException if incompatible with options already set
+   */
+  public void setDeleteMissing(boolean deleteMissing) {
+    validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing);
+    this.deleteMissing = deleteMissing;
+  }
+
+  /**
+   * Should failures be logged and ignored during copy?
+   *
+   * @return true if failures are to be logged and ignored. false otherwise
+   */
+  public boolean shouldIgnoreFailures() {
+    return ignoreFailures;
+  }
+
+  /**
+   * Set if failures during copy be ignored
+   * (no cross-option constraint, so validate() is not called here)
+   *
+   * @param ignoreFailures - boolean switch
+   */
+  public void setIgnoreFailures(boolean ignoreFailures) {
+    this.ignoreFailures = ignoreFailures;
+  }
+
+  /**
+   * Should DistCp be running in blocking mode
+   *
+   * @return true if should run in blocking, false otherwise
+   */
+  public boolean shouldBlock() {
+    return blocking;
+  }
+
+  /**
+   * Set if DistCp should run blocking or non-blocking
+   * (no cross-option constraint, so validate() is not called here)
+   *
+   * @param blocking - boolean switch
+   */
+  public void setBlocking(boolean blocking) {
+    this.blocking = blocking;
+  }
+
+  /**
+   * Should files be overwritten always?
+   *
+   * @return true if files in target that may exist before distcp, should always
+   *         be overwritten. false otherwise
+   */
+  public boolean shouldOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * Set if files should always be overwritten on target
+   *
+   * @param overwrite - boolean switch
+   * @throws IllegalArgumentException if incompatible with options already set
+   */
+  public void setOverwrite(boolean overwrite) {
+    validate(DistCpOptionSwitch.OVERWRITE, overwrite);
+    this.overwrite = overwrite;
+  }
+
+  /**
+   * Should CRC/checksum check be skipped while checking files are identical
+   *
+   * @return true if checksum check should be skipped while checking files are
+   *         identical. false otherwise
+   */
+  public boolean shouldSkipCRC() {
+    return skipCRC;
+  }
+
+  /**
+   * Set if checksum comparison should be skipped while determining if
+   * source and destination files are identical
+   *
+   * @param skipCRC - boolean switch
+   * @throws IllegalArgumentException if incompatible with options already set
+   */
+  public void setSkipCRC(boolean skipCRC) {
+    validate(DistCpOptionSwitch.SKIP_CRC, skipCRC);
+    this.skipCRC = skipCRC;
+  }
+
+  /** Get the max number of maps to use for this copy
+   *
+   * @return Max number of maps
+   */
+  public int getMaxMaps() {
+    return maxMaps;
+  }
+
+  /**
+   * Set the max number of maps to use for copy.
+   * Values below 1 are silently clamped to 1.
+   *
+   * @param maxMaps - Number of maps
+   */
+  public void setMaxMaps(int maxMaps) {
+    this.maxMaps = Math.max(maxMaps, 1);
+  }
+
+  /** Get the map bandwidth in MB
+   *
+   * @return Bandwidth in MB
+   */
+  public int getMapBandwidth() {
+    return mapBandwidth;
+  }
+
+  /**
+   * Set per map bandwidth.
+   * Note: the positivity check is an assert, only active with -ea.
+   *
+   * @param mapBandwidth - per map bandwidth
+   */
+  public void setMapBandwidth(int mapBandwidth) {
+    assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
+    this.mapBandwidth = mapBandwidth;
+  }
+
+  /**
+   * Get path where the ssl configuration file is present to use for hftps://
+   *
+   * @return Path on local file system
+   */
+  public String getSslConfigurationFile() {
+    return sslConfigurationFile;
+  }
+
+  /**
+   * Set the SSL configuration file path to use with hftps:// (local path)
+   *
+   * @param sslConfigurationFile - Local ssl config file path
+   */
+  public void setSslConfigurationFile(String sslConfigurationFile) {
+    this.sslConfigurationFile = sslConfigurationFile;
+  }
+
+  /**
+   * Returns an iterator with the list of file attributes to preserve.
+   * The iterator is backed by the live set; prefer shouldPreserve for
+   * simple membership checks.
+   *
+   * @return iterator of file attributes to preserve
+   */
+  public Iterator<FileAttribute> preserveAttributes() {
+    return preserveStatus.iterator();
+  }
+
+  /**
+   * Checks if the input attribute should be preserved or not
+   *
+   * @param attribute - Attribute to check
+   * @return True if attribute should be preserved, false otherwise
+   */
+  public boolean shouldPreserve(FileAttribute attribute) {
+    return preserveStatus.contains(attribute);
+  }
+
+  /**
+   * Add file attributes that need to be preserved. This method may be
+   * called multiple times to add attributes.
+   * (The containment loop is redundant for a Set -- add() is already
+   * idempotent -- but is harmless.)
+   *
+   * @param fileAttribute - Attribute to add, one at a time
+   */
+  public void preserve(FileAttribute fileAttribute) {
+    for (FileAttribute attribute : preserveStatus) {
+      if (attribute.equals(fileAttribute)) {
+        return;
+      }
+    }
+    preserveStatus.add(fileAttribute);
+  }
+
+  /** Get work path for atomic commit. If null, the work
+   * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath)
+   *
+   * @return Atomic work path on the target cluster. Null if not set
+   */
+  public Path getAtomicWorkPath() {
+    return atomicWorkPath;
+  }
+
+  /**
+   * Set the work path for atomic commit
+   *
+   * @param atomicWorkPath - Path on the target cluster
+   */
+  public void setAtomicWorkPath(Path atomicWorkPath) {
+    this.atomicWorkPath = atomicWorkPath;
+  }
+
+  /** Get output directory for writing distcp logs. Otherwise logs
+   * are temporarily written to JobStagingDir/_logs and deleted
+   * upon job completion
+   *
+   * @return Log output path on the cluster where distcp job is run
+   */
+  public Path getLogPath() {
+    return logPath;
+  }
+
+  /**
+   * Set the log path where distcp output logs are stored
+   * Uses JobStagingDir/_logs by default
+   *
+   * @param logPath - Path where logs will be saved
+   */
+  public void setLogPath(Path logPath) {
+    this.logPath = logPath;
+  }
+
+  /**
+   * Get the copy strategy to use. Uses appropriate input format
+   *
+   * @return copy strategy to use
+   */
+  public String getCopyStrategy() {
+    return copyStrategy;
+  }
+
+  /**
+   * Set the copy strategy to use. Should map to a strategy implementation
+   * in distcp-default.xml
+   *
+   * @param copyStrategy - copy Strategy to use
+   */
+  public void setCopyStrategy(String copyStrategy) {
+    this.copyStrategy = copyStrategy;
+  }
+
+  /**
+   * File path (hdfs:// or file://) that contains the list of actual
+   * files to copy
+   *
+   * @return - Source listing file path
+   */
+  public Path getSourceFileListing() {
+    return sourceFileListing;
+  }
+
+  /**
+   * Getter for sourcePaths.
+   * @return List of source-paths.
+   */
+  public List<Path> getSourcePaths() {
+    return sourcePaths;
+  }
+
+  /**
+   * Setter for sourcePaths.
+   * Note: the non-empty check is an assert, only active with -ea.
+   * @param sourcePaths The new list of source-paths.
+   */
+  public void setSourcePaths(List<Path> sourcePaths) {
+    assert sourcePaths != null && sourcePaths.size() != 0;
+    this.sourcePaths = sourcePaths;
+  }
+
+  /**
+   * Getter for the targetPath.
+   * @return The target-path.
+   */
+  public Path getTargetPath() {
+    return targetPath;
+  }
+
+  /**
+   * Check that the given switch, if set to the given value, is compatible
+   * with the options already set on this instance. Each local below takes
+   * the proposed value for the switch being changed and the current field
+   * value for all others.
+   *
+   * @param option the switch being changed
+   * @param value the proposed value for that switch
+   * @throws IllegalArgumentException on an invalid combination
+   */
+  public void validate(DistCpOptionSwitch option, boolean value) {
+
+    boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
+        value : this.syncFolder);
+    boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ?
+        value : this.overwrite);
+    boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ?
+        value : this.deleteMissing);
+    boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ?
+        value : this.atomicCommit);
+    boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
+        value : this.skipCRC);
+
+    // NOTE(review): the message mentions overwrite, but this branch only
+    // checks syncFolder+atomicCommit; overwrite+atomicCommit is not
+    // rejected here (overwrite+syncFolder is rejected below).
+    if (syncFolder && atomicCommit) {
+      throw new IllegalArgumentException("Atomic commit can't be used with " +
+          "sync folder or overwrite options");
+    }
+
+    if (deleteMissing && !(overwrite || syncFolder)) {
+      throw new IllegalArgumentException("Delete missing is applicable " +
+          "only with update or overwrite options");
+    }
+
+    if (overwrite && syncFolder) {
+      throw new IllegalArgumentException("Overwrite and update options are " +
+          "mutually exclusive");
+    }
+
+    if (!syncFolder && skipCRC) {
+      throw new IllegalArgumentException("Skip CRC is valid only with update options");
+    }
+
+  }
+
+  /**
+   * Add options to configuration. These will be used in the Mapper/committer
+   *
+   * @param conf - Configuration object to which the options need to be added
+   */
+  public void appendToConf(Configuration conf) {
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT,
+        String.valueOf(atomicCommit));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES,
+        String.valueOf(ignoreFailures));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS,
+        String.valueOf(syncFolder));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING,
+        String.valueOf(deleteMissing));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
+        String.valueOf(overwrite));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
+        String.valueOf(skipCRC));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
+        String.valueOf(mapBandwidth));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
+        DistCpUtils.packAttributes(preserveStatus));
+  }
+
+  /**
+   * Utility to easily string-ify Options, for logging.
+   *
+   * @return String representation of the Options.
+   */
+  @Override
+  public String toString() {
+    return "DistCpOptions{" +
+        "atomicCommit=" + atomicCommit +
+        ", syncFolder=" + syncFolder +
+        ", deleteMissing=" + deleteMissing +
+        ", ignoreFailures=" + ignoreFailures +
+        ", maxMaps=" + maxMaps +
+        ", sslConfigurationFile='" + sslConfigurationFile + '\'' +
+        ", copyStrategy='" + copyStrategy + '\'' +
+        ", sourceFileListing=" + sourceFileListing +
+        ", sourcePaths=" + sourcePaths +
+        ", targetPath=" + targetPath +
+        '}';
+  }
+
+  // NOTE(review): DistCpOptions does not implement Cloneable, so
+  // super.clone() will throw CloneNotSupportedException at runtime.
+  // The copy constructor above is the working way to duplicate options.
+  @Override
+  protected DistCpOptions clone() throws CloneNotSupportedException {
+    return (DistCpOptions) super.clone();
+  }
+}

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/FileBasedCopyListing.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * FileBasedCopyListing implements the CopyListing interface,
+ * to create the copy-listing for DistCp,
+ * by iterating over all source paths mentioned in a specified input-file.
+ */
+public class FileBasedCopyListing extends CopyListing {
+
+  // Delegate that performs the actual listing once the file's contents
+  // have been expanded into a list of source paths.
+  private final CopyListing globbedListing;
+  /**
+   * Constructor, to initialize base-class.
+   * @param configuration The input Configuration object.
+   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public FileBasedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    globbedListing = new GlobbedCopyListing(getConf(), credentials);
+  }
+
+  /** {@inheritDoc}
+   * Deliberate no-op: path validation is deferred to the delegate
+   * listing used in doBuildListing. */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   *   Iterates over all source paths mentioned in the input-file.
+   * The input options object is not mutated; a copy with the expanded
+   * source-path list is passed to the delegate.
+   * @param pathToListFile Path on HDFS where the listing file is written.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   */
+  @Override
+  public void doBuildListing(Path pathToListFile, DistCpOptions options) throws IOException {
+    DistCpOptions newOption = new DistCpOptions(options);
+    newOption.setSourcePaths(fetchFileList(options.getSourceFileListing()));
+    globbedListing.buildListing(pathToListFile, newOption);
+  }
+
+  /**
+   * Read the source-listing file, one path per line.
+   * NOTE(review): the InputStreamReader uses the platform default charset
+   * -- confirm whether listing files are expected to be UTF-8.
+   * Blank lines are not filtered and would yield empty Paths.
+   */
+  private List<Path> fetchFileList(Path sourceListing) throws IOException {
+    List<Path> result = new ArrayList<Path>();
+    FileSystem fs = sourceListing.getFileSystem(getConf());
+    BufferedReader input = null;
+    try {
+      input = new BufferedReader(new InputStreamReader(fs.open(sourceListing)));
+      String line = input.readLine();
+      while (line != null) {
+        result.add(new Path(line));
+        line = input.readLine();
+      }
+    } finally {
+      // closeStream swallows close() errors; the stream is always released.
+      IOUtils.closeStream(input);
+    }
+    return result;
+  }
+
+  /** {@inheritDoc} Delegates to the wrapped listing. */
+  @Override
+  protected long getBytesToCopy() {
+    return globbedListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} Delegates to the wrapped listing. */
+  @Override
+  protected long getNumberOfPaths() {
+    return globbedListing.getNumberOfPaths();
+  }
+}

Added: hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java (added)
+++ hadoop/common/branches/branch-1-win/src/tools/org/apache/hadoop/tools/distcp2/GlobbedCopyListing.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * GlobbedCopyListing implements the CopyListing interface, to create the copy
+ * listing-file by "globbing" all specified source paths (wild-cards and all.)
+ */
+public class GlobbedCopyListing extends CopyListing {
+  // NOTE(review): LOG is declared but not used anywhere in this class.
+  private static final Log LOG = LogFactory.getLog(GlobbedCopyListing.class);
+
+  // Delegate that writes the actual listing once globs are expanded.
+  private final CopyListing simpleListing;
+  /**
+   * Constructor, to initialize the configuration.
+   * @param configuration The input Configuration object.
+   * @param credentials Credentials object on which the FS delegation tokens are cached. If null
+   * delegation token caching is skipped
+   */
+  public GlobbedCopyListing(Configuration configuration, Credentials credentials) {
+    super(configuration, credentials);
+    simpleListing = new SimpleCopyListing(getConf(), credentials) ;
+  }
+
+  /** {@inheritDoc}
+   * Deliberate no-op: validation happens in doBuildListing (empty source
+   * list / non-matching glob) and in the delegate listing. */
+  @Override
+  protected void validatePaths(DistCpOptions options)
+      throws IOException, InvalidInputException {
+  }
+
+  /**
+   * Implementation of CopyListing::buildListing().
+   * Creates the copy listing by "globbing" all source-paths.
+   * The input options object is not mutated; a copy with the globbed
+   * source-path list is passed to the delegate.
+   * @param pathToListingFile The location at which the copy-listing file
+   *                           is to be created.
+   * @param options Input Options for DistCp (indicating source/target paths.)
+   * @throws IOException
+   * @throws InvalidInputException if the source-path list is empty, or if
+   *         any source pattern matches nothing
+   */
+  @Override
+  public void doBuildListing(Path pathToListingFile,
+                             DistCpOptions options) throws IOException {
+
+    List<Path> globbedPaths = new ArrayList<Path>();
+    if (options.getSourcePaths().isEmpty()) {
+      throw new InvalidInputException("Nothing to process. Source paths::EMPTY");  
+    }
+
+    for (Path p : options.getSourcePaths()) {
+      // Each source path may live on a different FileSystem.
+      FileSystem fs = p.getFileSystem(getConf());
+      FileStatus[] inputs = fs.globStatus(p);
+
+      // globStatus may return null (no match) or an empty array; both are
+      // treated as "path doesn't exist".
+      if(inputs != null && inputs.length > 0) {
+        for (FileStatus onePath: inputs) {
+          globbedPaths.add(onePath.getPath());
+        }
+      } else {
+        throw new InvalidInputException(p + " doesn't exist");        
+      }
+    }
+
+    DistCpOptions optionsGlobbed = new DistCpOptions(options);
+    optionsGlobbed.setSourcePaths(globbedPaths);
+    simpleListing.buildListing(pathToListingFile, optionsGlobbed);
+  }
+
+  /** {@inheritDoc} Delegates to the wrapped listing. */
+  @Override
+  protected long getBytesToCopy() {
+    return simpleListing.getBytesToCopy();
+  }
+
+  /** {@inheritDoc} Delegates to the wrapped listing. */
+  @Override
+  protected long getNumberOfPaths() {
+    return simpleListing.getNumberOfPaths();
+  }
+
+}



Mime
View raw message