hadoop-mapreduce-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r1346214 [7/7] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/streaming/src/java/org/apache/hadoop/strea...
Date: Tue, 05 Jun 2012 02:33:47 GMT
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Jun  5 02:33:44 2012
@@ -485,7 +485,7 @@ public class TestTaskTrackerMemoryManage
       // tree rooted at 100 is over limit immediately, as it is
       // twice over the mem limit.
       ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree(
-                                          "100", true, 100L, 
+                                          "100", true,
                                           procfsRootDir.getAbsolutePath());
       pTree.getProcessTree();
       assertTrue("tree rooted at 100 should be over limit " +
@@ -493,7 +493,7 @@ public class TestTaskTrackerMemoryManage
                   test.isProcessTreeOverLimit(pTree, "dummyId", limit));
       
       // the tree rooted at 200 is initially below limit.
-      pTree = new ProcfsBasedProcessTree("200", true, 100L,
+      pTree = new ProcfsBasedProcessTree("200", true,
                                           procfsRootDir.getAbsolutePath());
       pTree.getProcessTree();
       assertFalse("tree rooted at 200 shouldn't be over limit " +
@@ -506,7 +506,7 @@ public class TestTaskTrackerMemoryManage
                   test.isProcessTreeOverLimit(pTree, "dummyId", limit));
       
       // the tree rooted at 600 is never over limit.
-      pTree = new ProcfsBasedProcessTree("600", true, 100L,
+      pTree = new ProcfsBasedProcessTree("600", true,
                                             procfsRootDir.getAbsolutePath());
       pTree.getProcessTree();
       assertFalse("tree rooted at 600 should never be over limit.",

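[The hunks above track a constructor change: ProcfsBasedProcessTree no longer takes the sleep-time-before-SIGKILL argument (the dropped 100L), since signalling has moved onto TaskController (see the TestProcfsBasedProcessTree hunks later in this commit). A minimal before/after sketch, assuming only the branch-0.22 signatures visible in this diff:

    import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;

    String procfsRoot = "/proc";   // or a test directory faking procfs
    // before this commit: pid, setsid-in-use flag, kill delay, procfs root
    //   new ProcfsBasedProcessTree("100", true, 100L, procfsRoot);
    // after this commit the kill-delay parameter is gone:
    ProcfsBasedProcessTree tree =
        new ProcfsBasedProcessTree("100", true, procfsRoot);
    tree.getProcessTree();   // scan procfs and build the tree
]
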
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Jun  5 02:33:44 2012
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
@@ -64,7 +65,7 @@ public class TestTrackerDistributedCache
     String execPath = path + "/task-controller";
     ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
-    taskController.setup();
+    taskController.setup(new LocalDirAllocator("mapred.local.dir"));
   }
 
   @Override

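[The one-line change above is part of a signature change repeated throughout this commit: TaskController.setup() now receives a LocalDirAllocator instead of discovering the local directories itself. A sketch of the new call sequence, assuming the setters used in this test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.mapred.DefaultTaskController;
    import org.apache.hadoop.mapred.TaskController;

    Configuration conf = new Configuration();
    TaskController controller = new DefaultTaskController();
    controller.setConf(conf);
    // the allocator hands out directories rooted at mapred.local.dir
    controller.setup(new LocalDirAllocator("mapred.local.dir"));
]
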
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java Tue Jun  5 02:33:44 2012
@@ -27,7 +27,8 @@ import org.apache.hadoop.mapred.UtilsFor
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.junit.Assert.*;
 
@@ -47,12 +48,18 @@ public class TestUserLogCleanup {
   private JobID jobid4 = new JobID(jtid, 4);
   private File foo = new File(TaskLog.getUserLogDir(), "foo");
   private File bar = new File(TaskLog.getUserLogDir(), "bar");
+  private TaskController taskController;
 
   public TestUserLogCleanup() throws IOException {
     Configuration conf = new Configuration();
-    localizer = new Localizer(FileSystem.get(conf), conf
-        .getStrings(MRConfig.LOCAL_DIR), new DefaultTaskController());
-    taskLogCleanupThread = new UserLogCleaner(conf);
+    localizer =
+      new Localizer(FileSystem.get(conf), conf.getStrings(MRConfig.LOCAL_DIR));
+    Class<? extends TaskController> taskControllerClass =
+      conf.getClass("mapred.task.tracker.task-controller",
+                     DefaultTaskController.class, TaskController.class);
+    taskController = 
+      (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+    taskLogCleanupThread = new UserLogCleaner(conf, taskController);
     taskLogCleanupThread.setClock(myClock);
     tt = new TaskTracker();
     tt.setConf(new JobConf(conf));
@@ -66,11 +73,10 @@ public class TestUserLogCleanup {
   }
 
   private File localizeJob(JobID jobid) throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    new JobLocalizer(tt.getJobConf(), user, 
+                     jobid.toString()).initializeJobLogDir();
     File jobUserlog = TaskLog.getJobDir(jobid);
-
-    JobConf conf = new JobConf();
-    // localize job log directory
-    tt.initializeJobLogDir(jobid, conf);
     assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
     return jobUserlog;
   }
@@ -103,16 +109,17 @@ public class TestUserLogCleanup {
 
     // add the job for deletion with one hour as retain hours
     jobFinished(jobid2, 1);
-
     // remove old logs and see jobid1 is not removed and jobid2 is removed
     myClock.advance(ONE_HOUR);
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog2);
+      
     assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
-    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
-
+    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists()); 
     myClock.advance(ONE_HOUR);
     // remove old logs and see jobid1 is removed now
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1);
     assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
   }
 
@@ -151,18 +158,18 @@ public class TestUserLogCleanup {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 3);
     taskLogCleanupThread.clearOldUserLogs(conf);
+    retry(foo, bar);
     assertFalse(foo.exists());
     assertFalse(bar.exists());
     assertTrue(jobUserlog1.exists());
     assertTrue(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
     assertTrue(jobUserlog4.exists());
-    assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
-        .exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 2.
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1);
     assertFalse(jobUserlog1.exists());
     assertTrue(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
@@ -175,29 +182,31 @@ public class TestUserLogCleanup {
     jobFinished(jobid3, 3);
 
     // mimic localizeJob for jobid4
-    jobUserlog4 = localizeJob(jobid4);
+    //jobUserlog4 = localizeJob(jobid4);
 
     // do cleanup
     myClock.advance(2 * ONE_HOUR);
     // time is now 4.
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1, jobUserlog2, jobUserlog4);
 
     // jobid2 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 5.
     // do cleanup again
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1, jobUserlog2, jobUserlog3);
 
     // jobid3 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertFalse(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
   }
 
   /**
@@ -232,7 +241,7 @@ public class TestUserLogCleanup {
     // job directories will be added with 3 hours as retain hours. 
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 3);
-    taskLogCleanupThread = new UserLogCleaner(conf);
+    taskLogCleanupThread = new UserLogCleaner(conf, taskController);
     myClock = new FakeClock(); // clock is reset.
     taskLogCleanupThread.setClock(myClock);
     taskLogCleanupThread.clearOldUserLogs(conf);
@@ -243,8 +252,6 @@ public class TestUserLogCleanup {
     assertTrue(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
     assertTrue(jobUserlog4.exists());
-    assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
-        .exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 1.
@@ -267,22 +274,42 @@ public class TestUserLogCleanup {
     myClock.advance(2 * ONE_HOUR);
     // time is now 3.
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1, jobUserlog2, jobUserlog4);
 
     // jobid1 and jobid2 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 4.
     // do cleanup again
     taskLogCleanupThread.processCompletedJobs();
-
+    retry(jobUserlog1, jobUserlog2, jobUserlog3, jobUserlog4);
+    
     // jobid3 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertFalse(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
+  }
+  
+  private void retry(File... jobDirs) {
+    // since the deletion is done by a thread, we poll for some time
+    short retries = 0;
+    while (retries++ < 20) {
+      boolean exist = false;
+      for (File dir : jobDirs) {
+        if (dir.exists()) {
+          exist = true;
+        }
+      }
+      if (exist) {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie){}
+      } else return;
+    }
   }
 }

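[The retry(...) calls sprinkled through the assertions above exist because user-log deletion is now performed by a background thread, so the test has to poll for completion rather than assert immediately. The helper added at the bottom of the file bounds the wait at 20 attempts of 500 ms; a standalone sketch of the same pattern, propagating the interrupt instead of swallowing it as the committed helper's empty catch does:

    import java.io.File;

    // Poll until none of the given directories exist, or give up after
    // roughly 10 seconds (20 attempts x 500 ms).
    static void waitForDeletion(File... dirs) throws InterruptedException {
      for (int attempt = 0; attempt < 20; attempt++) {
        boolean anyLeft = false;
        for (File dir : dirs) {
          if (dir.exists()) {
            anyLeft = true;
            break;
          }
        }
        if (!anyLeft) {
          return;          // all deleted; the caller's asserts will pass
        }
        Thread.sleep(500); // deletion thread may still be working
      }
      // timed out: fall through and let the caller's asserts report it
    }
]
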
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jun  5 02:33:44 2012
@@ -47,6 +47,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -661,7 +662,7 @@ public class UtilsForTests {
    * asynchronously.
    */
   public static class InlineCleanupQueue extends CleanupQueue {
-    List<String> stalePaths = new ArrayList<String>();
+    List<Path> stalePaths = new ArrayList<Path>();
 
     public InlineCleanupQueue() {
       // do nothing
@@ -671,19 +672,37 @@ public class UtilsForTests {
     public void addToQueue(PathDeletionContext... contexts) {
       // delete paths in-line
       for (PathDeletionContext context : contexts) {
+        Exception exc = null;
         try {
           if (!deletePath(context)) {
             LOG.warn("Stale path " + context.fullPath);
             stalePaths.add(context.fullPath);
           }
         } catch (IOException e) {
+          exc = e;
+        } catch (InterruptedException ie) {
+          exc = ie;
+        }
+        if (exc != null) {
           LOG.warn("Caught exception while deleting path "
               + context.fullPath);
-          LOG.info(StringUtils.stringifyException(e));
+          LOG.info(StringUtils.stringifyException(exc));
           stalePaths.add(context.fullPath);
         }
       }
     }
+    static boolean deletePath(PathDeletionContext context) 
+    throws IOException, InterruptedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to delete " + context.fullPath);
+      }
+//      FileSystem fs = context.fullPath.getFileSystem(context.conf);
+//      if (fs.exists(context.fullPath)) {
+//        return fs.delete(context.fullPath, true);
+//      }
+      context.deletePath();
+      return true;
+    }
   }
   
   static class FakeClock extends Clock {

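[InlineCleanupQueue keeps tests deterministic by deleting paths in the caller's thread rather than queueing them; the change above widens it for the InterruptedException that PathDeletionContext.deletePath() can now throw, funnelling both exception types into one bookkeeping path. A condensed sketch of the idea, with signatures taken from the diff and otherwise illustrative:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;

    class InlineQueueSketch {
      final List<Path> stalePaths = new ArrayList<Path>();

      // delete synchronously so asserts can run right away
      void addToQueue(PathDeletionContext... contexts) {
        for (PathDeletionContext context : contexts) {
          Exception failure = null;
          try {
            context.deletePath();        // may throw either exception type
          } catch (IOException e) {
            failure = e;
          } catch (InterruptedException e) {
            failure = e;
          }
          if (failure != null) {
            // single bookkeeping path for both exception types
            stalePaths.add(context.fullPath);
          }
        }
      }
    }
]
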
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Tue Jun  5 02:33:44 2012
@@ -32,8 +32,12 @@ import javax.security.auth.login.LoginEx
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobLocalizer;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapreduce.Cluster;
@@ -41,6 +45,9 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -54,10 +61,10 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.mortbay.log.Log;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
-
+  private static final Log LOG =
+    LogFactory.getLog(TestTrackerDistributedCacheManager.class);
   protected String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp"),
           TestTrackerDistributedCacheManager.class.getSimpleName())
@@ -68,10 +75,12 @@ public class TestTrackerDistributedCache
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
-  private static final int LOCAL_CACHE_SUBDIR = 2;
+  private static final int LOCAL_CACHE_SUBDIR = 1;
   protected Configuration conf;
   protected Path firstCacheFile;
+  protected Path firstCacheFilePublic;
   protected Path secondCacheFile;
+  protected Path secondCacheFilePublic;
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
@@ -119,18 +128,22 @@ public class TestTrackerDistributedCache
         taskControllerClass, conf);
 
     // setup permissions for mapred local dir
-    taskController.setup();
+    taskController.setup(localDirAllocator);
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+    firstCacheFilePublic = new Path(TEST_ROOT_DIR, "firstcachefileOne");
+    secondCacheFilePublic = new Path(TEST_ROOT_DIR, "secondcachefileOne");
+    createPublicTempFile(firstCacheFilePublic);
+    createPublicTempFile(secondCacheFilePublic);
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
   }
   
   protected void refreshConf(Configuration conf) throws IOException {
     taskController.setConf(conf);
-    taskController.setup();
+    taskController.setup(localDirAllocator);
   }
 
   /**
@@ -148,7 +161,8 @@ public class TestTrackerDistributedCache
    * @throws IOException
    * @throws LoginException
    */
-  public void testManagerFlow() throws IOException, LoginException {
+  public void testManagerFlow()
+      throws IOException, LoginException, InterruptedException {
     if (!canRun()) {
       return;
     }
@@ -158,6 +172,7 @@ public class TestTrackerDistributedCache
     Configuration subConf = new Configuration(conf);
     String userName = getJobOwnerName();
     subConf.set(MRJobConfig.USER_NAME, userName);
+    JobID jobid = new JobID("jt",1);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
@@ -171,15 +186,18 @@ public class TestTrackerDistributedCache
 
     // ****** Imitate TaskRunner code.
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf, taskController);
+      new TrackerDistributedCacheManager(conf);
     TaskDistributedCacheManager handle =
-      manager.newTaskDistributedCacheManager(subConf);
+      manager.newTaskDistributedCacheManager(jobid, subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), 
-        TaskTracker.getPublicDistributedCacheDir());
-    // ****** End of imitating TaskRunner code
+    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf);
+    // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
+//    handle.setupPrivateCache(localDirAllocator, TaskTracker
+//        .getPrivateDistributedCacheDir(userName));
+//    // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
     assertNotNull(null, localCacheFiles);
@@ -208,18 +226,21 @@ public class TestTrackerDistributedCache
       TrackerDistributedCacheManager {
     public FakeTrackerDistributedCacheManager(Configuration conf)
         throws IOException {
-      super(conf, taskController);
+      super(conf);
     }
 
     @Override
-    Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, boolean isArchive, boolean isPublic)
-    throws IOException {
-      if (cache.equals(firstCacheFile.toUri())) {
+    Path localizePublicCacheObject(Configuration conf, URI cache, 
+        long confFileStamp,
+        CacheStatus cacheStatus, 
+        FileStatus fileStatus, 
+        boolean isArchive) throws IOException, InterruptedException {
+      if (cache.equals(firstCacheFilePublic.toUri())) {
         throw new IOException("fake fail");
       }
-      return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          isArchive, isPublic);
+      return super.localizePublicCacheObject(conf, cache, confFileStamp, 
+          cacheStatus, fileStatus, 
+          isArchive);
     }
   }
 
@@ -230,57 +251,58 @@ public class TestTrackerDistributedCache
     }
     TrackerDistributedCacheManager manager = 
       new FakeTrackerDistributedCacheManager(conf);
-    Cluster cluster = new Cluster(conf);
+
     String userName = getJobOwnerName();
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
 
     // Configures a job with a regular file
-    Job job1 = Job.getInstance(cluster, conf);
-    job1.setUser(userName);
-    job1.addCacheFile(secondCacheFile.toUri());
+    Job job1 = new Job(conf);
     Configuration conf1 = job1.getConfiguration();
+    conf1.set("user.name", userName);
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
+    
     TrackerDistributedCacheManager.determineTimestamps(conf1);
     TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
     // Task localizing for first job
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(conf1);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+    handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(conf1);
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
-      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
-          c.owner));
+      assertEquals(0, manager.getReferenceCount(c.getStatus()));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     createPrivateTempFile(thirdCacheFile);
     
     // Configures another job with three regular files.
-    Job job2 = Job.getInstance(cluster, conf);
-    job2.setUser(userName);
+    Job job2 = new Job(conf);
+    Configuration conf2 = job2.getConfiguration();
+    conf2.set("user.name", userName);
     // add a file that would get failed to localize
-    job2.addCacheFile(firstCacheFile.toUri());
+    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
     // add a file that is already localized by different job
-    job2.addCacheFile(secondCacheFile.toUri());
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
     // add a file that is never localized
-    job2.addCacheFile(thirdCacheFile.toUri());
-    Configuration conf2 = job2.getConfiguration();
+    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+    
     TrackerDistributedCacheManager.determineTimestamps(conf2);
     TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
 
     // Task localizing for second job
     // localization for the "firstCacheFile" will fail.
-    handle = manager.newTaskDistributedCacheManager(conf2);
+    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
     Throwable th = null;
     try {
-      handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+      handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+      JobLocalizer.downloadPrivateCache(conf2);
     } catch (IOException e) {
       th = e;
-      Log.info("Exception during setup", e);
+      LOG.info("Exception during setup", e);
     }
     assertNotNull(th);
     assertTrue(th.getMessage().contains("fake fail"));
@@ -288,15 +310,15 @@ public class TestTrackerDistributedCache
     th = null;
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       try {
-        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
-            c.owner));
-      } catch (IOException ie) {
+        int refcount = manager.getReferenceCount(c.getStatus());
+        LOG.info("checking refcount " + c.uri + " of " + refcount);
+        assertEquals(0, refcount);
+      } catch (NullPointerException ie) {
         th = ie;
-        Log.info("Exception getting reference count for " + c.uri, ie);
+        LOG.info("Exception getting reference count for " + c.uri, ie);
       }
     }
     assertNotNull(th);
-    assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
     fs.delete(thirdCacheFile, false);
   }
   
@@ -361,7 +383,7 @@ public class TestTrackerDistributedCache
   private Path checkLocalizedPath(boolean visibility) 
   throws IOException, LoginException, InterruptedException {
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf, taskController);
+      new TrackerDistributedCacheManager(conf);
     Cluster cluster = new Cluster(conf);
     String userName = getJobOwnerName();
     File workDir = new File(TEST_ROOT_DIR, "workdir");
@@ -381,10 +403,10 @@ public class TestTrackerDistributedCache
 
     // Task localizing for job
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(conf1);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+    handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(conf1);
     TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
     String distCacheDir;
     if (visibility) {
@@ -394,9 +416,8 @@ public class TestTrackerDistributedCache
     }
     Path localizedPath =
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
-          fs.getFileStatus(cacheFile), false,
-          c.timestamp, new Path(TEST_ROOT_DIR), false,
-          visibility);
+                            fs.getFileStatus(cacheFile), false,
+    		                c.timestamp, visibility, c);
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
@@ -504,56 +525,94 @@ public class TestTrackerDistributedCache
     Configuration conf2 = new Configuration(conf);
     conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
-    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
     conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf2, taskController);
+        new TrackerDistributedCacheManager(conf2);
     manager.startCleanupThread();
     FileSystem localfs = FileSystem.getLocal(conf2);
     String userName = getJobOwnerName();
     conf2.set(MRJobConfig.USER_NAME, userName);
 
     // We first test the size limit
-    Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
+    FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
+    CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(), 
+    		                         CacheFile.FileType.REGULAR, true, 
+    		                         stat.getModificationTime(),
+    		                         true); 
+    Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(firstCacheFile), false,
-        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
-    manager.releaseCache(firstCacheFile.toUri(), conf2,
-        getFileStamp(firstCacheFile), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        fs.getFileStatus(firstCacheFilePublic), false,
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+        cfile1);
+    manager.releaseCache(cfile1.getStatus());
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
-    manager.getLocalCache(secondCacheFile.toUri(), conf2, 
+    stat = fs.getFileStatus(secondCacheFilePublic);
+    CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(), 
+    		                         CacheFile.FileType.REGULAR, true, 
+    		                         stat.getModificationTime(),
+    		                         true); 
+    assertTrue("DistributedCache currently doesn't have cached file",
+        localfs.exists(firstLocalCache));
+    Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(secondCacheFile), false, 
-        getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
-    checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
+        fs.getFileStatus(secondCacheFilePublic), false, 
+        fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+        cfile2);
+    checkCacheDeletion(localfs, firstLocalCache, "DistributedCache failed " +
         "deleting old cache when the cache store is full.");
+    manager.stopCleanupThread();
     // Now we test the number of sub directories limit
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT * 10);
+    conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
+    manager = 
+      new TrackerDistributedCacheManager(conf2);
+    manager.startCleanupThread();
     // Create the temporary cache files to be used in the tests.
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
     // Adding two more small files, so it triggers the number of sub directory
     // limit but does not trigger the file size limit.
-    createTempFile(thirdCacheFile, 1);
-    createTempFile(fourthCacheFile, 1);
+    createPrivateTempFile(thirdCacheFile);
+    createPrivateTempFile(fourthCacheFile);
+    DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    stat = fs.getFileStatus(thirdCacheFile);
+    CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(), 
+            CacheFile.FileType.REGULAR, false, 
+            stat.getModificationTime(),
+            true); 
     Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(thirdCacheFile), false,
-        getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(thirdCacheFile).getModificationTime(), 
+        false, cfile3);
+    DistributedCache.setLocalFiles(conf2, thirdLocalCache.toString());
+    JobLocalizer.downloadPrivateCache(conf2);
     // Release the third cache so that it can be deleted while sweeping
-    manager.releaseCache(thirdCacheFile.toUri(), conf2,
-        getFileStamp(thirdCacheFile), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+    manager.releaseCache(cfile3.getStatus());
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
-    manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
+    stat = fs.getFileStatus(fourthCacheFile);
+    CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(), 
+            CacheFile.FileType.REGULAR, false, 
+            stat.getModificationTime(),
+            true); 
+    assertTrue("DistributedCache currently doesn't have cached file",
+        localfs.exists(thirdLocalCache));
+    DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
+    DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
-        getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
     checkCacheDeletion(localfs, thirdLocalCache,
         "DistributedCache failed deleting old" +
         " cache when the cache exceeds the number of sub directories limit.");
@@ -587,17 +646,20 @@ public class TestTrackerDistributedCache
       return;
     }
     TrackerDistributedCacheManager manager =
-      new TrackerDistributedCacheManager(conf, taskController);
+      new TrackerDistributedCacheManager(conf);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
     String userName = getJobOwnerName();
     conf.set(MRJobConfig.USER_NAME, userName);
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
+    CacheFile file = new CacheFile(fileToCache.toUri(), 
+    		                       CacheFile.FileType.REGULAR, 
+    		                       false, 0, false);
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
-        getFileStamp(firstCacheFile),
-        new Path(TEST_ROOT_DIR), false, false);
+        System.currentTimeMillis(),
+        false, file);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -632,6 +694,8 @@ public class TestTrackerDistributedCache
   protected void tearDown() throws IOException {
     new File(firstCacheFile.toString()).delete();
     new File(secondCacheFile.toString()).delete();
+    new File(firstCacheFilePublic.toString()).delete();
+    new File(secondCacheFilePublic.toString()).delete();
     FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }
 
@@ -652,9 +716,13 @@ public class TestTrackerDistributedCache
     }
     
     public FileStatus getFileStatus(Path p) throws IOException {
+      FileStatus rawFileStatus = super.getFileStatus(p);
       File f = pathToFile(p);
-      return new FileStatus(f.length(), f.isDirectory(), 1, 128,
-      f.lastModified() + increment, makeQualified(new Path(f.getPath())));
+      FileStatus status = new FileStatus(f.length(), f.isDirectory(), 1, 128,
+             f.lastModified() + increment, 0,
+             rawFileStatus.getPermission(), rawFileStatus.getOwner(), 
+             rawFileStatus.getGroup(), makeQualified(new Path(f.getPath())));
+      return status;
     }
     
     void advanceClock(long millis) {
@@ -672,7 +740,7 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
 
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(myConf, taskController);
+      new TrackerDistributedCacheManager(myConf);
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(myConf);
@@ -684,14 +752,16 @@ public class TestTrackerDistributedCache
 
     // ****** Imitate TaskRunner code.
     TaskDistributedCacheManager handle =
-      manager.newTaskDistributedCacheManager(subConf);
+      manager.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), 
-        TaskTracker.getPublicDistributedCacheDir());
+    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    //TODO this doesn't really happen in the TaskRunner
+//    handle.setupPrivateCache(localDirAllocator, TaskTracker
+//        .getPrivateDistributedCacheDir(userName));
     // ****** End of imitating TaskRunner code
-
+    JobLocalizer.downloadPrivateCache(subConf);
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
     assertNotNull(null, localCacheFiles);
     assertEquals(1, localCacheFiles.length);
@@ -709,8 +779,10 @@ public class TestTrackerDistributedCache
     // running a task of the same job
     Throwable th = null;
     try {
-      handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+      handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+//      handle.setupPrivateCache(localDirAllocator, TaskTracker
+//          .getPrivateDistributedCacheDir(userName));
     } catch (IOException ie) {
       th = ie;
     }
@@ -723,21 +795,22 @@ public class TestTrackerDistributedCache
     // running a task of the same job on another TaskTracker which has never
     // initialized the cache
     TrackerDistributedCacheManager manager2 = 
-      new TrackerDistributedCacheManager(myConf, taskController);
+      new TrackerDistributedCacheManager(myConf);
     TaskDistributedCacheManager handle2 =
-      manager2.newTaskDistributedCacheManager(subConf);
+      manager2.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
     File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString());
     th = null;
     try {
-      handle2.setup(localDirAllocator, workDir2, TaskTracker
+      handle2.setupCache(subConf, TaskTracker
           .getPrivateDistributedCacheDir(userName), 
           TaskTracker.getPublicDistributedCacheDir());
+      JobLocalizer.downloadPrivateCache(subConf);
     } catch (IOException ie) {
       th = ie;
     }
     assertNotNull("Throwable is null", th);
     assertTrue("Exception message does not match",
-        th.getMessage().contains("has changed on HDFS since job started"));
+        th.getMessage().contains("changed during the job from"));
     // release
     handle.release();
     
@@ -749,9 +822,10 @@ public class TestTrackerDistributedCache
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
     
     handle =
-      manager.newTaskDistributedCacheManager(subConf2);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+      manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);
+    handle.setupCache(subConf2, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf2);
     Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
     assertNotNull(null, localCacheFiles2);
     assertEquals(1, localCacheFiles2.length);
@@ -781,26 +855,36 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
     conf.set(MRJobConfig.USER_NAME, userName);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf, taskController);
+        new TrackerDistributedCacheManager(conf);
     FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
 
     Path[] localCache = new Path[2];
-    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+    FileStatus stat = fs.getFileStatus(firstCacheFile);
+    CacheFile file = new CacheFile(firstCacheFilePublic.toUri(), 
+    		                       CacheFile.FileType.REGULAR, true, 
+    		                       stat.getModificationTime(), false);
+    localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(firstCacheFile), false,
-        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(firstCacheFilePublic), false,
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+        file);
     FsPermission myPermission = new FsPermission((short)0600);
     Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
     if (FileSystem.create(localfs, myFile, myPermission) == null) {
       throw new IOException("Could not create " + myFile);
     }
     try {
-      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
+      stat = fs.getFileStatus(secondCacheFilePublic);
+      file = new CacheFile(secondCacheFilePublic.toUri(),
+    		               CacheFile.FileType.REGULAR,
+    		               true, stat.getModificationTime(), false);
+      localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf, 
           TaskTracker.getPrivateDistributedCacheDir(userName),
-          fs.getFileStatus(secondCacheFile), false, 
-          getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
-          false);
-      FileStatus stat = localfs.getFileStatus(myFile);
+          fs.getFileStatus(secondCacheFilePublic), false, 
+          fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+          file);
+      stat = localfs.getFileStatus(myFile);
       assertTrue(stat.getPermission().equals(myPermission));
       // validate permissions of localized files.
       checkFilePermissions(localCache);

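[Most of the churn above follows one refactor: TrackerDistributedCacheManager no longer takes a TaskController, cache handles are created per job (newTaskDistributedCacheManager now takes a JobID), public files are localized via setupCache(), and private files are downloaded by JobLocalizer.downloadPrivateCache(). A condensed sketch of the new localization flow, assuming only the calls visible in this diff and with the user name as a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobLocalizer;
    import org.apache.hadoop.mapred.TaskTracker;
    import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;

    Configuration jobConf = new Configuration(); // cache URIs already set on it
    String user = "someuser";                    // placeholder job owner

    TrackerDistributedCacheManager manager =
        new TrackerDistributedCacheManager(jobConf);  // no TaskController now
    TaskDistributedCacheManager handle =
        manager.newTaskDistributedCacheManager(new JobID("jt", 1), jobConf);

    // public files: localized under the shared public cache directory
    handle.setupCache(jobConf, TaskTracker.getPublicDistributedCacheDir(),
        TaskTracker.getPrivateDistributedCacheDir(user));
    // private files: pulled as the job owner by the job localizer
    JobLocalizer.downloadPrivateCache(jobConf);
    // ... run the task, then drop the reference counts:
    handle.release();
]
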
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java Tue Jun  5 02:33:44 2012
@@ -35,9 +35,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import static org.apache.hadoop.mapred.TaskController.Signal;
 
 import junit.framework.TestCase;
 
@@ -60,7 +63,7 @@ public class TestProcfsBasedProcessTree 
     public void run() {
       try {
         Vector<String> args = new Vector<String>();
-        if(ProcessTree.isSetsidAvailable) {
+        if(TaskController.isSetsidAvailable) {
           args.add("setsid");
         }
         args.add("bash");
@@ -95,7 +98,7 @@ public class TestProcfsBasedProcessTree 
     return getPidFromPidFile(pidFile);
   }
 
-  public void testProcessTree() {
+  public void testProcessTree() throws Exception {
 
     try {
       if (!ProcfsBasedProcessTree.isAvailable()) {
@@ -149,8 +152,7 @@ public class TestProcfsBasedProcessTree 
     String pid = getRogueTaskPID();
     LOG.info("Root process pid: " + pid);
     ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
-                               ProcessTree.isSetsidAvailable,
-                               ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+                               TaskController.isSetsidAvailable);
     p = p.getProcessTree(); // initialize
     LOG.info("ProcessTree: " + p.toString());
 
@@ -171,13 +173,14 @@ public class TestProcfsBasedProcessTree 
     String processTreeDump = p.getProcessTreeDump();
 
     // destroy the process and all its subprocesses
-    p.destroy(true/*in the background*/);
+    TaskController tc = new DefaultTaskController();
+    tc.signalTask(null, Integer.valueOf(pid), Signal.KILL);
 
-    if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone
-      assertEquals(false, p.isAnyProcessInTreeAlive());
-    }
-    else {// process should be gone
-      assertFalse("ProcessTree must have been gone", p.isAlive());
+    if (TaskController.isSetsidAvailable) { // whole processtree should be gone
+      assertFalse("Proceesses in process group live",
+          p.isAnyProcessInTreeAlive(tc));
+    } else {// process should be gone
+      assertFalse("ProcessTree must have been gone", p.isAlive(tc));
     }
 
     LOG.info("Process-tree dump follows: \n" + processTreeDump);
@@ -204,7 +207,7 @@ public class TestProcfsBasedProcessTree 
 
     // ProcessTree is gone now. Any further calls should be sane.
     p = p.getProcessTree();
-    assertFalse("ProcessTree must have been gone", p.isAlive());
+    assertFalse("ProcessTree must have been gone", p.isAlive(tc));
     assertTrue("Cumulative vmem for the gone-process is "
         + p.getCumulativeVmem() + " . It should be zero.", p
         .getCumulativeVmem() == 0);
@@ -333,8 +336,8 @@ public class TestProcfsBasedProcessTree 
       
       // crank up the process tree class.
       ProcfsBasedProcessTree processTree = 
-          new ProcfsBasedProcessTree("100", true, 100L, 
-                                  procfsRootDir.getAbsolutePath());
+          new ProcfsBasedProcessTree("100", true,
+              procfsRootDir.getAbsolutePath());
       // build the process tree.
       processTree.getProcessTree();
       
@@ -406,8 +409,8 @@ public class TestProcfsBasedProcessTree 
       
       // crank up the process tree class.
       ProcfsBasedProcessTree processTree = 
-          new ProcfsBasedProcessTree("100", true, 100L, 
-                                  procfsRootDir.getAbsolutePath());
+          new ProcfsBasedProcessTree("100", true,
+              procfsRootDir.getAbsolutePath());
       // build the process tree.
       processTree.getProcessTree();
       
@@ -498,11 +501,11 @@ public class TestProcfsBasedProcessTree 
       
       // crank up the process tree class.
       ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
-                        pid, true, 100L, procfsRootDir.getAbsolutePath());
+                        pid, true, procfsRootDir.getAbsolutePath());
 
       // Let us not create stat file for pid 100.
       assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
-                            pid, procfsRootDir.getAbsolutePath()));
+            Integer.valueOf(pid), procfsRootDir.getAbsolutePath()));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
@@ -551,9 +554,8 @@ public class TestProcfsBasedProcessTree 
       writeStatFiles(procfsRootDir, pids, procInfos);
       writeCmdLineFiles(procfsRootDir, pids, cmdLines);
 
-      ProcfsBasedProcessTree processTree =
-          new ProcfsBasedProcessTree("100", true, 100L, procfsRootDir
-              .getAbsolutePath());
+      ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
+          "100", true, procfsRootDir.getAbsolutePath());
       // build the process tree.
       processTree.getProcessTree();
 

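[The pattern above recurs through this commit: the setsid check and process signalling have moved from ProcessTree onto TaskController, so tests kill a tree with signalTask(...) and pass the controller into the liveness checks. A minimal sketch, assuming the branch-0.22 signatures this diff uses (a null user selects the default same-user behaviour, as in the test):

    import org.apache.hadoop.mapred.DefaultTaskController;
    import org.apache.hadoop.mapred.TaskController;
    import static org.apache.hadoop.mapred.TaskController.Signal;

    TaskController controller = new DefaultTaskController();
    int rootPid = 12345;   // placeholder process id
    // destroy() on the tree is gone; signal through the controller instead
    controller.signalTask(null, rootPid, Signal.KILL);
    // liveness checks now take the controller as well:
    //   processTree.isAlive(controller);
    //   processTree.isAnyProcessInTreeAlive(controller);
]
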
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/testshell/ExternalMapReduce.java Tue Jun  5 02:33:44 2012
@@ -18,6 +18,7 @@
 
 package testshell;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -91,6 +92,10 @@ public class ExternalMapReduce extends C
       if (ret != 0) {
         throw new IOException("files_tmp does not exist");
       }
+      File file = new File("./ziplink/test.txt");
+      if (!file.canExecute()) {
+        throw new IOException("ziplink/test.txt is not executable");
+      }
     }
   }
 

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/unit/org/apache/hadoop/mapred/TestTaskTrackerDirectories.java Tue Jun  5 02:33:44 2012
@@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.junit.Test;
 import org.junit.Before;
+import org.mockito.Mockito;
 
 /**
  * Tests for the correct behavior of the TaskTracker starting up with
@@ -55,8 +56,8 @@ public class TestTaskTrackerDirectories 
         TEST_DIR + "/local2"
     };
     
-    conf.setStrings(MRConfig.LOCAL_DIR, dirs);
-    setupTaskController(conf);
+    conf.setStrings("mapred.local.dir", dirs);
+    setupTaskTracker(conf);
 
     for (String dir : dirs) {
       checkDir(dir);
@@ -73,8 +74,8 @@ public class TestTaskTrackerDirectories 
     new File(dirs[0]).mkdirs();
     FileUtil.chmod(dirs[0], "000");
 
-    conf.setStrings(MRConfig.LOCAL_DIR, dirs);
-    setupTaskController(conf);
+    conf.setStrings("mapred.local.dir", dirs);
+    setupTaskTracker(conf);
     
     for (String dir : dirs) {
       checkDir(dir);
@@ -86,7 +87,7 @@ public class TestTaskTrackerDirectories 
     File dir = TaskLog.getUserLogDir();
     FileUtil.fullyDelete(dir);
     
-    setupTaskController(new Configuration());
+    setupTaskTracker(new Configuration());
     
     checkDir(dir.getAbsolutePath());
   }
@@ -103,7 +104,7 @@ public class TestTaskTrackerDirectories 
         dir.createNewFile());
 
     try {
-      setupTaskController(new Configuration());
+      setupTaskTracker(new Configuration());
       fail("Didn't throw!");
     } catch (IOException ioe) {
       System.err.println("Got expected exception");
@@ -118,15 +119,20 @@ public class TestTaskTrackerDirectories 
     dir.mkdirs();
     FileUtil.chmod(dir.getAbsolutePath(), "000");
     
-    setupTaskController(new Configuration());
+    setupTaskTracker(new Configuration());
     
     checkDir(dir.getAbsolutePath());
   }
   
-  private void setupTaskController(Configuration conf) throws IOException {
-    TaskController tc = new DefaultTaskController();
-    tc.setConf(conf);
-    tc.setup();
+  private void setupTaskTracker(Configuration conf) throws Exception {
+    JobConf ttConf = new JobConf(conf);
+    // Doesn't matter what we give here - we won't actually
+    // connect to it.
+    TaskTracker tt = new TaskTracker();
+    tt.setConf(ttConf);
+    tt.setTaskController(Mockito.mock(TaskController.class));
+    fail("TODO: update this test case after 2178");
+    // tt.initializeDirectories();
   }
 
   private void checkDir(String dir) throws IOException {

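[The rewritten setup above stops constructing a DefaultTaskController directly: the test now builds a TaskTracker, hands it a Mockito mock of the controller, and for now fails with a TODO until the directory-initialization entry point lands (the "2178" issue number is left as the commit has it). A sketch of the mock wiring, assuming the setters used in the diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TaskController;
    import org.apache.hadoop.mapred.TaskTracker;
    import org.mockito.Mockito;

    TaskTracker tracker = new TaskTracker();
    tracker.setConf(new JobConf(new Configuration()));
    // a mock controller answers every call with Mockito defaults,
    // which is all the directory-initialization path needs
    tracker.setTaskController(Mockito.mock(TaskController.class));
]
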

