hadoop-mapreduce-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1079211 [11/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/fairscheduler/designdoc/ src/contrib/streaming/...
Date: Tue, 08 Mar 2011 05:56:31 GMT
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java Tue Mar  8 05:56:27 2011
@@ -24,12 +24,19 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Vector;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
+import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import static org.junit.Assert.*;
 import org.junit.Before;
@@ -43,6 +50,8 @@ public class TestJvmManager {
   private TaskTracker tt;
   private JvmManager jvmManager;
   private JobConf ttConf;
+  private boolean threadCaughtException = false;
+  private String user;
 
   @Before
   public void setUp() {
@@ -55,15 +64,23 @@ public class TestJvmManager {
   }
 
   public TestJvmManager() throws Exception {
+    user = UserGroupInformation.getCurrentUser().getShortUserName();
     tt = new TaskTracker();
     ttConf = new JobConf();
     ttConf.setLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, 2000);
     tt.setConf(ttConf);
     tt.setMaxMapSlots(MAP_SLOTS);
     tt.setMaxReduceSlots(REDUCE_SLOTS);
-    tt.setTaskController(new DefaultTaskController());
+    TaskController dtc;
+    tt.setTaskController((dtc = new DefaultTaskController()));
+    Configuration conf = new Configuration();
+    dtc.setConf(conf);
+    LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+    tt.getTaskController().setup(ldirAlloc);
+    JobID jobId = new JobID("test", 0);
     jvmManager = new JvmManager(tt);
     tt.setJvmManagerInstance(jvmManager);
+    tt.setCleanupThread(new InlineCleanupQueue());
   }
 
   // write a shell script to execute the command.
@@ -99,15 +116,21 @@ public class TestJvmManager {
     JobConf taskConf = new JobConf(ttConf);
     TaskAttemptID attemptID = new TaskAttemptID("test", 0, TaskType.MAP, 0, 0);
     Task task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+    task.setUser(user);
     task.setConf(taskConf);
     TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
     File pidFile = new File(TEST_DIR, "pid");
-    final TaskRunner taskRunner = task.createRunner(tt, tip);
+    RunningJob rjob = new RunningJob(attemptID.getJobID());
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(ttConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(ttConf).
+      newTaskDistributedCacheManager(attemptID.getJobID(), taskConf);
+    final TaskRunner taskRunner = task.createRunner(tt, tip, rjob);
     // launch a jvm which sleeps for 60 seconds
     final Vector<String> vargs = new Vector<String>(2);
     vargs.add(writeScript("SLEEP", "sleep 60\n", pidFile).getAbsolutePath());
     final File workDir = new File(TEST_DIR, "work");
-    workDir.mkdir();
     final File stdout = new File(TEST_DIR, "stdout");
     final File stderr = new File(TEST_DIR, "stderr");
 
@@ -116,10 +139,13 @@ public class TestJvmManager {
       public void run() {
         try {
           taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
-              workDir, null);
+              workDir);
         } catch (InterruptedException e) {
           e.printStackTrace();
           return;
+        } catch (IOException e) {
+          e.printStackTrace();
+          setThreadCaughtException();
         }
       }
     };
@@ -147,7 +173,14 @@ public class TestJvmManager {
     final JvmRunner jvmRunner = mapJvmManager.jvmIdToRunner.get(jvmid);
     Thread killer = new Thread() {
       public void run() {
-        jvmRunner.kill();
+        try {
+          jvmRunner.kill();
+        } catch (IOException e) {
+          e.printStackTrace();
+          setThreadCaughtException();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
       }
     };
     killer.start();
@@ -163,21 +196,25 @@ public class TestJvmManager {
     // launch another jvm and see it finishes properly
     attemptID = new TaskAttemptID("test", 0, TaskType.MAP, 0, 1);
     task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+    task.setUser(user);
     task.setConf(taskConf);
     tip = tt.new TaskInProgress(task, taskConf);
-    TaskRunner taskRunner2 = task.createRunner(tt, tip);
+    TaskRunner taskRunner2 = task.createRunner(tt, tip, rjob);
     // build dummy vargs to call ls
     Vector<String> vargs2 = new Vector<String>(1);
     vargs2.add(writeScript("LS", "ls", pidFile).getAbsolutePath());
     File workDir2 = new File(TEST_DIR, "work2");
-    workDir.mkdir();
     File stdout2 = new File(TEST_DIR, "stdout2");
     File stderr2 = new File(TEST_DIR, "stderr2");
-    taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2,
-        null);
+    taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2);
     // join all the threads
     killer.join();
     jvmRunner.join();
     launcher.join();
+    assertFalse("Thread caught unexpected IOException", 
+                 threadCaughtException);
+  }
+  private void setThreadCaughtException() {
+    threadCaughtException = true;
   }
 }
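
The behavioral change above is that launchJvmAndWait() and JvmRunner.kill() now throw IOException, so the launcher and killer threads record any failure in the shared threadCaughtException flag, which the test asserts on only after joining both threads. A minimal standalone sketch of that cross-thread failure-reporting pattern (illustrative names, not the Hadoop test itself):

    import java.io.IOException;

    // A worker thread cannot fail a JUnit test directly, so it records the
    // failure in a shared flag that the test thread checks after join().
    public class ThreadFailureFlagExample {
      // shared flag; volatile keeps it safe even without the join() edge
      private static volatile boolean threadCaughtException = false;

      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
          try {
            mightThrow();                  // stands in for launchJvmAndWait()
          } catch (IOException e) {
            e.printStackTrace();
            threadCaughtException = true;  // record instead of swallowing
          }
        });
        worker.start();
        worker.join();                     // check only after the join
        if (threadCaughtException) {
          throw new AssertionError("Thread caught unexpected IOException");
        }
      }

      private static void mightThrow() throws IOException { /* no-op here */ }
    }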

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Tue Mar  8 05:56:27 2011
@@ -33,10 +33,10 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
 
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -151,6 +151,8 @@ public class TestKillSubProcesses extend
         break;
       }
     }
+    final TaskController tc =
+      mr.getTaskTrackerRunner(0).getTaskTracker().getTaskController();
 
     pid = null;
     jobClient = new JobClient(conf);
@@ -195,7 +197,7 @@ public class TestKillSubProcesses extend
     }
 
     // Checking if the descendant processes of map task are alive
-    if(ProcessTree.isSetsidAvailable) {
+    if(TaskController.isSetsidAvailable) {
       String childPid = TestProcfsBasedProcessTree.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + 0);
       while(childPid == null) {
@@ -243,11 +245,11 @@ public class TestKillSubProcesses extend
     }
 
     // Checking if the map task got killed or not
-    assertTrue(!ProcessTree.isAlive(pid));
+    assertTrue(!isAlive(pid));
     LOG.info("The map task is not alive after Job is completed, as expected.");
 
     // Checking if the descendant processes of map task are killed properly
-    if(ProcessTree.isSetsidAvailable) {
+    if(TaskController.isSetsidAvailable) {
       for(int i=0; i <= numLevelsOfSubProcesses; i++) {
         String childPid = TestProcfsBasedProcessTree.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + i);
@@ -310,9 +312,10 @@ public class TestKillSubProcesses extend
       return;
     }
     
-    JobConf conf=null;
     try {
-      mr = new MiniMRCluster(1, "file:///", 1);
+      JobConf conf = new JobConf();
+      conf.setLong(JvmManager.JvmManagerForType.DELAY_BEFORE_KILL_KEY, 0L);
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
 
       // run the TCs
       conf = mr.createJobConf();
@@ -354,7 +357,7 @@ public class TestKillSubProcesses extend
    * Runs a recursive shell script to create a chain of subprocesses
    */
   private static void runChildren(JobConf conf) throws IOException {
-    if (ProcessTree.isSetsidAvailable) {
+    if (TaskController.isSetsidAvailable) {
       FileSystem fs = FileSystem.getLocal(conf);
 
       if (fs.exists(scriptDir)) {
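
The recurring substitution in this file is TaskController.isSetsidAvailable (plus a local isAlive()) in place of the removed ProcessTree utilities: the process-group machinery now lives with the task controller. A rough sketch of how a static isSetsidAvailable flag can be probed, assuming a POSIX host (illustrative code; the real implementation goes through Hadoop's Shell utilities):

    import java.io.IOException;

    // Probe once, at class-load time, whether setsid(1) is usable.
    public class SetsidProbe {
      static final boolean IS_SETSID_AVAILABLE = probeSetsid();

      private static boolean probeSetsid() {
        try {
          Process p = new ProcessBuilder("setsid", "true").start();
          return p.waitFor() == 0;   // exit 0 => setsid ran the command
        } catch (IOException e) {
          return false;              // setsid not on PATH or not executable
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }

      public static void main(String[] args) {
        System.out.println("setsid available: " + IS_SETSID_AVAILABLE);
      }
    }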

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java Tue Mar  8 05:56:27 2011
@@ -22,22 +22,27 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
+
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.hadoop.mapred.LinuxTaskController.ResultCode.*;
+
 import junit.framework.TestCase;
 
+@Ignore("Negative test relies on properties fixed during TC compilation")
 public class TestLinuxTaskController extends TestCase {
-  private static int INVALID_TASKCONTROLLER_PERMISSIONS = 24;
   private static File testDir = new File(System.getProperty("test.build.data",
       "/tmp"), TestLinuxTaskController.class.getName());
-  private static String taskControllerPath = System
-      .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+  private static String taskControllerPath =
+    System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
 
   @Before
   protected void setUp() throws Exception {
@@ -51,9 +56,8 @@ public class TestLinuxTaskController ext
 
   public static class MyLinuxTaskController extends LinuxTaskController {
     String taskControllerExePath = taskControllerPath + "/task-controller";
-
     @Override
-    protected String getTaskControllerExecutablePath() {
+    protected String getTaskControllerExecutablePath(Configuration conf) {
       return taskControllerExePath;
     }
   }
@@ -64,16 +68,18 @@ public class TestLinuxTaskController ext
       // task controller setup should fail validating permissions.
       Throwable th = null;
       try {
-        controller.setup();
+        controller.setup(new LocalDirAllocator("mapred.local.dir"));
       } catch (IOException ie) {
         th = ie;
       }
       assertNotNull("No exception during setup", th);
-      assertTrue("Exception message does not contain exit code"
-          + INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
-          "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
+      assertTrue("Exception message \"" + th.getMessage() +
+            "\" does not contain exit code " +
+            INVALID_TASKCONTROLLER_PERMISSIONS.getValue(),
+          th.getMessage().contains(
+            "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS.getValue()));
     } else {
-      controller.setup();
+      controller.setup(new LocalDirAllocator("mapred.local.dir"));
     }
 
   }
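
Two API shifts drive this hunk: TaskController.setup() now takes a LocalDirAllocator, and the raw exit-code constant (previously a bare int, 24) is replaced by a ResultCode enum whose getValue() feeds the assertion message. A sketch of that exit-code-as-enum pattern (illustrative; only the value 24 is taken from the removed constant):

    // Name native task-controller exit codes instead of scattering ints.
    public class ResultCodeExample {
      enum ResultCode {
        OK(0),
        INVALID_TASKCONTROLLER_PERMISSIONS(24);  // 24 = the old int constant

        private final int value;
        ResultCode(int value) { this.value = value; }
        int getValue() { return value; }
      }

      public static void main(String[] args) {
        String msg = "task-controller setup failed with exit code "
            + ResultCode.INVALID_TASKCONTROLLER_PERMISSIONS.getValue();
        // the updated test asserts the message names the expected code
        System.out.println(msg.contains("with exit code 24") ? "match" : "no match");
      }
    }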

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Tue Mar  8 05:56:27 2011
@@ -45,8 +45,10 @@ import org.apache.hadoop.mapred.lib.Iden
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
 
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -85,6 +87,17 @@ import static org.junit.Assert.assertFal
  *
  **********************************************************/
 public class TestMapRed extends Configured implements Tool {
+
+  static final Path TESTDIR =
+    new Path(System.getProperty("test.build.data", "/tmp"),
+        TestMapRed.class.getSimpleName());
+
+  @Before
+  public void removeTestdir() throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(new Configuration()).getRaw();
+    rfs.delete(TESTDIR, true);
+  }
+
   /**
    * Modified to make it a junit test.
    * The RandomGen Job does the actual work of creating
@@ -370,7 +383,8 @@ public class TestMapRed extends Configur
                                 boolean includeCombine
                                 ) throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
-    Path testdir = new Path("build/test/test.mapred.compress");
+    Path testdir = new Path(System.getProperty("test.build.data", "/tmp"),
+        "test.mapred.compress");
     Path inDir = new Path(testdir, "in");
     Path outDir = new Path(testdir, "out");
     FileSystem fs = FileSystem.get(conf);
@@ -461,7 +475,8 @@ public class TestMapRed extends Configur
     // Write the answer key to a file.  
     //
     FileSystem fs = FileSystem.get(conf);
-    Path testdir = new Path("mapred.loadtest");
+    final Path testdir = new Path(System.getProperty("test.build.data", "/tmp"),
+        "mapred.loadtest");
     if (!fs.mkdirs(testdir)) {
       throw new IOException("Mkdirs failed to create " + testdir.toString());
     }
@@ -723,7 +738,8 @@ public class TestMapRed extends Configur
   public void runJob(int items) {
     try {
       JobConf conf = new JobConf(TestMapRed.class);
-      Path testdir = new Path("build/test/test.mapred.spill");
+      Path testdir = new Path(System.getProperty("build.test.data", "/tmp"),
+          "test.mapred.spill");
       Path inDir = new Path(testdir, "in");
       Path outDir = new Path(testdir, "out");
       FileSystem fs = FileSystem.get(conf);
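
Each hunk in this file replaces a path hard-coded relative to the working directory (e.g. "build/test/test.mapred.compress") with one rooted at the test.build.data system property, falling back to /tmp. Note that the last hunk reads "build.test.data" rather than "test.build.data", which differs from the property name used everywhere else in this commit. A sketch of the convention (illustrative):

    import java.io.File;

    // Resolve test scratch space from a system property instead of CWD.
    public class TestDirExample {
      public static void main(String[] args) {
        File testdir = new File(
            System.getProperty("test.build.data", "/tmp"),
            "test.mapred.compress");
        System.out.println("scratch dir: " + testdir);
        // run with -Dtest.build.data=build/test to relocate it
      }
    }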

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Mar  8 05:56:27 2011
@@ -135,7 +135,6 @@ public class TestMiniMRWithDFS extends T
           .isDirectory());
       LOG.info("Verifying contents of " + MRConfig.LOCAL_DIR + " "
           + localDir.getAbsolutePath());
-
       // Verify contents(user-dir) of tracker-sub-dir
       File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
       if (trackerSubDir.isDirectory()) {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Tue Mar  8 05:56:27 2011
@@ -20,7 +20,10 @@ package org.apache.hadoop.mapred;
 
 import java.io.*;
 import java.util.*;
-import junit.framework.TestCase;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import org.apache.commons.logging.*;
 
@@ -28,17 +31,26 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 
-public class TestSequenceFileInputFormat extends TestCase {
+public class TestSequenceFileInputFormat {
   private static final Log LOG = FileInputFormat.LOG;
 
   private static int MAX_LENGTH = 10000;
   private static Configuration conf = new Configuration();
+  static final Path TESTDIR =
+    new Path(System.getProperty("test.build.data", "/tmp"),
+        TestSequenceFileInputFormat.class.getSimpleName());
+
+  @Before
+  public void removeTestdir() throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(new Configuration()).getRaw();
+    rfs.delete(TESTDIR, true);
+  }
 
+  @Test
   public void testFormat() throws Exception {
     JobConf job = new JobConf(conf);
     FileSystem fs = FileSystem.getLocal(conf);
-    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
-    Path file = new Path(dir, "test.seq");
+    Path file = new Path(TESTDIR, "test.seq").makeQualified(fs);
     
     Reporter reporter = Reporter.NULL;
     
@@ -46,9 +58,7 @@ public class TestSequenceFileInputFormat
     //LOG.info("seed = "+seed);
     Random random = new Random(seed);
 
-    fs.delete(dir, true);
-
-    FileInputFormat.setInputPaths(job, dir);
+    FileInputFormat.setInputPaths(job, TESTDIR);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
@@ -108,6 +118,7 @@ public class TestSequenceFileInputFormat
         assertEquals("Some keys in no partition.", length, bits.cardinality());
       }
 
+      fs.delete(TESTDIR, true);
     }
   }
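
This file is also migrated from JUnit 3 to JUnit 4: the TestCase superclass goes away, setup is annotated with @Before, tests with @Test, and assertions come from the static org.junit.Assert imports. A minimal sketch of the same migration shape (illustrative names):

    import org.junit.Before;
    import org.junit.Test;
    import static org.junit.Assert.*;

    // JUnit 4 style: no TestCase superclass; annotations drive discovery.
    public class MigratedTest {
      private int fixture;

      @Before
      public void setUp() {        // was "protected void setUp()" in JUnit 3
        fixture = 42;
      }

      @Test
      public void testFormat() {   // found via @Test, not the method name
        assertEquals(42, fixture);
      }
    }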
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java Tue Mar  8 05:56:27 2011
@@ -161,6 +161,13 @@ public class TestTaskCommit extends Hado
         throws IOException {
       return 0;
     }
+
+    @Override
+    public void 
+    updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                       long[] sizes) throws IOException {
+      // NOTHING
+    }
   }
   
   /**
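
The fake umbilical in this test implements a task-to-tracker protocol interface, so when the protocol gains updatePrivateDistributedCacheSizes() the fake must stub it, even as a no-op. A sketch of that test-double obligation (interface and names are illustrative, not the Hadoop protocol):

    import java.io.IOException;

    // When an implemented interface grows a method, every test double
    // must grow with it, even if the test never exercises the call.
    interface UmbilicalLike {
      void updatePrivateDistributedCacheSizes(String jobId, long[] sizes)
          throws IOException;
    }

    class FakeUmbilical implements UmbilicalLike {
      @Override
      public void updatePrivateDistributedCacheSizes(String jobId, long[] sizes)
          throws IOException {
        // NOTHING - distributed-cache accounting is out of scope here
      }
    }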

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Mar  8 05:56:27 2011
@@ -34,10 +34,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
@@ -47,20 +48,20 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 
 import junit.framework.TestCase;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.junit.Ignore;
 
 /**
  * Test to verify localization of a job and localization of a task on a
  * TaskTracker.
  * 
  */
+@Ignore // test relies on deprecated functionality/lifecycle
 public class TestTaskTrackerLocalization extends TestCase {
 
   static { DefaultMetricsSystem.setMiniClusterMode(true); }
@@ -184,7 +185,14 @@ public class TestTaskTrackerLocalization
     // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
-    tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf));
+    // setup task controller
+    taskController = createTaskController();
+    taskController.setConf(trackerFConf);
+    taskController.setup(lDirAlloc);
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(),localDirs));
+    tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf, 
+                                        taskController));
     initializeTracker();
   }
 
@@ -206,13 +214,6 @@ public class TestTaskTrackerLocalization
     tracker.setTaskTrackerInstrumentation(
         TaskTracker.createInstrumentation(tracker, trackerFConf));
 
-    // setup task controller
-    taskController = createTaskController();
-    taskController.setConf(trackerFConf);
-    taskController.setup();
-    tracker.setTaskController(taskController);
-    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
-        taskController));
   }
 
   protected TaskController createTaskController() {
@@ -615,13 +616,20 @@ public class TestTaskTrackerLocalization
         + " is not created in any of the configured dirs!!",
         attemptWorkDir != null);
 
-    TaskRunner runner = task.createRunner(tracker, tip);
+    RunningJob rjob = new RunningJob(jobId);
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(trackerFConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(trackerFConf).
+      newTaskDistributedCacheManager(jobId, trackerFConf);
+
+    TaskRunner runner = task.createRunner(tracker, tip, rjob);
     tip.setTaskRunner(runner);
 
     // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
-        localizedJobConf);
+        localizedJobConf, false);
     attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
         task.isTaskCleanupTask());
 
@@ -639,16 +647,6 @@ public class TestTaskTrackerLocalization
     TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
     // ///////
 
-    // Initialize task via TaskController
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
-    // ///////////
   }
 
   protected void checkTaskLocalization()
@@ -707,13 +705,13 @@ public class TestTaskTrackerLocalization
     out.writeBytes("dummy input");
     out.close();
     // no write permission for subDir and subDir/file
+    int ret = 0;
     try {
-      int ret = 0;
       if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
         LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
       }
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while doing chmod for " + subDir);
+    } catch (InterruptedException e) {
+      throw new IOException("chmod interrupted", e);
     }
   }
 
@@ -745,7 +743,7 @@ public class TestTaskTrackerLocalization
     InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
     tracker.setCleanupThread(cleanupQueue);
 
-    tip.removeTaskFiles(needCleanup, taskId);
+    tip.removeTaskFiles(needCleanup);
 
     if (jvmReuse) {
       // work dir should still exist and cleanup queue should be empty
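
Besides moving task-controller setup into setUp() (now parameterized with a LocalDirAllocator) and wiring a RunningJob with its own TrackerDistributedCacheManager, this file changes the chmod helper to rethrow InterruptedException as IOException rather than merely logging it, so an interrupted permission change can no longer let the test continue against wrongly-permissioned directories. A standalone sketch of that rethrow pattern (illustrative; the real code uses FileUtil.chmod):

    import java.io.IOException;

    public class ChmodHelperExample {
      // Make path read/execute-only, failing loudly if interrupted.
      static void makeReadOnly(String path) throws IOException {
        try {
          Process p = new ProcessBuilder("chmod", "-R", "a=rx", path).start();
          int ret = p.waitFor();
          if (ret != 0) {
            System.err.println("chmod failed for " + path + ";retVal=" + ret);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();  // preserve interrupt status
          throw new IOException("chmod interrupted", e);
        }
      }

      public static void main(String[] args) throws IOException {
        makeReadOnly("/tmp/example-subdir");   // illustrative path
      }
    }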

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Mar  8 05:56:27 2011
@@ -485,7 +485,7 @@ public class TestTaskTrackerMemoryManage
       // tree rooted at 100 is over limit immediately, as it is
       // twice over the mem limit.
       ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree(
-                                          "100", true, 100L, 
+                                          "100", true,
                                           procfsRootDir.getAbsolutePath());
       pTree.getProcessTree();
       assertTrue("tree rooted at 100 should be over limit " +
@@ -493,7 +493,7 @@ public class TestTaskTrackerMemoryManage
                   test.isProcessTreeOverLimit(pTree, "dummyId", limit));
       
       // the tree rooted at 200 is initially below limit.
-      pTree = new ProcfsBasedProcessTree("200", true, 100L,
+      pTree = new ProcfsBasedProcessTree("200", true,
                                           procfsRootDir.getAbsolutePath());
       pTree.getProcessTree();
       assertFalse("tree rooted at 200 shouldn't be over limit " +
@@ -506,7 +506,7 @@ public class TestTaskTrackerMemoryManage
                   test.isProcessTreeOverLimit(pTree, "dummyId", limit));
       
       // the tree rooted at 600 is never over limit.
-      pTree = new ProcfsBasedProcessTree("600", true, 100L,
+      pTree = new ProcfsBasedProcessTree("600", true,
                                             procfsRootDir.getAbsolutePath());
       pTree.getProcessTree();
       assertFalse("tree rooted at 600 should never be over limit.",

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Mar  8 05:56:27 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
@@ -64,7 +65,7 @@ public class TestTrackerDistributedCache
     String execPath = path + "/task-controller";
     ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
-    taskController.setup();
+    taskController.setup(new LocalDirAllocator("mapred.local.dir"));
   }
 
   @Override

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java Tue Mar  8 05:56:27 2011
@@ -27,7 +27,8 @@ import org.apache.hadoop.mapred.UtilsFor
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.junit.Assert.*;
 
@@ -47,12 +48,18 @@ public class TestUserLogCleanup {
   private JobID jobid4 = new JobID(jtid, 4);
   private File foo = new File(TaskLog.getUserLogDir(), "foo");
   private File bar = new File(TaskLog.getUserLogDir(), "bar");
+  private TaskController taskController;
 
   public TestUserLogCleanup() throws IOException {
     Configuration conf = new Configuration();
-    localizer = new Localizer(FileSystem.get(conf), conf
-        .getStrings(MRConfig.LOCAL_DIR), new DefaultTaskController());
-    taskLogCleanupThread = new UserLogCleaner(conf);
+    localizer =
+      new Localizer(FileSystem.get(conf), conf.getStrings(MRConfig.LOCAL_DIR));
+    Class<? extends TaskController> taskControllerClass =
+      conf.getClass("mapred.task.tracker.task-controller",
+                     DefaultTaskController.class, TaskController.class);
+    taskController = 
+      (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+    taskLogCleanupThread = new UserLogCleaner(conf, taskController);
     taskLogCleanupThread.setClock(myClock);
     tt = new TaskTracker();
     tt.setConf(new JobConf(conf));
@@ -66,11 +73,10 @@ public class TestUserLogCleanup {
   }
 
   private File localizeJob(JobID jobid) throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    new JobLocalizer(tt.getJobConf(), user, 
+                     jobid.toString()).initializeJobLogDir();
     File jobUserlog = TaskLog.getJobDir(jobid);
-
-    JobConf conf = new JobConf();
-    // localize job log directory
-    tt.initializeJobLogDir(jobid, conf);
     assertTrue(jobUserlog + " directory is not created.", jobUserlog.exists());
     return jobUserlog;
   }
@@ -103,16 +109,17 @@ public class TestUserLogCleanup {
 
     // add the job for deletion with one hour as retain hours
     jobFinished(jobid2, 1);
-
     // remove old logs and see jobid1 is not removed and jobid2 is removed
     myClock.advance(ONE_HOUR);
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog2);
+      
     assertTrue(jobUserlog1 + " got deleted", jobUserlog1.exists());
-    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists());
-
+    assertFalse(jobUserlog2 + " still exists.", jobUserlog2.exists()); 
     myClock.advance(ONE_HOUR);
     // remove old logs and see jobid1 is removed now
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1);
     assertFalse(jobUserlog1 + " still exists.", jobUserlog1.exists());
   }
 
@@ -151,18 +158,18 @@ public class TestUserLogCleanup {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 3);
     taskLogCleanupThread.clearOldUserLogs(conf);
+    retry(foo, bar);
     assertFalse(foo.exists());
     assertFalse(bar.exists());
     assertTrue(jobUserlog1.exists());
     assertTrue(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
     assertTrue(jobUserlog4.exists());
-    assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
-        .exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 2.
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1);
     assertFalse(jobUserlog1.exists());
     assertTrue(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
@@ -175,29 +182,31 @@ public class TestUserLogCleanup {
     jobFinished(jobid3, 3);
 
     // mimic localizeJob for jobid4
-    jobUserlog4 = localizeJob(jobid4);
+    //jobUserlog4 = localizeJob(jobid4);
 
     // do cleanup
     myClock.advance(2 * ONE_HOUR);
     // time is now 4.
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1, jobUserlog2, jobUserlog4);
 
     // jobid2 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 5.
     // do cleanup again
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1, jobUserlog2, jobUserlog3);
 
     // jobid3 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertFalse(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
   }
 
   /**
@@ -232,7 +241,7 @@ public class TestUserLogCleanup {
     // job directories will be added with 3 hours as retain hours. 
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.USER_LOG_RETAIN_HOURS, 3);
-    taskLogCleanupThread = new UserLogCleaner(conf);
+    taskLogCleanupThread = new UserLogCleaner(conf, taskController);
     myClock = new FakeClock(); // clock is reset.
     taskLogCleanupThread.setClock(myClock);
     taskLogCleanupThread.clearOldUserLogs(conf);
@@ -243,8 +252,6 @@ public class TestUserLogCleanup {
     assertTrue(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
     assertTrue(jobUserlog4.exists());
-    assertTrue(new File(TaskLog.getUserLogDir(), MRAsyncDiskService.TOBEDELETED)
-        .exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 1.
@@ -267,22 +274,42 @@ public class TestUserLogCleanup {
     myClock.advance(2 * ONE_HOUR);
     // time is now 3.
     taskLogCleanupThread.processCompletedJobs();
+    retry(jobUserlog1, jobUserlog2, jobUserlog4);
 
     // jobid1 and jobid2 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertTrue(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
 
     myClock.advance(ONE_HOUR);
     // time is now 4.
     // do cleanup again
     taskLogCleanupThread.processCompletedJobs();
-
+    retry(jobUserlog1, jobUserlog2, jobUserlog3, jobUserlog4);
+    
     // jobid3 will be deleted
     assertFalse(jobUserlog1.exists());
     assertFalse(jobUserlog2.exists());
     assertFalse(jobUserlog3.exists());
-    assertTrue(jobUserlog4.exists());
+    assertFalse(jobUserlog4.exists());
+  }
+  
+  private void retry(File... jobDirs) {
+    //since the deletion is done by a thread, we poll for sometime
+    short retries = 0;
+    while (retries++ < 20) {
+      boolean exist = false;
+      for (File dir : jobDirs) {
+        if (dir.exists()) {
+          exist = true;
+        }
+      }
+      if (exist) {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie){}
+      } else return;
+    }
   }
 }
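
The retry() helper added above exists because log deletion now happens on a background thread: the test polls (20 tries, 500 ms apart, roughly ten seconds total) for the directories to vanish before asserting, instead of asserting immediately and racing the cleaner. A standalone sketch of that poll-before-assert idiom (illustrative names):

    import java.io.File;

    public class PollForDeletionExample {
      // Wait, bounded, for asynchronous deletion to finish.
      static void waitForDeletion(File... dirs) {
        for (int retries = 0; retries < 20; retries++) {
          boolean anyExists = false;
          for (File dir : dirs) {
            if (dir.exists()) { anyExists = true; break; }
          }
          if (!anyExists) return;      // all gone: safe to assert
          try {
            Thread.sleep(500);         // give the cleaner thread time
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        // fall through after ~10s; the caller's assertFalse() reports it
      }

      public static void main(String[] args) {
        waitForDeletion(new File("/tmp/userlogs/job_x"));  // illustrative
      }
    }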

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Mar  8 05:56:27 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -662,7 +663,7 @@ public class UtilsForTests {
    * asynchronously.
    */
   public static class InlineCleanupQueue extends CleanupQueue {
-    List<String> stalePaths = new ArrayList<String>();
+    List<Path> stalePaths = new ArrayList<Path>();
 
     public InlineCleanupQueue() {
       // do nothing
@@ -672,19 +673,37 @@ public class UtilsForTests {
     public void addToQueue(PathDeletionContext... contexts) {
       // delete paths in-line
       for (PathDeletionContext context : contexts) {
+        Exception exc = null;
         try {
           if (!deletePath(context)) {
             LOG.warn("Stale path " + context.fullPath);
             stalePaths.add(context.fullPath);
           }
         } catch (IOException e) {
+          exc = e;
+        } catch (InterruptedException ie) {
+          exc = ie;
+        }
+        if (exc != null) {
           LOG.warn("Caught exception while deleting path "
               + context.fullPath);
-          LOG.info(StringUtils.stringifyException(e));
+          LOG.info(StringUtils.stringifyException(exc));
           stalePaths.add(context.fullPath);
         }
       }
     }
+    static boolean deletePath(PathDeletionContext context) 
+    throws IOException, InterruptedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to delete " + context.fullPath);
+      }
+//      FileSystem fs = context.fullPath.getFileSystem(context.conf);
+//      if (fs.exists(context.fullPath)) {
+//        return fs.delete(context.fullPath, true);
+//      }
+      context.deletePath();
+      return true;
+    }
   }
   
   static class FakeClock extends Clock {
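
InlineCleanupQueue is the synchronous test double used throughout this commit: it overrides addToQueue() to delete paths in-line (and now handles both IOException and InterruptedException from deletePath()), so tests can assert on the filesystem immediately after the call. A reduced sketch of the idea (illustrative types, not the Hadoop classes):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class InlineQueueExample {
      interface DeletionContext {
        void deletePath() throws IOException, InterruptedException;
      }

      // Test double: "queue" a deletion, but perform it synchronously.
      static class InlineQueue {
        final List<DeletionContext> stalePaths = new ArrayList<>();

        void addToQueue(DeletionContext... contexts) {
          for (DeletionContext c : contexts) {
            try {
              c.deletePath();            // in-line, no worker thread
            } catch (IOException | InterruptedException e) {
              stalePaths.add(c);         // remember what failed to delete
            }
          }
        }
      }

      public static void main(String[] args) {
        new InlineQueue().addToQueue(() -> System.out.println("deleted in-line"));
      }
    }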

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Tue Mar  8 05:56:27 2011
@@ -32,8 +32,12 @@ import javax.security.auth.login.LoginEx
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobLocalizer;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapreduce.Cluster;
@@ -41,6 +45,9 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
+import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -54,10 +61,10 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.mortbay.log.Log;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
-
+  private static final Log LOG =
+    LogFactory.getLog(TestTrackerDistributedCacheManager.class);
   protected String TEST_ROOT_DIR =
       new File(System.getProperty("test.build.data", "/tmp"),
           TestTrackerDistributedCacheManager.class.getSimpleName())
@@ -68,10 +75,12 @@ public class TestTrackerDistributedCache
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
   private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
-  private static final int LOCAL_CACHE_SUBDIR = 2;
+  private static final int LOCAL_CACHE_SUBDIR = 1;
   protected Configuration conf;
   protected Path firstCacheFile;
+  protected Path firstCacheFilePublic;
   protected Path secondCacheFile;
+  protected Path secondCacheFilePublic;
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
@@ -109,18 +118,22 @@ public class TestTrackerDistributedCache
         taskControllerClass, conf);
 
     // setup permissions for mapred local dir
-    taskController.setup();
+    taskController.setup(localDirAllocator);
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+    firstCacheFilePublic = new Path(TEST_ROOT_DIR, "firstcachefileOne");
+    secondCacheFilePublic = new Path(TEST_ROOT_DIR, "secondcachefileOne");
+    createPublicTempFile(firstCacheFilePublic);
+    createPublicTempFile(secondCacheFilePublic);
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
   }
   
   protected void refreshConf(Configuration conf) throws IOException {
     taskController.setConf(conf);
-    taskController.setup();
+    taskController.setup(localDirAllocator);
   }
 
   /**
@@ -138,7 +151,8 @@ public class TestTrackerDistributedCache
    * @throws IOException
    * @throws LoginException
    */
-  public void testManagerFlow() throws IOException, LoginException {
+  public void testManagerFlow()
+      throws IOException, LoginException, InterruptedException {
     if (!canRun()) {
       return;
     }
@@ -148,6 +162,7 @@ public class TestTrackerDistributedCache
     Configuration subConf = new Configuration(conf);
     String userName = getJobOwnerName();
     subConf.set(MRJobConfig.USER_NAME, userName);
+    JobID jobid = new JobID("jt",1);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
@@ -161,15 +176,18 @@ public class TestTrackerDistributedCache
 
     // ****** Imitate TaskRunner code.
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf, taskController);
+      new TrackerDistributedCacheManager(conf);
     TaskDistributedCacheManager handle =
-      manager.newTaskDistributedCacheManager(subConf);
+      manager.newTaskDistributedCacheManager(jobid, subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), 
-        TaskTracker.getPublicDistributedCacheDir());
-    // ****** End of imitating TaskRunner code
+    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf);
+    // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
+//    handle.setupPrivateCache(localDirAllocator, TaskTracker
+//        .getPrivateDistributedCacheDir(userName));
+//    // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
     assertNotNull(null, localCacheFiles);
@@ -198,18 +216,21 @@ public class TestTrackerDistributedCache
       TrackerDistributedCacheManager {
     public FakeTrackerDistributedCacheManager(Configuration conf)
         throws IOException {
-      super(conf, taskController);
+      super(conf);
     }
 
     @Override
-    Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, boolean isArchive, boolean isPublic)
-    throws IOException {
-      if (cache.equals(firstCacheFile.toUri())) {
+    Path localizePublicCacheObject(Configuration conf, URI cache, 
+        long confFileStamp,
+        CacheStatus cacheStatus, 
+        FileStatus fileStatus, 
+        boolean isArchive) throws IOException, InterruptedException {
+      if (cache.equals(firstCacheFilePublic.toUri())) {
         throw new IOException("fake fail");
       }
-      return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          isArchive, isPublic);
+      return super.localizePublicCacheObject(conf, cache, confFileStamp, 
+          cacheStatus, fileStatus, 
+          isArchive);
     }
   }
 
@@ -220,57 +241,58 @@ public class TestTrackerDistributedCache
     }
     TrackerDistributedCacheManager manager = 
       new FakeTrackerDistributedCacheManager(conf);
-    Cluster cluster = new Cluster(conf);
+
     String userName = getJobOwnerName();
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
 
     // Configures a job with a regular file
-    Job job1 = Job.getInstance(cluster, conf);
-    job1.setUser(userName);
-    job1.addCacheFile(secondCacheFile.toUri());
+    Job job1 = new Job(conf);
     Configuration conf1 = job1.getConfiguration();
+    conf1.set("user.name", userName);
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf1);
+    
     TrackerDistributedCacheManager.determineTimestamps(conf1);
     TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
     // Task localizing for first job
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(conf1);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+    handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(conf1);
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
-      assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp,
-          c.owner));
+      assertEquals(0, manager.getReferenceCount(c.getStatus()));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     createPrivateTempFile(thirdCacheFile);
     
     // Configures another job with three regular files.
-    Job job2 = Job.getInstance(cluster, conf);
-    job2.setUser(userName);
+    Job job2 = new Job(conf);
+    Configuration conf2 = job2.getConfiguration();
+    conf2.set("user.name", userName);
     // add a file that would get failed to localize
-    job2.addCacheFile(firstCacheFile.toUri());
+    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
     // add a file that is already localized by different job
-    job2.addCacheFile(secondCacheFile.toUri());
+    DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
     // add a file that is never localized
-    job2.addCacheFile(thirdCacheFile.toUri());
-    Configuration conf2 = job2.getConfiguration();
+    DistributedCache.addCacheFile(thirdCacheFile.toUri(), conf2);
+    
     TrackerDistributedCacheManager.determineTimestamps(conf2);
     TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
 
     // Task localizing for second job
     // localization for the "firstCacheFile" will fail.
-    handle = manager.newTaskDistributedCacheManager(conf2);
+    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
     Throwable th = null;
     try {
-      handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+      handle.setupCache(conf2, TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+      JobLocalizer.downloadPrivateCache(conf2);
     } catch (IOException e) {
       th = e;
-      Log.info("Exception during setup", e);
+      LOG.info("Exception during setup", e);
     }
     assertNotNull(th);
     assertTrue(th.getMessage().contains("fake fail"));
@@ -278,15 +300,15 @@ public class TestTrackerDistributedCache
     th = null;
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       try {
-        assertEquals(0, manager.getReferenceCount(c.uri, conf2, c.timestamp,
-            c.owner));
-      } catch (IOException ie) {
+        int refcount = manager.getReferenceCount(c.getStatus());
+        LOG.info("checking refcount " + c.uri + " of " + refcount);
+        assertEquals(0, refcount);
+      } catch (NullPointerException ie) {
         th = ie;
-        Log.info("Exception getting reference count for " + c.uri, ie);
+        LOG.info("Exception getting reference count for " + c.uri, ie);
       }
     }
     assertNotNull(th);
-    assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
     fs.delete(thirdCacheFile, false);
   }
   
@@ -351,7 +373,7 @@ public class TestTrackerDistributedCache
   private Path checkLocalizedPath(boolean visibility) 
   throws IOException, LoginException, InterruptedException {
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf, taskController);
+      new TrackerDistributedCacheManager(conf);
     Cluster cluster = new Cluster(conf);
     String userName = getJobOwnerName();
     File workDir = new File(TEST_ROOT_DIR, "workdir");
@@ -371,10 +393,10 @@ public class TestTrackerDistributedCache
 
     // Task localizing for job
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(conf1);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+    handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(conf1);
     TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
     String distCacheDir;
     if (visibility) {
@@ -384,9 +406,8 @@ public class TestTrackerDistributedCache
     }
     Path localizedPath =
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
-          fs.getFileStatus(cacheFile), false,
-          c.timestamp, new Path(TEST_ROOT_DIR), false,
-          visibility);
+                            fs.getFileStatus(cacheFile), false,
+    		                c.timestamp, visibility, c);
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
@@ -494,56 +515,94 @@ public class TestTrackerDistributedCache
     Configuration conf2 = new Configuration(conf);
     conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
-    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
     conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf2, taskController);
+        new TrackerDistributedCacheManager(conf2);
     manager.startCleanupThread();
     FileSystem localfs = FileSystem.getLocal(conf2);
     String userName = getJobOwnerName();
     conf2.set(MRJobConfig.USER_NAME, userName);
 
     // We first test the size limit
-    Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
+    FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
+    CacheFile cfile1 = new CacheFile(firstCacheFilePublic.toUri(), 
+    		                         CacheFile.FileType.REGULAR, true, 
+    		                         stat.getModificationTime(),
+    		                         true); 
+    Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(firstCacheFile), false,
-        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
-    manager.releaseCache(firstCacheFile.toUri(), conf2,
-        getFileStamp(firstCacheFile), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+        fs.getFileStatus(firstCacheFilePublic), false,
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+        cfile1);
+    manager.releaseCache(cfile1.getStatus());
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
-    manager.getLocalCache(secondCacheFile.toUri(), conf2, 
+    stat = fs.getFileStatus(secondCacheFilePublic);
+    CacheFile cfile2 = new CacheFile(secondCacheFilePublic.toUri(), 
+    		                         CacheFile.FileType.REGULAR, true, 
+    		                         stat.getModificationTime(),
+    		                         true); 
+    assertTrue("DistributedCache currently doesn't have cached file",
+        localfs.exists(firstLocalCache));
+    Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(secondCacheFile), false, 
-        getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
-    checkCacheDeletion(localfs, localCache, "DistributedCache failed " +
+        fs.getFileStatus(secondCacheFilePublic), false, 
+        fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+        cfile2);
+    checkCacheDeletion(localfs, firstLocalCache, "DistributedCache failed " +
         "deleting old cache when the cache store is full.");
+    manager.stopCleanupThread();
     // Now we test the number of sub directories limit
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT * 10);
+    conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
+    manager = 
+      new TrackerDistributedCacheManager(conf2);
+    manager.startCleanupThread();
     // Create the temporary cache files to be used in the tests.
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
     Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
     // Adding two more small files, so it triggers the number of sub directory
     // limit but does not trigger the file size limit.
-    createTempFile(thirdCacheFile, 1);
-    createTempFile(fourthCacheFile, 1);
+    createPrivateTempFile(thirdCacheFile);
+    createPrivateTempFile(fourthCacheFile);
+    DistributedCache.setCacheFiles(new URI[]{thirdCacheFile.toUri()}, conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    stat = fs.getFileStatus(thirdCacheFile);
+    CacheFile cfile3 = new CacheFile(thirdCacheFile.toUri(),
+        CacheFile.FileType.REGULAR, false,
+        stat.getModificationTime(),
+        true);
     Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(thirdCacheFile), false,
-        getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(thirdCacheFile).getModificationTime(), 
+        false, cfile3);
+    DistributedCache.setLocalFiles(conf2, thirdLocalCache.toString());
+    JobLocalizer.downloadPrivateCache(conf2);
     // Release the third cache so that it can be deleted while sweeping
-    manager.releaseCache(thirdCacheFile.toUri(), conf2,
-        getFileStamp(thirdCacheFile), 
-        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+    manager.releaseCache(cfile3.getStatus());
     // Getting the fourth cache brings the number of sub directories to 3,
     // which exceeds the limit of 2, so the released cache will be deleted.
-    manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
+    stat = fs.getFileStatus(fourthCacheFile);
+    CacheFile cfile4 = new CacheFile(fourthCacheFile.toUri(),
+        CacheFile.FileType.REGULAR, false,
+        stat.getModificationTime(),
+        true);
+    assertTrue("DistributedCache currently doesn't have cached file",
+        localfs.exists(thirdLocalCache));
+    DistributedCache.setCacheFiles(new URI[]{fourthCacheFile.toUri()}, conf2);
+    DistributedCache.setLocalFiles(conf2, thirdCacheFile.toUri().toString());
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
+    TrackerDistributedCacheManager.determineTimestamps(conf2);
+    Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
-        getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(fourthCacheFile).getModificationTime(), false, cfile4);
     checkCacheDeletion(localfs, thirdLocalCache,
         "DistributedCache failed deleting old" +
         " cache when the cache exceeds the number of sub directories limit.");
@@ -577,17 +636,20 @@ public class TestTrackerDistributedCache
       return;
     }
     TrackerDistributedCacheManager manager =
-      new TrackerDistributedCacheManager(conf, taskController);
+      new TrackerDistributedCacheManager(conf);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
     String userName = getJobOwnerName();
     conf.set(MRJobConfig.USER_NAME, userName);
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
+    CacheFile file = new CacheFile(fileToCache.toUri(),
+        CacheFile.FileType.REGULAR,
+        false, 0, false);
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
-        getFileStamp(firstCacheFile),
-        new Path(TEST_ROOT_DIR), false, false);
+        System.currentTimeMillis(),
+        false, file);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -622,6 +684,8 @@ public class TestTrackerDistributedCache
   protected void tearDown() throws IOException {
     new File(firstCacheFile.toString()).delete();
     new File(secondCacheFile.toString()).delete();
+    new File(firstCacheFilePublic.toString()).delete();
+    new File(secondCacheFilePublic.toString()).delete();
     FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }
 
@@ -642,9 +706,13 @@ public class TestTrackerDistributedCache
     }
     
     public FileStatus getFileStatus(Path p) throws IOException {
+      FileStatus rawFileStatus = super.getFileStatus(p);
       File f = pathToFile(p);
-      return new FileStatus(f.length(), f.isDirectory(), 1, 128,
-      f.lastModified() + increment, makeQualified(new Path(f.getPath())));
+      FileStatus status = new FileStatus(f.length(), f.isDirectory(), 1, 128,
+             f.lastModified() + increment, 0,
+             rawFileStatus.getPermission(), rawFileStatus.getOwner(), 
+             rawFileStatus.getGroup(), makeQualified(new Path(f.getPath())));
+      return status;
     }
     
     void advanceClock(long millis) {
@@ -662,7 +730,7 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
 
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(myConf, taskController);
+      new TrackerDistributedCacheManager(myConf);
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(myConf);
@@ -674,14 +742,16 @@ public class TestTrackerDistributedCache
 
     // ****** Imitate TaskRunner code.
     TaskDistributedCacheManager handle =
-      manager.newTaskDistributedCacheManager(subConf);
+      manager.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), 
-        TaskTracker.getPublicDistributedCacheDir());
+    handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    //TODO this doesn't really happen in the TaskRunner
+//    handle.setupPrivateCache(localDirAllocator, TaskTracker
+//        .getPrivateDistributedCacheDir(userName));
     // ****** End of imitating TaskRunner code
-
+    JobLocalizer.downloadPrivateCache(subConf);
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
     assertNotNull(null, localCacheFiles);
     assertEquals(1, localCacheFiles.length);
@@ -699,8 +769,10 @@ public class TestTrackerDistributedCache
     // running a task of the same job
     Throwable th = null;
     try {
-      handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+      handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+//      handle.setupPrivateCache(localDirAllocator, TaskTracker
+//          .getPrivateDistributedCacheDir(userName));
     } catch (IOException ie) {
       th = ie;
     }
@@ -713,21 +785,22 @@ public class TestTrackerDistributedCache
     // running a task of the same job on another TaskTracker which has never
     // initialized the cache
     TrackerDistributedCacheManager manager2 = 
-      new TrackerDistributedCacheManager(myConf, taskController);
+      new TrackerDistributedCacheManager(myConf);
     TaskDistributedCacheManager handle2 =
-      manager2.newTaskDistributedCacheManager(subConf);
+      manager2.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
     File workDir2 = new File(new Path(TEST_ROOT_DIR, "workdir2").toString());
     th = null;
     try {
-      handle2.setup(localDirAllocator, workDir2, TaskTracker
-          .getPrivateDistributedCacheDir(userName),
-          TaskTracker.getPublicDistributedCacheDir());
+      handle2.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+      JobLocalizer.downloadPrivateCache(subConf);
     } catch (IOException ie) {
       th = ie;
     }
     assertNotNull("Throwable is null", th);
     assertTrue("Exception message does not match",
-        th.getMessage().contains("has changed on HDFS since job started"));
+        th.getMessage().contains("changed during the job from"));
     // release
     handle.release();
     
@@ -739,9 +812,10 @@ public class TestTrackerDistributedCache
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
     
     handle =
-      manager.newTaskDistributedCacheManager(subConf2);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+      manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);
+    handle.setupCache(subConf2, TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf2);
     Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
     assertNotNull(null, localCacheFiles2);
     assertEquals(1, localCacheFiles2.length);
@@ -771,26 +845,36 @@ public class TestTrackerDistributedCache
     String userName = getJobOwnerName();
     conf.set(MRJobConfig.USER_NAME, userName);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf, taskController);
+        new TrackerDistributedCacheManager(conf);
     FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
 
     Path[] localCache = new Path[2];
-    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+    FileStatus stat = fs.getFileStatus(firstCacheFilePublic);
+    CacheFile file = new CacheFile(firstCacheFilePublic.toUri(),
+        CacheFile.FileType.REGULAR, true,
+        stat.getModificationTime(), false);
+    localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(firstCacheFile), false,
-        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(firstCacheFilePublic), false,
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true,
+        file);
     FsPermission myPermission = new FsPermission((short)0600);
     Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
     if (FileSystem.create(localfs, myFile, myPermission) == null) {
       throw new IOException("Could not create " + myFile);
     }
     try {
-      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
+      stat = fs.getFileStatus(secondCacheFilePublic);
+      file = new CacheFile(secondCacheFilePublic.toUri(),
+          CacheFile.FileType.REGULAR,
+          true, stat.getModificationTime(), false);
+      localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf, 
           TaskTracker.getPrivateDistributedCacheDir(userName),
-          fs.getFileStatus(secondCacheFile), false, 
-          getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false,
-          false);
-      FileStatus stat = localfs.getFileStatus(myFile);
+          fs.getFileStatus(secondCacheFilePublic), false, 
+          fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true,
+          file);
+      stat = localfs.getFileStatus(myFile);
       assertTrue(stat.getPermission().equals(myPermission));
       // validate permissions of localized files.
       checkFilePermissions(localCache);

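The hunks above migrate TestTrackerDistributedCacheManager to the CacheFile-based
API: getLocalCache() now takes the file's modification time, its visibility, and
the CacheFile itself instead of a base directory, and releaseCache() takes the
CacheFile's status. What follows is a minimal sketch of that localize/release
cycle, assuming only the signatures visible in this patch; the meaning of the
trailing CacheFile constructor flag and the package locations are inferred from
the tests (which live in org.apache.hadoop.mapred, so some of these members may
be package-private):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;

public class CacheLifecycleSketch {
  // Localizes one public cache file, then releases it so the cleanup thread
  // may evict it once a size or subdirectory limit is exceeded.
  static Path localizeAndRelease(Configuration conf, Path cacheFile,
      String userName) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    FileStatus stat = fs.getFileStatus(cacheFile);
    CacheFile cfile = new CacheFile(cacheFile.toUri(),
        CacheFile.FileType.REGULAR, true /* public */,
        stat.getModificationTime(), true /* assumed: localize eagerly */);
    TrackerDistributedCacheManager manager =
        new TrackerDistributedCacheManager(conf);
    manager.startCleanupThread();
    Path localized = manager.getLocalCache(cacheFile.toUri(), conf,
        TaskTracker.getPrivateDistributedCacheDir(userName),
        stat, false /* not an archive */,
        stat.getModificationTime(), true /* public */, cfile);
    manager.releaseCache(cfile.getStatus()); // now eligible for eviction
    manager.stopCleanupThread();
    return localized;
  }
}
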
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestBinaryTokenFile.java Tue Mar  8 05:56:27 2011
@@ -177,7 +177,7 @@ public class TestBinaryTokenFile {
     jConf = mrCluster.createJobConf();
     
     // provide namenodes names for the job to get the delegation tokens for
-    String nnUri = dfsCluster.getURI(0).toString();
+    String nnUri = dfsCluster.getURI().toString();
     jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
     // job tracker principal id
     jConf.set(JTConfig.JT_USER_NAME, "jt_id");

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/security/TestTokenCache.java Tue Mar  8 05:56:27 2011
@@ -224,7 +224,7 @@ public class TestTokenCache {
     jConf = mrCluster.createJobConf();
     
     // provide namenodes names for the job to get the delegation tokens for
-    String nnUri = dfsCluster.getURI(0).toString();
+    String nnUri = dfsCluster.getURI().toString();
     jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
     // job tracker principal id
     jConf.set(JTConfig.JT_USER_NAME, "jt_id/foo@BAR");

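Both token tests swap the indexed accessor getURI(0) for the no-argument
getURI(), which returns the URI of the mini cluster's single namenode. A small
hedged sketch of the resulting wiring; NamenodeUriSketch and configure are
illustrative names, not part of the patch:

import java.net.URI;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class NamenodeUriSketch {
  // Points the job at the mini cluster's sole namenode so submission can
  // fetch delegation tokens for it; duplicating the URI mirrors the tests.
  static void configure(MiniDFSCluster dfsCluster, JobConf jConf) {
    URI nnUri = dfsCluster.getURI();
    jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);
  }
}
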
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java Tue Mar  8 05:56:27 2011
@@ -35,9 +35,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import static org.apache.hadoop.mapred.TaskController.Signal;
 
 import junit.framework.TestCase;
 
@@ -60,7 +63,7 @@ public class TestProcfsBasedProcessTree 
     public void run() {
       try {
         Vector<String> args = new Vector<String>();
-        if(ProcessTree.isSetsidAvailable) {
+        if(TaskController.isSetsidAvailable) {
           args.add("setsid");
         }
         args.add("bash");
@@ -95,7 +98,7 @@ public class TestProcfsBasedProcessTree 
     return getPidFromPidFile(pidFile);
   }
 
-  public void testProcessTree() {
+  public void testProcessTree() throws Exception {
 
     try {
       if (!ProcfsBasedProcessTree.isAvailable()) {
@@ -149,8 +152,7 @@ public class TestProcfsBasedProcessTree 
     String pid = getRogueTaskPID();
     LOG.info("Root process pid: " + pid);
     ProcfsBasedProcessTree p = new ProcfsBasedProcessTree(pid,
-                               ProcessTree.isSetsidAvailable,
-                               ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+                               TaskController.isSetsidAvailable);
     p = p.getProcessTree(); // initialize
     LOG.info("ProcessTree: " + p.toString());
 
@@ -171,13 +173,14 @@ public class TestProcfsBasedProcessTree 
     String processTreeDump = p.getProcessTreeDump();
 
     // destroy the process and all its subprocesses
-    p.destroy(true/*in the background*/);
+    TaskController tc = new DefaultTaskController();
+    tc.signalTask(null, Integer.valueOf(pid), Signal.KILL);
 
-    if(ProcessTree.isSetsidAvailable) {// whole processtree should be gone
-      assertEquals(false, p.isAnyProcessInTreeAlive());
-    }
-    else {// process should be gone
-      assertFalse("ProcessTree must have been gone", p.isAlive());
+    if (TaskController.isSetsidAvailable) { // whole processtree should be gone
+      assertFalse("Proceesses in process group live",
+          p.isAnyProcessInTreeAlive(tc));
+    } else {// process should be gone
+      assertFalse("ProcessTree must have been gone", p.isAlive(tc));
     }
 
     LOG.info("Process-tree dump follows: \n" + processTreeDump);
@@ -204,7 +207,7 @@ public class TestProcfsBasedProcessTree 
 
     // ProcessTree is gone now. Any further calls should be sane.
     p = p.getProcessTree();
-    assertFalse("ProcessTree must have been gone", p.isAlive());
+    assertFalse("ProcessTree must have been gone", p.isAlive(tc));
     assertTrue("Cumulative vmem for the gone-process is "
         + p.getCumulativeVmem() + " . It should be zero.", p
         .getCumulativeVmem() == 0);
@@ -333,8 +336,8 @@ public class TestProcfsBasedProcessTree 
       
       // crank up the process tree class.
       ProcfsBasedProcessTree processTree = 
-          new ProcfsBasedProcessTree("100", true, 100L, 
-                                  procfsRootDir.getAbsolutePath());
+          new ProcfsBasedProcessTree("100", true,
+              procfsRootDir.getAbsolutePath());
       // build the process tree.
       processTree.getProcessTree();
       
@@ -406,8 +409,8 @@ public class TestProcfsBasedProcessTree 
       
       // crank up the process tree class.
       ProcfsBasedProcessTree processTree = 
-          new ProcfsBasedProcessTree("100", true, 100L, 
-                                  procfsRootDir.getAbsolutePath());
+          new ProcfsBasedProcessTree("100", true,
+              procfsRootDir.getAbsolutePath());
       // build the process tree.
       processTree.getProcessTree();
       
@@ -498,11 +501,11 @@ public class TestProcfsBasedProcessTree 
       
       // crank up the process tree class.
       ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
-                        pid, true, 100L, procfsRootDir.getAbsolutePath());
+                        pid, true, procfsRootDir.getAbsolutePath());
 
       // Let us not create stat file for pid 100.
       assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
-                            pid, procfsRootDir.getAbsolutePath()));
+            Integer.valueOf(pid), procfsRootDir.getAbsolutePath()));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
@@ -551,9 +554,8 @@ public class TestProcfsBasedProcessTree 
       writeStatFiles(procfsRootDir, pids, procInfos);
       writeCmdLineFiles(procfsRootDir, pids, cmdLines);
 
-      ProcfsBasedProcessTree processTree =
-          new ProcfsBasedProcessTree("100", true, 100L, procfsRootDir
-              .getAbsolutePath());
+      ProcfsBasedProcessTree processTree = new ProcfsBasedProcessTree(
+          "100", true, procfsRootDir.getAbsolutePath());
       // build the process tree.
       processTree.getProcessTree();
 

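The process-tree test now signals through the TaskController instead of
ProcessTree.destroy(), and the ProcfsBasedProcessTree constructor no longer
takes a SIGKILL delay. A minimal sketch under the signatures this patch shows;
the null first argument to signalTask follows the test, and whether it stands
for "current user" is an assumption:

import org.apache.hadoop.mapred.DefaultTaskController;
import org.apache.hadoop.mapred.TaskController;
import static org.apache.hadoop.mapred.TaskController.Signal;
import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;

public class KillTreeSketch {
  static void killAndVerify(String pid) throws Exception {
    ProcfsBasedProcessTree tree =
        new ProcfsBasedProcessTree(pid, TaskController.isSetsidAvailable);
    tree = tree.getProcessTree(); // initialize from /proc

    // Killing is the controller's job now, not the process tree's.
    TaskController tc = new DefaultTaskController();
    tc.signalTask(null, Integer.valueOf(pid), Signal.KILL);

    if (TaskController.isSetsidAvailable) {
      // setsid placed the whole tree in one process group, so all of it
      // should be gone after the KILL.
      if (tree.isAnyProcessInTreeAlive(tc)) {
        throw new IllegalStateException("process group still alive");
      }
    } else if (tree.isAlive(tc)) {
      throw new IllegalStateException("root process still alive");
    }
  }
}
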
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/security/TestMapredGroupMappingServiceRefresh.java Tue Mar  8 05:56:27 2011
@@ -100,7 +100,7 @@ public class TestMapredGroupMappingServi
     cluster = new MiniDFSCluster(0, config, 1, true, true, true,  null, null, 
         null, null);
     cluster.waitActive();
-    URI uri = cluster.getURI(0);
+    URI uri = cluster.getURI();
     
     MiniMRCluster miniMRCluster = new MiniMRCluster(0, uri.toString() , 
       3, null, null, config);

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/testshell/ExternalMapReduce.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/testshell/ExternalMapReduce.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/testshell/ExternalMapReduce.java Tue Mar  8 05:56:27 2011
@@ -18,6 +18,7 @@
 
 package testshell;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -91,6 +92,10 @@ public class ExternalMapReduce extends C
       if (ret != 0) {
         throw new IOException("files_tmp does not exist");
       }
+      File file = new File("./ziplink/test.txt");
+      if (!file.canExecute()) {
+        throw new IOException("ziplink/test.txt is not executable");
+      }
     }
   }
 

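The ExternalMapReduce change adds a permissions check on a file reached through
a distributed-cache symlink in the task working directory. A small illustrative
helper along the same lines; the class and method names are hypothetical:

import java.io.File;
import java.io.IOException;

public class CacheSymlinkCheck {
  // Fails the task if the localized file lost its execute bit; the test
  // expects the distributed cache to preserve permissions on symlinked files.
  static void checkExecutable(String linkPath) throws IOException {
    File file = new File(linkPath);
    if (!file.canExecute()) {
      throw new IOException(linkPath + " is not executable");
    }
  }
}

// Usage mirroring the test: checkExecutable("./ziplink/test.txt");
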

