hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1461952 [2/6] - in /hadoop/common/branches/branch-1: ./ bin/ src/docs/src/documentation/content/xdocs/ src/test/ src/test/org/apache/hadoop/tools/distcp2/ src/test/org/apache/hadoop/tools/distcp2/mapred/ src/test/org/apache/hadoop/tools/di...
Date Thu, 28 Mar 2013 05:00:10 GMT
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.util.TestDistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestIntegration {
+  private static final Log LOG = LogFactory.getLog(TestIntegration.class);
+
+  private static MiniDFSCluster miniDfsCluster = null;
+  private static MiniMRCluster miniMrCluster = null;
+
+  private static FileSystem fs;
+
+  private static Path listFile;
+  private static Path target;
+  private static String root;
+
+  private static JobConf getConf() {
+    return miniMrCluster.createJobConf();
+  }
+
+  @BeforeClass
+  public static void setup() {
+    try {
+      miniDfsCluster = new MiniDFSCluster(new Configuration(), 3, true, null);
+      fs = miniDfsCluster.getFileSystem();
+      final String namenode = fs.getUri().toString();
+      miniMrCluster = new MiniMRCluster(3, namenode, 1);
+
+      listFile = new Path("target/tmp/listing").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      target = new Path("target/tmp/target").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      root = new Path("target/tmp").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory()).toString();
+      TestDistCpUtils.delete(fs, root);
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    miniMrCluster.shutdown();
+    fs.close();
+    miniDfsCluster.shutdown();
+  }
+
+  @Test(timeout=100000)
+  public void testSingleFileMissingTarget() {
+    caseSingleFileMissingTarget(false);
+    caseSingleFileMissingTarget(true);
+  }
+
+  private void caseSingleFileMissingTarget(boolean sync) {
+
+    try {
+      addEntries(listFile, "singlefile1/file1");
+      createFiles("singlefile1/file1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testSingleFileTargetFile() {
+    caseSingleFileTargetFile(false);
+    caseSingleFileTargetFile(true);
+  }
+
+  private void caseSingleFileTargetFile(boolean sync) {
+
+    try {
+      addEntries(listFile, "singlefile1/file1");
+      createFiles("singlefile1/file1", "target");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testSingleFileTargetDir() {
+    caseSingleFileTargetDir(false);
+    caseSingleFileTargetDir(true);
+  }
+
+  private void caseSingleFileTargetDir(boolean sync) {
+
+    try {
+      addEntries(listFile, "singlefile2/file2");
+      createFiles("singlefile2/file2");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1, "file2");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testSingleDirTargetMissing() {
+    caseSingleDirTargetMissing(false);
+    caseSingleDirTargetMissing(true);
+  }
+
+  private void caseSingleDirTargetMissing(boolean sync) {
+
+    try {
+      addEntries(listFile, "singledir");
+      mkdirs(root + "/singledir/dir1");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 1, "dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testSingleDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "singledir");
+      mkdirs(root + "/singledir/dir1");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 1, "singledir/dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testUpdateSingleDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "Usingledir");
+      mkdirs(root + "/Usingledir/Udir1");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 1, "Udir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testMultiFileTargetPresent() {
+    caseMultiFileTargetPresent(false);
+    caseMultiFileTargetPresent(true);
+  }
+
+  private void caseMultiFileTargetPresent(boolean sync) {
+
+    try {
+      addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(target.toString());
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 3, "file3", "file4", "file5");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testCustomCopyListing() {
+
+    try {
+      addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5");
+      createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5");
+      mkdirs(target.toString());
+
+      Configuration conf = getConf();
+      try {
+        conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
+            CustomCopyListing.class, CopyListing.class);
+        DistCpOptions options = new DistCpOptions(Arrays.
+            asList(new Path(root + "/" + "multifile1")), target);
+        options.setSyncFolder(true);
+        options.setDeleteMissing(false);
+        options.setOverwrite(false);
+        try {
+          new DistCp(conf, options).execute();
+        } catch (Exception e) {
+          LOG.error("Exception encountered ", e);
+          throw new IOException(e);
+        }
+      } finally {
+        conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
+      }
+
+      checkResult(target, 2, "file4", "file5");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  private static class CustomCopyListing extends SimpleCopyListing {
+
+    public CustomCopyListing(Configuration configuration,
+                             Credentials credentials) {
+      super(configuration, credentials);
+    }
+
+    @Override
+    protected boolean shouldCopy(Path path, DistCpOptions options) {
+      return !path.getName().equals("file3");
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testMultiFileTargetMissing() {
+    caseMultiFileTargetMissing(false);
+    caseMultiFileTargetMissing(true);
+  }
+
+  private void caseMultiFileTargetMissing(boolean sync) {
+
+    try {
+      addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+
+      runTest(listFile, target, sync);
+
+      checkResult(target, 3, "file3", "file4", "file5");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testMultiDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "multifile", "singledir");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(target.toString(), root + "/singledir/dir1");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testUpdateMultiDirTargetPresent() {
+
+    try {
+      addEntries(listFile, "Umultifile", "Usingledir");
+      createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5");
+      mkdirs(target.toString(), root + "/Usingledir/Udir1");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testMultiDirTargetMissing() {
+
+    try {
+      addEntries(listFile, "multifile", "singledir");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(root + "/singledir/dir1");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 2, "multifile/file3", "multifile/file4",
+          "multifile/file5", "singledir/dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testUpdateMultiDirTargetMissing() {
+
+    try {
+      addEntries(listFile, "multifile", "singledir");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      mkdirs(root + "/singledir/dir1");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 4, "file3", "file4", "file5", "dir1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+    }
+  }
+  
+  @Test(timeout=100000)
+  public void testDeleteMissingInDestination() {
+    
+    try {
+      addEntries(listFile, "srcdir");
+      createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2");
+      
+      Path target = new Path(root + "/dstdir");
+      runTest(listFile, target, true, true, false);
+      
+      checkResult(target, 1, "file1");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+  
+  @Test(timeout=100000)
+  public void testOverwrite() {
+    byte[] contents1 = "contents1".getBytes();
+    byte[] contents2 = "contents2".getBytes();
+    Assert.assertEquals(contents1.length, contents2.length);
+    
+    try {
+      addEntries(listFile, "srcdir");
+      createWithContents("srcdir/file1", contents1);
+      createWithContents("dstdir/file1", contents2);
+      
+      Path target = new Path(root + "/dstdir");
+      runTest(listFile, target, false, false, true);
+      
+      checkResult(target, 1, "file1");
+      
+      // make sure dstdir/file1 has been overwritten with the contents
+      // of srcdir/file1
+      FSDataInputStream is = fs.open(new Path(root + "/dstdir/file1"));
+      byte[] dstContents = new byte[contents1.length];
+      is.readFully(dstContents);
+      is.close();
+      Assert.assertArrayEquals(contents1, dstContents);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testGlobTargetMissingSingleLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+                                fs.getWorkingDirectory());
+      addEntries(listFile, "*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir/dir2/file6");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5",
+          "singledir/dir2/file6");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testUpdateGlobTargetMissingSingleLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+                                  fs.getWorkingDirectory());
+      addEntries(listFile, "*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir/dir2/file6");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 4, "file3", "file4", "file5", "dir2/file6");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testGlobTargetMissingMultiLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      addEntries(listFile, "*/*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
+          "singledir1/dir3/file9");
+
+      runTest(listFile, target, false);
+
+      checkResult(target, 4, "file3", "file4", "file5",
+          "dir3/file7", "dir3/file8", "dir3/file9");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testUpdateGlobTargetMissingMultiLevel() {
+
+    try {
+      Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(),
+              fs.getWorkingDirectory());
+      addEntries(listFile, "*/*");
+      createFiles("multifile/file3", "multifile/file4", "multifile/file5");
+      createFiles("singledir1/dir3/file7", "singledir1/dir3/file8",
+          "singledir1/dir3/file9");
+
+      runTest(listFile, target, true);
+
+      checkResult(target, 6, "file3", "file4", "file5",
+          "file7", "file8", "file9");
+    } catch (IOException e) {
+      LOG.error("Exception encountered while running distcp", e);
+      Assert.fail("distcp failure");
+    } finally {
+      TestDistCpUtils.delete(fs, root);
+      TestDistCpUtils.delete(fs, "target/tmp1");
+    }
+  }
+  
+  @Test(timeout=100000)
+  public void testCleanup() {
+    try {
+      Path sourcePath = new Path("noscheme:///file");
+      List<Path> sources = new ArrayList<Path>();
+      sources.add(sourcePath);
+
+      DistCpOptions options = new DistCpOptions(sources, target);
+
+      JobConf conf = getConf();
+      Path stagingDir = JobSubmissionFiles.getStagingDir(
+              new JobClient(conf), conf);
+      stagingDir.getFileSystem(conf).mkdirs(stagingDir);
+
+      try {
+        new DistCp(conf, options).execute();
+      } catch (Throwable t) {
+        Assert.assertEquals(stagingDir.getFileSystem(conf).
+            listStatus(stagingDir).length, 0);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("testCleanup failed " + e.getMessage());
+    }
+  }
+  
+  private void addEntries(Path listFile, String... entries) throws IOException {
+    OutputStream out = fs.create(listFile);
+    try {
+      for (String entry : entries){
+        out.write((root + "/" + entry).getBytes());
+        out.write("\n".getBytes());
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  private void createFiles(String... entries) throws IOException {
+    for (String entry : entries){
+      if (!entry.startsWith("hdfs://")) {
+        entry = root + "/" + entry;
+      }
+      OutputStream out = fs.create(new Path(entry));
+      try {
+        out.write((root + "/" + entry).getBytes());
+        out.write("\n".getBytes());
+      } finally {
+        out.close();
+      }
+    }
+  }
+  
+  private void createWithContents(String entry, byte[] contents) throws IOException {
+    OutputStream out = fs.create(new Path(root + "/" + entry));
+    try {
+      out.write(contents);
+    } finally {
+      out.close();
+    }
+  }
+
+  private void mkdirs(String... entries) throws IOException {
+    for (String entry : entries){
+      fs.mkdirs(new Path(entry));
+    }
+  }
+    
+  private void runTest(Path listFile, Path target, boolean sync) throws IOException {
+    runTest(listFile, target, sync, false, false);
+  }
+  
+  private void runTest(Path listFile, Path target, boolean sync, boolean delete,
+      boolean overwrite) throws IOException {
+    DistCpOptions options = new DistCpOptions(listFile, target);
+    options.setSyncFolder(sync);
+    options.setDeleteMissing(delete);
+    options.setOverwrite(overwrite);
+    try {
+      new DistCp(getConf(), options).execute();
+    } catch (Exception e) {
+      LOG.error("Exception encountered ", e);
+      throw new IOException(e);
+    }
+  }
+
+  private void checkResult(Path target, int count, String... relPaths) throws IOException {
+    Assert.assertEquals(count, fs.listStatus(target).length);
+    if (relPaths == null || relPaths.length == 0) {
+      Assert.assertTrue(target.toString(), fs.exists(target));
+      return;
+    }
+    for (String relPath : relPaths) {
+      Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath)));
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestOptionsParser {
+
+  @Test
+  public void testParseIgnoreFailure() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldIgnoreFailures());
+
+    options = OptionsParser.parse(new String[] {
+        "-i",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldIgnoreFailures());
+  }
+
+  @Test
+  public void testParseOverwrite() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldOverwrite());
+
+    options = OptionsParser.parse(new String[] {
+        "-overwrite",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldOverwrite());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-update",
+          "-overwrite",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Update and overwrite aren't allowed together");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testLogPath() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getLogPath());
+
+    options = OptionsParser.parse(new String[] {
+        "-log",
+        "hdfs://localhost:8020/logs",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getLogPath(), new Path("hdfs://localhost:8020/logs"));
+  }
+
+  @Test
+  public void testParseBlokcing() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldBlock());
+
+    options = OptionsParser.parse(new String[] {
+        "-async",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldBlock());
+  }
+
+  @Test
+  public void testParsebandwidth() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
+
+    options = OptionsParser.parse(new String[] {
+        "-bandwidth",
+        "11",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMapBandwidth(), 11);
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testParseNonPositiveBandwidth() {
+    OptionsParser.parse(new String[] {
+        "-bandwidth",
+        "-11",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+  }
+  
+  @Test(expected=IllegalArgumentException.class)
+  public void testParseZeroBandwidth() {
+    OptionsParser.parse(new String[] {
+        "-bandwidth",
+        "0",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+  }
+
+  @Test
+  public void testParseSkipCRC() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldSkipCRC());
+
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "-skipcrccheck",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldSyncFolder());
+    Assert.assertTrue(options.shouldSkipCRC());
+  }
+
+  @Test
+  public void testParseAtomicCommit() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldAtomicCommit());
+
+    options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldAtomicCommit());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-atomic",
+          "-update",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Atomic and sync folders were allowed");
+    } catch (IllegalArgumentException ignore) { }
+  }
+
+  @Test
+  public void testParseWorkPath() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getAtomicWorkPath());
+
+    options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getAtomicWorkPath());
+
+    options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "-tmp",
+        "hdfs://localhost:8020/work",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getAtomicWorkPath(), new Path("hdfs://localhost:8020/work"));
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-tmp",
+          "hdfs://localhost:8020/work",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("work path was allowed without -atomic switch");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testParseSyncFolders() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldSyncFolder());
+
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldSyncFolder());
+  }
+
+  @Test
+  public void testParseDeleteMissing() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldDeleteMissing());
+
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "-delete",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldSyncFolder());
+    Assert.assertTrue(options.shouldDeleteMissing());
+
+    options = OptionsParser.parse(new String[] {
+        "-overwrite",
+        "-delete",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldOverwrite());
+    Assert.assertTrue(options.shouldDeleteMissing());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-atomic",
+          "-delete",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Atomic and delete folders were allowed");
+    } catch (IllegalArgumentException ignore) { }
+  }
+
+  @Test
+  public void testParseSSLConf() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertNull(options.getSslConfigurationFile());
+
+    options = OptionsParser.parse(new String[] {
+        "-mapredSslConf",
+        "/tmp/ssl-client.xml",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getSslConfigurationFile(), "/tmp/ssl-client.xml");
+  }
+
+  @Test
+  public void testParseMaps() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMaxMaps(), DistCpConstants.DEFAULT_MAPS);
+
+    options = OptionsParser.parse(new String[] {
+        "-m",
+        "1",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMaxMaps(), 1);
+
+    options = OptionsParser.parse(new String[] {
+        "-m",
+        "0",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getMaxMaps(), 1);
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-m",
+          "hello",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Non numberic map parsed");
+    } catch (IllegalArgumentException ignore) { }
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-mapredXslConf",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Non numberic map parsed");
+    } catch (IllegalArgumentException ignore) { }
+  }
+
+  @Test
+  public void testSourceListing() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getSourceFileListing(),
+        new Path("hdfs://localhost:8020/source/first"));
+  }
+
+  @Test
+  public void testSourceListingAndSourcePath() {
+    try {
+      OptionsParser.parse(new String[] {
+          "-f",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Both source listing & source paths allowed");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testMissingSourceInfo() {
+    try {
+      OptionsParser.parse(new String[] {
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Neither source listing not source paths present");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testMissingTarget() {
+    try {
+      OptionsParser.parse(new String[] {
+          "-f", "hdfs://localhost:8020/source"});
+      Assert.fail("Missing target allowed");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testInvalidArgs() {
+    try {
+      OptionsParser.parse(new String[] {
+          "-m", "-f", "hdfs://localhost:8020/source"});
+      Assert.fail("Missing map value");
+    } catch (IllegalArgumentException ignore) {}
+  }
+
+  @Test
+  public void testToString() {
+    DistCpOptions option = new DistCpOptions(new Path("abc"), new Path("xyz"));
+    String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " +
+        "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " +
+        "sourceFileListing=abc, sourcePaths=null, targetPath=xyz}";
+    Assert.assertEquals(val, option.toString());
+    Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
+        DistCpOptionSwitch.ATOMIC_COMMIT.name());
+  }
+
+  @Test
+  public void testCopyStrategy() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-strategy",
+        "dynamic",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getCopyStrategy(), "dynamic");
+
+    options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getCopyStrategy(), DistCpConstants.UNIFORMSIZE);
+  }
+
+  @Test
+  public void testTargetPath() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost:8020/target/"));
+  }
+
+  @Test
+  public void testPreserve() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-p",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-p",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-pbr",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-pbrgup",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+
+    options = OptionsParser.parse(new String[] {
+        "-p",
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    int i = 0;
+    Iterator<FileAttribute> attribIterator = options.preserveAttributes();
+    while (attribIterator.hasNext()) {
+      attribIterator.next();
+      i++;
+    }
+    Assert.assertEquals(i, 5);
+
+    try {
+      OptionsParser.parse(new String[] {
+          "-pabc",
+          "-f",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target"});
+      Assert.fail("Invalid preserve attribute");
+    }
+    catch (IllegalArgumentException ignore) {}
+    catch (NoSuchElementException ignore) {}
+
+    options = OptionsParser.parse(new String[] {
+        "-f",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+    options.preserve(FileAttribute.PERMISSION);
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+
+    options.preserve(FileAttribute.PERMISSION);
+    Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+  }
+
+  @Test
+  public void testOptionsSwitchAddToConf() {
+    Configuration conf = new Configuration();
+    Assert.assertNull(conf.get(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel()));
+    DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+  }
+
+  @Test
+  public void testOptionsAppendToConf() {
+    Configuration conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "-atomic",
+        "-i",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
+    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
+        DistCpConstants.DEFAULT_BANDWIDTH_MB);
+
+    conf = new Configuration();
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+    Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
+    Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), null);
+    options = OptionsParser.parse(new String[] {
+        "-update",
+        "-delete",
+        "-pu",
+        "-bandwidth",
+        "11",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    options.appendToConf(conf);
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
+    Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
+    Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
+    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
+  }
+}

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java?rev=1461952&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java Thu Mar 28 05:00:09 2013
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.distcp2.mapred;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.distcp2.CopyListing;
+import org.apache.hadoop.tools.distcp2.DistCpConstants;
+import org.apache.hadoop.tools.distcp2.DistCpOptions;
+import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.distcp2.GlobbedCopyListing;
+import org.apache.hadoop.tools.distcp2.util.TestDistCpUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCopyCommitter {
+  private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
+
+  private static final Random rand = new Random();
+
+  private static final Credentials CREDENTIALS = new Credentials();
+  public static final int PORT = 39737;
+
+
+  private static Configuration config;
+  private static MiniDFSCluster cluster;
+
+  private static Job getJobForClient() throws IOException {
+    Job job = Job.getInstance(new Configuration());
+    job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
+    job.setInputFormatClass(NullInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  @BeforeClass
+  public static void create() throws IOException {
+    config = getJobForClient().getConfiguration();
+    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
+    cluster = new MiniDFSCluster(config, 1, true, null);
+  }
+
+  @AfterClass
+  public static void destroy() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void createMetaFolder() {
+    config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
+    Path meta = new Path("/meta");
+    try {
+      cluster.getFileSystem().mkdirs(meta);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while creating meta folder", e);
+      Assert.fail("Unable to create meta folder");
+    }
+  }
+
+  @After
+  public void cleanupMetaFolder() {
+    Path meta = new Path("/meta");
+    try {
+      if (cluster.getFileSystem().exists(meta)) {
+        cluster.getFileSystem().delete(meta, true);
+        Assert.fail("Expected meta folder to be deleted");
+      }
+    } catch (IOException e) {
+      LOG.error("Exception encountered while cleaning up folder", e);
+      Assert.fail("Unable to clean up meta folder");
+    }
+  }
+
+  @Test
+  public void testNoCommitAction() {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContext(taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      committer.commitJob(jobContext);
+      Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
+
+      //Test for idempotent commit
+      committer.commitJob(jobContext);
+      Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+      Assert.fail("Commit failed");
+    }
+  }
+
+  @Test
+  public void testPreserveStatus() {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContext(taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+
+
+    String sourceBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      fs = FileSystem.get(conf);
+      FsPermission sourcePerm = new FsPermission((short) 511);
+      FsPermission initialPerm = new FsPermission((short) 448);
+      sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
+      targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
+
+      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+          new Path("/out"));
+      options.preserve(FileAttribute.PERMISSION);
+      options.appendToConf(conf);
+
+      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+      Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+      listing.buildListing(listingFile, options);
+
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+
+      committer.commitJob(jobContext);
+      if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
+        Assert.fail("Permission don't match");
+      }
+
+      //Test for idempotent commit
+      committer.commitJob(jobContext);
+      if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
+        Assert.fail("Permission don't match");
+      }
+
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing for preserve status", e);
+      Assert.fail("Preserve status failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1");
+    }
+
+  }
+
+  @Test
+  public void testDeleteMissing() {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContext(taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+
+    String sourceBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      fs = FileSystem.get(conf);
+      sourceBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+      targetBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+      String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
+      fs.rename(new Path(targetBaseAdd), new Path(targetBase));
+
+      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
+          new Path("/out"));
+      options.setSyncFolder(true);
+      options.setDeleteMissing(true);
+      options.appendToConf(conf);
+
+      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+      Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+      listing.buildListing(listingFile, options);
+
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+      committer.commitJob(jobContext);
+      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+        Assert.fail("Source and target folders are not in sync");
+      }
+      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
+        Assert.fail("Source and target folders are not in sync");
+      }
+
+      //Test for idempotent commit
+      committer.commitJob(jobContext);
+      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+        Assert.fail("Source and target folders are not in sync");
+      }
+      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
+        Assert.fail("Source and target folders are not in sync");
+      }
+    } catch (Throwable e) {
+      LOG.error("Exception encountered while testing for delete missing", e);
+      Assert.fail("Delete missing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1");
+      conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
+    }
+  }
+
+  @Test
+  public void testDeleteMissingFlatInterleavedFiles() {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContext(taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+
+
+    String sourceBase;
+    String targetBase;
+    FileSystem fs = null;
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      fs = FileSystem.get(conf);
+      sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
+      targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
+      TestDistCpUtils.createFile(fs, sourceBase + "/1");
+      TestDistCpUtils.createFile(fs, sourceBase + "/3");
+      TestDistCpUtils.createFile(fs, sourceBase + "/4");
+      TestDistCpUtils.createFile(fs, sourceBase + "/5");
+      TestDistCpUtils.createFile(fs, sourceBase + "/7");
+      TestDistCpUtils.createFile(fs, sourceBase + "/8");
+      TestDistCpUtils.createFile(fs, sourceBase + "/9");
+
+      TestDistCpUtils.createFile(fs, targetBase + "/2");
+      TestDistCpUtils.createFile(fs, targetBase + "/4");
+      TestDistCpUtils.createFile(fs, targetBase + "/5");
+      TestDistCpUtils.createFile(fs, targetBase + "/7");
+      TestDistCpUtils.createFile(fs, targetBase + "/9");
+      TestDistCpUtils.createFile(fs, targetBase + "/A");
+
+      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)), 
+          new Path("/out"));
+      options.setSyncFolder(true);
+      options.setDeleteMissing(true);
+      options.appendToConf(conf);
+
+      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
+      Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
+      listing.buildListing(listingFile, options);
+
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
+
+      committer.commitJob(jobContext);
+      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+        Assert.fail("Source and target folders are not in sync");
+      }
+      Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+
+      //Test for idempotent commit
+      committer.commitJob(jobContext);
+      if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
+        Assert.fail("Source and target folders are not in sync");
+      }
+      Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing for delete missing", e);
+      Assert.fail("Delete missing failure");
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp1");
+      conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
+    }
+
+  }
+
+  @Test
+  public void testAtomicCommitMissingFinal() {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContext(taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+
+    String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
+    String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
+    FileSystem fs = null;
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      fs = FileSystem.get(conf);
+      fs.mkdirs(new Path(workPath));
+
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+
+      Assert.assertTrue(fs.exists(new Path(workPath)));
+      Assert.assertFalse(fs.exists(new Path(finalPath)));
+      committer.commitJob(jobContext);
+      Assert.assertFalse(fs.exists(new Path(workPath)));
+      Assert.assertTrue(fs.exists(new Path(finalPath)));
+
+      //Test for idempotent commit
+      committer.commitJob(jobContext);
+      Assert.assertFalse(fs.exists(new Path(workPath)));
+      Assert.assertTrue(fs.exists(new Path(finalPath)));
+
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing for preserve status", e);
+      Assert.fail("Atomic commit failure");
+    } finally {
+      TestDistCpUtils.delete(fs, workPath);
+      TestDistCpUtils.delete(fs, finalPath);
+    }
+  }
+
+  @Test
+  public void testAtomicCommitExistingFinal() {
+    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
+    JobContext jobContext = new JobContext(taskAttemptContext.getConfiguration(),
+        taskAttemptContext.getTaskAttemptID().getJobID());
+    Configuration conf = jobContext.getConfiguration();
+
+
+    String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
+    String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
+    FileSystem fs = null;
+    try {
+      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+      fs = FileSystem.get(conf);
+      fs.mkdirs(new Path(workPath));
+      fs.mkdirs(new Path(finalPath));
+
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
+      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
+      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
+
+      Assert.assertTrue(fs.exists(new Path(workPath)));
+      Assert.assertTrue(fs.exists(new Path(finalPath)));
+      try {
+        committer.commitJob(jobContext);
+        Assert.fail("Should not be able to atomic-commit to pre-existing path.");
+      } catch(Exception exception) {
+        Assert.assertTrue(fs.exists(new Path(workPath)));
+        Assert.assertTrue(fs.exists(new Path(finalPath)));
+        LOG.info("Atomic-commit Test pass.");
+      }
+
+    } catch (IOException e) {
+      LOG.error("Exception encountered while testing for atomic commit.", e);
+      Assert.fail("Atomic commit failure");
+    } finally {
+      TestDistCpUtils.delete(fs, workPath);
+      TestDistCpUtils.delete(fs, finalPath);
+    }
+  }
+
+  private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
+    return new TaskAttemptContext(conf,
+        new TaskAttemptID("200707121733", 1, true, 1, 1));
+  }
+
+  private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
+                                            FsPermission sourcePerm) throws IOException {
+    Path base = new Path(targetBase);
+
+    Stack<Path> stack = new Stack<Path>();
+    stack.push(base);
+    while (!stack.isEmpty()) {
+      Path file = stack.pop();
+      if (!fs.exists(file)) continue;
+      FileStatus[] fStatus = fs.listStatus(file);
+      if (fStatus == null || fStatus.length == 0) continue;
+
+      for (FileStatus status : fStatus) {
+        if (status.isDir()) {
+          stack.push(status.getPath());
+          Assert.assertEquals(status.getPermission(), sourcePerm);
+        }
+      }
+    }
+    return true;
+  }
+
+  private static class NullInputFormat extends InputFormat {
+    @Override
+    public List getSplits(JobContext context)
+        throws IOException, InterruptedException {
+      return Collections.EMPTY_LIST;
+    }
+
+    @Override
+    public RecordReader createRecordReader(InputSplit split,
+                                           TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return null;
+    }
+  }
+}



Mime
View raw message