hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077427 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/filecache/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Date: Fri, 04 Mar 2011 04:13:49 GMT
Author: omalley
Date: Fri Mar  4 04:13:49 2011
New Revision: 1077427

URL: http://svn.apache.org/viewvc?rev=1077427&view=rev
Log:
commit 6192a9f2a80ae3143f9eac255d74fbf472ff3d95
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Sun Apr 25 12:06:08 2010 -0700

    MAPREDUCE-1641. Fix DistributedCache to ensure same files cannot be put in both the archives and files sections. Contributed by Richard King.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1641. Fix DistributedCache to ensure same files cannot be put in
    +    both the archives and files sections. (Richard King via acmurthy)
    +
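
For illustration only (not part of this commit; the HDFS URI below is made up), a minimal sketch of the misconfiguration this patch now rejects at submission time:

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class DuplicateCacheEntryExample {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // Hypothetical URI; any URI listed in both sections triggers the check.
        URI shared = new URI("hdfs://namenode:8020/apps/map.zip");
        DistributedCache.addCacheArchive(shared, conf);
        DistributedCache.addCacheFile(shared, conf);
        // With this patch, JobClient.runJob(conf) throws InvalidJobConfException
        // during submission instead of failing unpredictably at task runtime.
      }
    }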

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURL.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURLMinicluster.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InvalidJobConfException.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java?rev=1077427&r1=1077426&r2=1077427&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/DistributedCache.java Fri Mar  4 04:13:49 2011
@@ -138,6 +138,41 @@ public class DistributedCache {
    */
   public static final String CACHE_ARCHIVES_SIZES = 
     "mapred.cache.archives.filesizes";
+
+  /**
+   * Warning: {@link #CACHE_ARCHIVES_TIMESTAMPS} is not a *public* constant.
+   **/
+  public static final String CACHE_ARCHIVES_TIMESTAMPS = "mapred.cache.archives.timestamps";
+
+  /**
+   * Warning: {@link #CACHE_FILES_TIMESTAMPS} is not a *public* constant.
+   **/
+  public static final String CACHE_FILES_TIMESTAMPS = "mapred.cache.files.timestamps";
+
+  /**
+   * Warning: {@link #CACHE_ARCHIVES} is not a *public* constant.
+   **/
+  public static final String CACHE_ARCHIVES = "mapred.cache.archives";
+
+  /**
+   * Warning: {@link #CACHE_FILES} is not a *public* constant.
+   **/
+  public static final String CACHE_FILES = "mapred.cache.files";
+
+  /**
+   * Warning: {@link #CACHE_LOCALARCHIVES} is not a *public* constant.
+   **/
+  public static final String CACHE_LOCALARCHIVES = "mapred.cache.localArchives";
+
+  /**
+   * Warning: {@link #CACHE_LOCALFILES} is not a *public* constant.
+   **/
+  public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
+
+  /**
+   * Warning: {@link #CACHE_SYMLINK} is not a *public* constant.
+   **/
+  public static final String CACHE_SYMLINK = "mapred.create.symlink";
   
   /**
    * Get the locally cached file or archive; it could either be 
@@ -379,7 +414,7 @@ public class DistributedCache {
    */
   public static void setCacheArchives(URI[] archives, Configuration conf) {
     String sarchives = StringUtils.uriToString(archives);
-    conf.set("mapred.cache.archives", sarchives);
+    conf.set(CACHE_ARCHIVES, sarchives);
   }
 
   /**
@@ -390,7 +425,7 @@ public class DistributedCache {
    */
   public static void setCacheFiles(URI[] files, Configuration conf) {
     String sfiles = StringUtils.uriToString(files);
-    conf.set("mapred.cache.files", sfiles);
+    conf.set(CACHE_FILES, sfiles);
   }
 
   /**
@@ -401,7 +436,7 @@ public class DistributedCache {
    * @throws IOException
    */
   public static URI[] getCacheArchives(Configuration conf) throws IOException {
-    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
+    return StringUtils.stringToURI(conf.getStrings(CACHE_ARCHIVES));
   }
 
   /**
@@ -412,7 +447,7 @@ public class DistributedCache {
    * @throws IOException
    */
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
-    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
+    return StringUtils.stringToURI(conf.getStrings(CACHE_FILES));
   }
 
   /**
@@ -425,7 +460,7 @@ public class DistributedCache {
   public static Path[] getLocalCacheArchives(Configuration conf)
     throws IOException {
     return StringUtils.stringToPath(conf
-                                    .getStrings("mapred.cache.localArchives"));
+                                    .getStrings(CACHE_LOCALARCHIVES));
   }
 
   /**
@@ -437,7 +472,7 @@ public class DistributedCache {
    */
   public static Path[] getLocalCacheFiles(Configuration conf)
     throws IOException {
-    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
+    return StringUtils.stringToPath(conf.getStrings(CACHE_LOCALFILES));
   }
 
   /**
@@ -448,7 +483,7 @@ public class DistributedCache {
    * @throws IOException
    */
   public static String[] getArchiveTimestamps(Configuration conf) {
-    return conf.getStrings("mapred.cache.archives.timestamps");
+    return conf.getStrings(CACHE_ARCHIVES_TIMESTAMPS);
   }
 
 
@@ -460,7 +495,7 @@ public class DistributedCache {
    * @throws IOException
    */
   public static String[] getFileTimestamps(Configuration conf) {
-    return conf.getStrings("mapred.cache.files.timestamps");
+    return conf.getStrings(CACHE_FILES_TIMESTAMPS);
   }
 
   /**
@@ -471,7 +506,7 @@ public class DistributedCache {
    * The order should be the same as the order in which the archives are added.
    */
   public static void setArchiveTimestamps(Configuration conf, String timestamps) {
-    conf.set("mapred.cache.archives.timestamps", timestamps);
+    conf.set(CACHE_ARCHIVES_TIMESTAMPS, timestamps);
   }
 
   /**
@@ -482,7 +517,7 @@ public class DistributedCache {
    * The order should be the same as the order in which the files are added.
    */
   public static void setFileTimestamps(Configuration conf, String timestamps) {
-    conf.set("mapred.cache.files.timestamps", timestamps);
+    conf.set(CACHE_FILES_TIMESTAMPS, timestamps);
   }
   
   /**
@@ -492,7 +527,7 @@ public class DistributedCache {
    * @param str a comma separated list of local archives
    */
   public static void setLocalArchives(Configuration conf, String str) {
-    conf.set("mapred.cache.localArchives", str);
+    conf.set(CACHE_LOCALARCHIVES, str);
   }
 
   /**
@@ -502,7 +537,7 @@ public class DistributedCache {
    * @param str a comma separated list of local files
    */
   public static void setLocalFiles(Configuration conf, String str) {
-    conf.set("mapred.cache.localFiles", str);
+    conf.set(CACHE_LOCALFILES, str);
   }
 
   /**
@@ -512,8 +547,8 @@ public class DistributedCache {
    * @param conf Configuration to add the cache to
    */
   public static void addCacheArchive(URI uri, Configuration conf) {
-    String archives = conf.get("mapred.cache.archives");
-    conf.set("mapred.cache.archives", archives == null ? uri.toString()
+    String archives = conf.get(CACHE_ARCHIVES);
+    conf.set(CACHE_ARCHIVES, archives == null ? uri.toString()
              : archives + "," + uri.toString());
   }
   
@@ -524,20 +559,20 @@ public class DistributedCache {
    * @param conf Configuration to add the cache to
    */
   public static void addCacheFile(URI uri, Configuration conf) {
-    String files = conf.get("mapred.cache.files");
-    conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
+    String files = conf.get(CACHE_FILES);
+    conf.set(CACHE_FILES, files == null ? uri.toString() : files + ","
              + uri.toString());
   }
 
   /**
-   * Add an file path to the current set of classpath entries It adds the file
+   * Add a file path to the current set of classpath entries. It adds the file
    * to cache as well.  Intended to be used by user code.
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
    */
   public static void addFileToClassPath(Path file, Configuration conf)
-    throws IOException {
+        throws IOException {
     String classpath = conf.get("mapred.job.classpath.files");
     conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
              : classpath + System.getProperty("path.separator") + file.toString());
@@ -611,7 +646,7 @@ public class DistributedCache {
    * @param conf the jobconf 
    */
   public static void createSymlink(Configuration conf){
-    conf.set("mapred.create.symlink", "yes");
+    conf.set(CACHE_SYMLINK, "yes");
   }
   
   /**
@@ -622,7 +657,7 @@ public class DistributedCache {
    * @return true if symlinks are to be created- else return false
    */
   public static boolean getSymlink(Configuration conf){
-    String result = conf.get("mapred.create.symlink");
+    String result = conf.get(CACHE_SYMLINK);
     if ("yes".equals(result)){
       return true;
     }
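
The hunks above replace scattered string literals with the hoisted constants. A minimal sketch (assuming only the constants introduced above, which the javadoc flags as non-public API) of setting and reading the same keys back through them:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class CacheKeySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(DistributedCache.CACHE_FILES, "hdfs://nn/apps/dict.txt");
        // Equivalent to conf.getStrings("mapred.cache.files"), but typo-proof.
        String[] files = conf.getStrings(DistributedCache.CACHE_FILES);
        DistributedCache.createSymlink(conf);   // sets mapred.create.symlink=yes
        boolean symlink = DistributedCache.getSymlink(conf);  // now true
        System.out.println(files[0] + " symlink=" + symlink);
      }
    }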

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077427&r1=1077426&r2=1077427&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 04:13:49 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.filecache;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.RunJar;
@@ -771,4 +773,76 @@ public class TrackerDistributedCacheMana
 
     TokenCache.obtainTokensForNamenodes(credentials, ps, job);
   }
+
+  /** 
+   * This is part of the framework API.  It's called within the job
+   * submission code only, not by users.  In the non-error case it has
+   * no side effects and returns normally.  If a URI appears in both
+   * mapred.cache.files and mapred.cache.archives, it throws an
+   * InvalidJobConfException. 
+   * @param conf a {@link Configuration} to be checked for duplicated
+   * cached URIs 
+   * @throws InvalidJobConfException
+   **/
+  public static void validate(Configuration conf)
+                          throws InvalidJobConfException {
+    final String[] archiveStrings
+      = conf.getStrings(DistributedCache.CACHE_ARCHIVES);
+    final String[] fileStrings = conf.getStrings(DistributedCache.CACHE_FILES);
+
+    Path thisSubject = null;
+
+    String thisCategory = DistributedCache.CACHE_ARCHIVES;
+
+    if (archiveStrings != null && fileStrings != null) {
+      final Set<Path> archivesSet = new HashSet<Path>();
+
+      for (String archiveString : archiveStrings) {
+        archivesSet.add(coreLocation(archiveString, conf));
+      }
+
+      thisCategory = DistributedCache.CACHE_FILES;
+
+      for (String fileString : fileStrings) {
+        thisSubject = coreLocation(fileString, conf);
+
+        if (archivesSet.contains(thisSubject)) {
+          throw new InvalidJobConfException
+            ("The core URI, \""
+             + thisSubject
+             + "\" is listed both in " + DistributedCache.CACHE_FILES
+             + " and in " + DistributedCache.CACHE_ARCHIVES + " .");
+        }
+      }
+    }
+  }
+
+  private static Path coreLocation(String uriString, Configuration conf) 
+       throws InvalidJobConfException {
+    // lose the fragment, if it's likely to be a symlink name
+    if (DistributedCache.getSymlink(conf)) {
+      try {
+        URI uri = new URI(uriString);
+        uriString
+          = (new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
+                     null, null)
+             .toString());
+      } catch (URISyntaxException e) {
+        throw new InvalidJobConfException
+          ("Badly formatted URI: " + uriString, e);
+      }
+    }
+        
+    Path path = new Path(uriString);
+
+    try {
+      path = path.makeQualified(path.getFileSystem(conf));
+    } catch (IOException e) {
+      throw new InvalidJobConfException
+        ("Invalid file system in distributed cache for the URI: "
+         + uriString, e);
+    }
+
+    return path;
+  }
 }
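
A sketch of the normalization coreLocation() applies before comparison (a hypothetical standalone form, using a local-file URI so it runs without a cluster): with symlinks enabled, the fragment (the symlink name) is dropped, then the path is qualified against its FileSystem so equivalent spellings of one location compare equal.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class CoreLocationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI uri = new URI("file:///apps/map.zip#alias");  // made-up URI
        // Drop the fragment, as validate() does when symlinks are on...
        URI core = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
                           null, null);
        // ...then qualify the path so differing spellings of one file agree.
        Path p = new Path(core.toString());
        p = p.makeQualified(p.getFileSystem(conf));
        // Both "...map.zip#alias" and "...map.zip" now map to the same Path,
        // so listing one in files and the other in archives is rejected.
        System.out.println(p);
      }
    }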

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InvalidJobConfException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InvalidJobConfException.java?rev=1077427&r1=1077426&r2=1077427&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InvalidJobConfException.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InvalidJobConfException.java Fri Mar  4 04:13:49 2011
@@ -37,4 +37,12 @@ public class InvalidJobConfException
     super(msg);
   }
 
+  public InvalidJobConfException(String msg, Throwable t) {
+    super(msg, t);
+  }
+
+  public InvalidJobConfException(Throwable t) {
+    super(t);
+  }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1077427&r1=1077426&r2=1077427&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Mar  4 04:13:49 2011
@@ -750,7 +750,6 @@ public class JobClient extends Configure
       ClassNotFoundException,
       InterruptedException,
       IOException{
-
         JobConf jobCopy = job;
         Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
             jobCopy);
@@ -767,6 +766,9 @@ public class JobClient extends Configure
                (new Path("file:///" +  binaryTokenFilename), jobCopy);
           }
 
+          // First we check whether the cached archives and files are legal.
+          TrackerDistributedCacheManager.validate(jobCopy);
+
           copyAndConfigureFiles(jobCopy, submitJobDir);
 
           // get delegation token for the dir
@@ -807,7 +809,6 @@ public class JobClient extends Configure
             out.close();
           }
 
-
           //
           // Now, actually submit the job (using the submit name)
           //

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURL.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURL.java?rev=1077427&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURL.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURL.java Fri Mar  4 04:13:49 2011
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+
+import org.apache.hadoop.filecache.DistributedCache;
+
+import org.apache.hadoop.util.Tool;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestDuplicateArchiveFileCachedURL extends TestCase {
+  private static String DUPLICATED_URL_CORE = "file://foo/myapp/map.zip";
+  private static String UNDUPLICATED_URL_CORE = "file://foo/myapp/something-else.zip";
+
+  private enum Symbolicness { SYMLINK, NOLINK };
+  private enum Erroneasness { ERROR, NO_ERROR };
+
+  int outputDirectoryIndex = 0;
+
+  @Test
+  public void testArchivesFilesJobSubmisisions() throws Exception {
+    final Symbolicness SYMLINK = Symbolicness.SYMLINK;
+    final Symbolicness NOLINK = Symbolicness.NOLINK;
+
+    final Erroneasness ERROR = Erroneasness.ERROR;
+    final Erroneasness NO_ERROR = Erroneasness.NO_ERROR;
+
+    URI fileURI = new URI(DUPLICATED_URL_CORE);
+    URI symlinkURI = new URI(DUPLICATED_URL_CORE + "#symlink");
+    URI nonConflictingURI = new URI(UNDUPLICATED_URL_CORE);
+
+    testSubmission(null, null, NOLINK, NO_ERROR);
+    testSubmission(null, null, SYMLINK, NO_ERROR);
+
+    testSubmission(fileURI, nonConflictingURI, NOLINK, NO_ERROR);
+    testSubmission(fileURI, nonConflictingURI, SYMLINK, NO_ERROR);
+
+    testSubmission(null, nonConflictingURI, NOLINK, NO_ERROR);
+    testSubmission(null, nonConflictingURI, SYMLINK, NO_ERROR);
+
+    testSubmission(fileURI, fileURI, NOLINK, ERROR);
+    testSubmission(fileURI, symlinkURI, NOLINK, NO_ERROR);
+    testSubmission(fileURI, symlinkURI, SYMLINK, ERROR);
+  }
+
+  private static class NullMapper
+      implements Mapper<NullWritable,Text,NullWritable,Text> {
+    public void map(NullWritable key, Text val,
+        OutputCollector<NullWritable,Text> output, Reporter reporter)
+        throws IOException {
+      output.collect(NullWritable.get(), val);
+    }
+    public void configure(JobConf conf) { }
+    public void close() { }
+  }
+
+  private void testSubmission(URI archive, URI file,
+                              Symbolicness symbolicness,
+                              Erroneasness expectError) {
+    JobConf conf = null;
+    final String testDescription
+      = (" archives = {" + (archive == null ? "" : archive.toString())
+         + "}, file = {"
+         + (file == null ? "" : file.toString()) + "}, "
+         + symbolicness);
+
+    try {
+      conf = new JobConf(TestDuplicateArchiveFileCachedURL.class);
+      final FileSystem fs = FileSystem.getLocal(conf);
+      final Path testdir
+        = new Path(System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+      Path inFile = new Path(testdir, "nullin/blah");
+      SequenceFile.Writer w
+        = SequenceFile.createWriter(fs, conf, inFile,
+                                    NullWritable.class, Text.class,
+                                    SequenceFile.CompressionType.NONE);
+      Text t = new Text();
+      t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
+      t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
+      t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
+      t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
+      t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
+      t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
+      t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
+      t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
+      w.close();
+      FileInputFormat.setInputPaths(conf, inFile);
+      FileOutputFormat.setOutputPath
+        (conf, new Path(testdir, "nullout" + ++outputDirectoryIndex));
+      conf.setMapperClass(NullMapper.class);
+      conf.setReducerClass(IdentityReducer.class);
+      conf.setOutputKeyClass(NullWritable.class);
+      conf.setOutputValueClass(Text.class);
+      conf.setInputFormat(SequenceFileInputFormat.class);
+      conf.setOutputFormat(SequenceFileOutputFormat.class);
+      conf.setNumReduceTasks(1);
+
+      if (symbolicness == Symbolicness.SYMLINK) {
+        DistributedCache.createSymlink(conf);
+      }
+
+      if (archive != null) {
+        System.out.println("adding archive: " + archive);
+        DistributedCache.addCacheArchive(archive, conf);
+      }
+
+      if (file != null) {
+        DistributedCache.addCacheFile(file, conf);
+      }
+    } catch (IOException e) {
+      System.out.println("testSubmission -- got exception setting up a job.");
+      e.printStackTrace();
+    }
+
+    try {
+      JobClient.runJob(conf);
+
+      assertTrue("A test, " + testDescription
+                 + ", succeeded but should have failed.",
+                 expectError == Erroneasness.NO_ERROR);
+      System.out.println(testDescription
+                         + " succeeded, as we expected.");
+    } catch (InvalidJobConfException e) {
+      assertTrue("A test, " + testDescription
+                 + ", succeeded but should have failed.",
+                 expectError == Erroneasness.ERROR);
+      System.out.println(testDescription
+                         + " failed on duplicated cached files,"
+                         + " as we expected.");
+    } catch (FileNotFoundException e) {
+      assertEquals(testDescription
+                   + "We shouldn't be unpacking files if there's a clash",
+                   Erroneasness.NO_ERROR, expectError);
+      System.out.println(testDescription + " got an expected "
+                         + "FileNotFoundException because we"
+                         + " don't provide cached files");
+    } catch (IOException e) {
+      e.printStackTrace();
+      assertTrue("During a test, " + testDescription
+                    + ", runJob throws an IOException other"
+                    + "than an InvalidJobConfException.",
+                 false);
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURLMinicluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURLMinicluster.java?rev=1077427&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURLMinicluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestDuplicateArchiveFileCachedURLMinicluster.java Fri Mar  4 04:13:49 2011
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+import org.apache.hadoop.filecache.DistributedCache;
+
+public class TestDuplicateArchiveFileCachedURLMinicluster extends ClusterMapReduceTestCase {
+  
+  enum EnumCounter { MAP_RECORDS }
+  
+  public void testDuplicationsMinicluster() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    JobConf conf = createJobConf();
+    conf.setJobName("counters");
+    
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    FileInputFormat.setInputPaths(conf, getInputDir());
+
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+
+    Path inputRoot = getInputDir().makeQualified(getFileSystem());
+    Path unqualifiedInputRoot = getInputDir();
+    System.out.println("The qualified input dir is " + inputRoot.toString());
+    System.out.println("The unqualified input dir is " + unqualifiedInputRoot.toString());
+
+    Path duplicatedPath = new Path(inputRoot, "text.txt");
+    URI duplicatedURI = duplicatedPath.toUri();
+
+    Path unqualifiedDuplicatedPath = new Path(unqualifiedInputRoot, "text.txt");
+    URI unqualifiedDuplicatedURI = unqualifiedDuplicatedPath.toUri();
+
+    System.out.println("The duplicated Path is " + duplicatedPath);
+    System.out.println("The duplicated URI is " + duplicatedURI);
+    System.out.println("The unqualified duplicated URI is " + unqualifiedDuplicatedURI);
+
+    DistributedCache.addCacheArchive(duplicatedURI, conf);
+    DistributedCache.addCacheFile(unqualifiedDuplicatedURI, conf);
+
+    try {
+      RunningJob runningJob = JobClient.runJob(conf);
+
+      assertFalse("The job completed, which is wrong since there's a duplication", true);
+    } catch (InvalidJobConfException e) {
+      System.out.println("We expect to see a stack trace here.");
+      e.printStackTrace(System.out);
+    }
+  }
+  
+  public void testApparentDuplicationsMinicluster() throws Exception {
+    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text2.txt"));
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("hello1\n");
+    wr.write("hello2\n");
+    wr.write("hello3\n");
+    wr.write("hello4\n");
+    wr.close();
+
+    JobConf conf = createJobConf();
+    conf.setJobName("counters");
+    
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    FileInputFormat.setInputPaths(conf, getInputDir());
+
+    FileOutputFormat.setOutputPath(conf, getOutputDir());
+
+    Path localInputRoot = getInputDir().makeQualified(lfs);
+    Path dfsInputRoot = getInputDir().makeQualified(getFileSystem());
+    Path unqualifiedInputRoot = getInputDir();
+    System.out.println("The qualified input dir is " + dfsInputRoot.toString());
+    System.out.println("The unqualified input dir is " + unqualifiedInputRoot.toString());
+
+    Path dfsUnqualPath = new Path(unqualifiedInputRoot, "text2.txt");
+    Path dfsQualPath = new Path(dfsInputRoot, "test2.text");
+    Path localQualPath = new Path(localInputRoot, "test2.text");
+
+    System.out.println("The dfs unqualified Path is " + dfsUnqualPath);
+    System.out.println("The dfs qualified Path is " + dfsQualPath);
+    System.out.println("The local qualified path is " + localQualPath);
+
+    DistributedCache.addCacheArchive(localQualPath.toUri(), conf);
+    DistributedCache.addCacheFile(dfsUnqualPath.toUri(), conf);
+    DistributedCache.addCacheFile(dfsQualPath.toUri(), conf);
+
+    try {
+      RunningJob runningJob = JobClient.runJob(conf);
+
+      assertFalse("The job completed, which is wrong since there's no local cached file", true);
+    } catch (InvalidJobConfException e) {
+      System.out.println("We expect to see a stack trace here.");
+      e.printStackTrace(System.out);
+      assertFalse("This error should not occur.", true);
+    } catch (FileNotFoundException e) {
+      System.out.println(" got an expected FileNotFoundException because we didn't provide cached files");
+    }
+  }
+}


