hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1075670 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskRunner.java src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java
Date Tue, 01 Mar 2011 05:59:14 GMT
Author: tomwhite
Date: Tue Mar  1 05:59:14 2011
New Revision: 1075670

URL: http://svn.apache.org/viewvc?rev=1075670&view=rev
Log:
MAPREDUCE-2074. Task should fail when symlink creation fails. Contributed by Priyo Mustafi.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1075670&r1=1075669&r2=1075670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar  1 05:59:14 2011
@@ -83,6 +83,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2203. Wrong javadoc for TaskRunner's appendJobJarClasspaths
     method. (Jingguo Yao via tomwhite)
 
+    MAPREDUCE-2074. Task should fail when symlink creation fails.
+    (Priyo Mustafi via tomwhite)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1075670&r1=1075669&r2=1075670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Mar  1 05:59:14 2011
@@ -737,7 +737,8 @@ abstract class TaskRunner extends Thread
       if (!flink.exists()) {
         LOG.info(String.format("Creating symlink: %s <- %s", target, link));
         if (0 != FileUtil.symLink(target, link)) {
-          LOG.warn(String.format("Failed to create symlink: %s <- %s", target, link));
+          throw new IOException(String.format(
+                "Failed to create symlink: %s <- %s", target, link));
         }
       }
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1075670&r1=1075669&r2=1075670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Mar  1 05:59:14 2011
@@ -115,8 +115,8 @@ public class TestMRWithDistributedCache 
       if (!"local".equals(
           context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) {
         File symlinkFile = new File("distributed.first.symlink");
-        TestCase.assertTrue(symlinkFile.exists());
-        TestCase.assertEquals(1, symlinkFile.length());
+        TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", symlinkFile.exists());
+        TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length());
       }
     }
   }
@@ -144,7 +144,11 @@ public class TestMRWithDistributedCache 
     job.addFileToClassPath(second);
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
-    job.createSymlink();
+    
+    // don't create symlink for LocalJobRunner
+    if (!"local".equals(conf.get(JTConfig.JT_IPC_ADDRESS))) {
+      job.createSymlink();
+    }
     job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java?rev=1075670&r1=1075669&r2=1075670&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupWorkDir.java Tue Mar  1 05:59:14 2011
@@ -20,13 +20,17 @@ package org.apache.hadoop.mapred;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 
 /**
  * Verifies if TaskRunner.SetupWorkDir() is cleaning up files/dirs pointed
@@ -53,12 +57,25 @@ public class TestSetupWorkDir extends Te
   static void createSubDirAndFile(FileSystem fs, Path dir) throws IOException {
     Path subDir = new Path(dir, "subDir");
     fs.mkdirs(subDir);
-    Path p = new Path(dir, "file");
+    createFile(fs, dir, "file");
+  }
+
+  /**
+   * Create a file 
+   * 
+   * @param fs filesystem
+   * @param dir directory location of the file
+   * @param fileName filename
+   * @throws IOException
+   */
+  static void createFile(FileSystem fs, Path dir, String fileName) 
+      throws IOException {
+    Path p = new Path(dir, fileName);
     DataOutputStream out = fs.create(p);
     out.writeBytes("dummy input");
     out.close();    
   }
-
+  
   void createEmptyDir(FileSystem fs, Path dir) throws IOException {
     if (fs.exists(dir)) {
       fs.delete(dir, true);
@@ -114,4 +131,104 @@ public class TestSetupWorkDir extends Te
     // cleanup
     fs.delete(rootDir, true);
   }
+
+  /**
+   * Validates distributed cache symlink getting created fine
+   * 
+   * @throws IOException, URISyntaxException 
+   */
+  public void testSetupWorkDirDistCacheSymlinkValid() 
+      throws IOException, URISyntaxException {
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+
+    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
+                            "testSetupWorkDirSymlinkFailure");
+
+    // create file for DistributedCache and set it
+    Path myTargetDir = new Path(rootDir, "./tmp");
+    createEmptyDir(fs, myTargetDir);
+    createFile(fs, myTargetDir, "cacheFile.txt");
+    DistributedCache.setLocalFiles(jConf, 
+        (myTargetDir.toString()+Path.SEPARATOR+"cacheFile.txt"));
+    assertTrue("Did not create cache file in " + myTargetDir,
+        (fs.listStatus(myTargetDir).length == 1));
+
+    // let us enable creation of symlinks in setupWorkDir()
+    jConf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+
+    // add a valid symlink
+    Path myWorkDir = new Path(rootDir, "./work");
+    createEmptyDir(fs, myWorkDir);
+    DistributedCache.addCacheFile(new URI(myWorkDir.toString() +
+        Path.SEPARATOR + "file.txt#valid"), jConf);
+
+    // setupWorkDir should create symlinks
+    TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));
+
+    // myWorkDir should have 2 entries, a tmp dir and the symlink valid
+    assertTrue(myWorkDir + " does not have cache symlink.",
+        fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 2));
+
+    // make sure work dir has symlink valid
+    boolean foundValid = false;
+    for (FileStatus fstat : fs.listStatus(myWorkDir)) {
+      if (fstat.getPath().toUri() != null &&
+            fstat.getPath().toUri().getPath().toString()
+              .equals(myWorkDir.toString() + Path.SEPARATOR+ "valid")) {
+        foundValid = true;
+      }
+    }
+    
+    assertTrue("Valid symlink not created", foundValid);
+
+    // cleanup
+    fs.delete(rootDir, true);
+  }
+
+  /**
+   * Invalid distributed cache files errors out with IOException
+   * 
+   * @throws IOException, URISyntaxException 
+   */
+  public void testSetupWorkDirDistCacheSymlinkInvalid() 
+      throws IOException, URISyntaxException {
+    JobConf jConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jConf);
+
+    Path rootDir = new Path(System.getProperty("test.build.data",  "/tmp"),
+                            "testSetupWorkDirSymlinkFailure");
+
+    // create file for DistributedCache and set it
+    Path myTargetDir = new Path(rootDir, "./tmp");
+    createEmptyDir(fs, myTargetDir);
+    createFile(fs, myTargetDir, "cacheFile.txt");
+    DistributedCache.setLocalFiles(jConf, (myTargetDir.toString() +
+        Path.SEPARATOR+"cacheFile.txt"));
+    assertTrue("Did not create cache file in " + myTargetDir,
+        (fs.listStatus(myTargetDir).length == 1));
+
+    // let us enable creation of symlinks in setupWorkDir()
+    jConf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+
+    // add an invalid symlink
+    Path myWorkDir = new Path(rootDir, "./work");
+    createEmptyDir(fs, myWorkDir);
+    DistributedCache.addCacheFile(new URI(myWorkDir.toString() + 
+        Path.SEPARATOR+"file.txt#invalid/abc"), jConf);
+
+    // setupWorkDir should throw exception
+    try {
+       TaskRunner.setupWorkDir(jConf, new File(myWorkDir.toUri().getPath()));
+       assertFalse("TaskRunner.setupWorkDir() did not throw exception when" +
+           " given invalid cache file", true);
+    } catch(IOException e) {
+        // this is correct behavior
+        assertTrue(myWorkDir + " does not have cache symlink.",
+               fs.exists(myWorkDir) && (fs.listStatus(myWorkDir).length == 0));
+    }
+
+    // cleanup
+    fs.delete(rootDir, true);
+  }
 }



Mime
View raw message