hadoop-mapreduce-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r1346214 [5/7] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/streaming/src/java/org/apache/hadoop/strea...
Date: Tue, 05 Jun 2012 02:33:47 GMT
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun  5 02:33:44 2012
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -72,50 +73,47 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.DeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.mapreduce.util.ConfigUtil;
-import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -164,14 +162,15 @@ public class TaskTracker 
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
-  // each job at job localization time. This will be used by TaskLogServlet for
-  // authorizing viewing of task logs of that job
+  //Job ACLs file is created by TaskController under userlogs/$jobid directory
+  //for each job at job localization time. This will be used by TaskLogServlet 
+  //for authorizing viewing of task logs of that job
   static String jobACLsFile = "job-acls.xml";
 
   volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
+  private String[] localdirs;
   String taskTrackerName;
   String localHostname;
   InetSocketAddress jobTrackAddr;
@@ -241,11 +240,12 @@ public class TaskTracker 
   static final String DISTCACHEDIR = "distcache";
   static final String JOBCACHE = "jobcache";
   static final String OUTPUT = "output";
-  private static final String JARSDIR = "jars";
+  static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.dta";
   static final String LOCAL_SPLIT_META_FILE = "split.info";
   static final String JOBFILE = "job.xml";
   static final String JOB_TOKEN_FILE="jobToken"; //localized file
+  static final String TT_PRIVATE_DIR = "ttprivate";
 
   static final String JOB_LOCAL_DIR = MRJobConfig.JOB_LOCAL_DIR;
 
@@ -445,7 +445,6 @@ public class TaskTracker 
       RunningJob rJob = null;
       if (!runningJobs.containsKey(jobId)) {
         rJob = new RunningJob(jobId);
-        rJob.localized = false;
         rJob.tasks = new HashSet<TaskInProgress>();
         runningJobs.put(jobId, rJob);
       } else {
@@ -454,7 +453,6 @@ public class TaskTracker 
       synchronized (rJob) {
         rJob.tasks.add(tip);
       }
-      runningJobs.notify(); //notify the fetcher thread
       return rJob;
     }
   }
@@ -512,22 +510,32 @@ public class TaskTracker 
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
   
+  static String getPrivateDirJobConfFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+  }
+  
   static String getLocalJobTokenFile(String user, String jobid) {
-    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR +
+      TaskTracker.JOB_TOKEN_FILE;
   }
 
-
   static String getTaskConfFile(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
     return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
         + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
 
+ static String getPrivateDirTaskScriptLocation(String user, String jobid,
+     String taskid) {
+   return TT_PRIVATE_DIR + Path.SEPARATOR + 
+          getLocalTaskDir(user, jobid, taskid);
+ }
+
   static String getJobJarsDir(String user, String jobid) {
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
   }
 
-  static String getJobJarFile(String user, String jobid) {
+  public static String getJobJarFile(String user, String jobid) {
     return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
   }
 
@@ -551,7 +559,8 @@ public class TaskTracker 
         + TaskTracker.OUTPUT;
   }
 
-  static String getLocalTaskDir(String user, String jobid, String taskid) {
+  public static String getLocalTaskDir(String user, String jobid, 
+      String taskid) {
     return getLocalTaskDir(user, jobid, taskid, false);
   }
 
@@ -569,6 +578,19 @@ public class TaskTracker 
     String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
     return dir + Path.SEPARATOR + MRConstants.WORKDIR;
   }
+  
+  static String getPrivateDirJobTokenFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + 
+           getLocalJobTokenFile(user, jobid); 
+  }
+  
+  static String getPrivateDirForJob(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid) ;
+  }
+  
+  String[] getLocalDirs() {
+    return localdirs;
+  }
 
   String getPid(TaskAttemptID tid) {
     TaskInProgress tip = tasks.get(tid);
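[Editor's note: for orientation, the new ttprivate helpers above simply prefix the existing per-user layout with TT_PRIVATE_DIR. A minimal sketch of the resulting path, assuming the usual SUBDIR ("taskTracker") and JOBCACHE ("jobcache") constant values; the user and job id are hypothetical.]

    import org.apache.hadoop.fs.Path;

    // Sketch of the layout the ttprivate helpers produce. SUBDIR and
    // JOBCACHE mirror the TaskTracker constants (assumed values); the
    // user and job id are hypothetical.
    public class TTPrivatePathDemo {
      static final String SUBDIR = "taskTracker";      // assumed value
      static final String JOBCACHE = "jobcache";
      static final String TT_PRIVATE_DIR = "ttprivate";

      static String getLocalJobDir(String user, String jobid) {
        return SUBDIR + Path.SEPARATOR + user
            + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR + jobid;
      }

      static String getPrivateDirJobConfFile(String user, String jobid) {
        // e.g. ttprivate/taskTracker/alice/jobcache/job_20120605_0001/job.xml
        return TT_PRIVATE_DIR + Path.SEPARATOR
            + getLocalJobDir(user, jobid) + Path.SEPARATOR + "job.xml";
      }

      public static void main(String[] args) {
        System.out.println(
            getPrivateDirJobConfFile("alice", "job_20120605_0001"));
      }
    }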
@@ -587,7 +609,47 @@ public class TaskTracker 
                             protocol);
     }
   }
-    
+
+  /**
+   * Delete all of the user directories.
+   * @param conf the TT configuration
+   * @throws IOException
+   */
+  private void deleteUserDirectories(Configuration conf) throws IOException {
+    for(String root: localdirs) {
+      for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+        String owner = status.getOwner();
+        String path = status.getPath().getName();
+        if (path.equals(owner)) {
+          taskController.deleteAsUser(owner, "");
+        }
+      }
+    }
+  }
+  
+  public void cleanupAllVolumes() throws IOException {
+    for (int v = 0; v < localdirs.length; v++) {
+      // List all files inside the volumes
+      FileStatus[] files = localFs.listStatus(new Path(localdirs[v]));
+      for (int f = 0; f < files.length; f++) {
+        if (files[f].getPath().getName().equals(SUBDIR)) {
+          FileStatus[] userDirs = 
+            localFs.listStatus(new Path(localdirs[v] +Path.SEPARATOR+ SUBDIR));
+          for (int k = 0; k < userDirs.length; k++) {
+            // Get the relative file name to the root of the volume
+            String absoluteFilename = files[f].getPath().toUri().getPath();
+            getAsyncDiskService().moveAndDeleteRelativePath(localdirs[v], 
+                                                            absoluteFilename);
+          }
+          //          // Do not delete the current TOBEDELETED
+          //          if (!TOBEDELETED.equals(relative)) {
+          //            moveAndDeleteRelativePath(volumes[v], relative);
+          //          }
+        }
+      }
+      getAsyncDiskService().moveAndDeleteFromEachVolume(TT_PRIVATE_DIR);
+    }
+  }
   
   int getHttpPort() {
     return httpPort;
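[Editor's note: the owner check in deleteUserDirectories() only removes a user directory whose name matches its owner, so a stray directory planted under taskTracker/ by another account is left alone. A standalone sketch of the same walk, assuming a hypothetical local dir; the real code hands matching entries to taskController.deleteAsUser(owner, "").]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Standalone version of the deleteUserDirectories() walk: list
    // <local-dir>/taskTracker and only act on entries whose directory
    // name equals their owner. The local dir is hypothetical.
    public class UserDirWalk {
      public static void main(String[] args) throws Exception {
        FileSystem localFs = FileSystem.getLocal(new Configuration());
        for (String root : new String[] { "/tmp/mapred/local" }) {
          for (FileStatus status
               : localFs.listStatus(new Path(root, "taskTracker"))) {
            String owner = status.getOwner();
            if (status.getPath().getName().equals(owner)) {
              // the patch delegates to taskController.deleteAsUser(owner, "")
              System.out.println("would delete user dir of " + owner);
            }
          }
        }
      }
    }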
@@ -617,9 +679,18 @@ public class TaskTracker 
  
     // Check local disk, start async disk service, and clean up all 
     // local directories.
-    checkLocalDirs(this.fConf.getLocalDirs());
-    setAsyncDiskService(new MRAsyncDiskService(fConf));
-    getAsyncDiskService().cleanupAllVolumes();
+    checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+    setAsyncDiskService(new MRAsyncDiskService(localFs, 
+                            taskController, fConf.getLocalDirs()));
+    cleanupAllVolumes();
+    final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+    }
+    final FsPermission priv = FsPermission.createImmutable((short) 0700);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+    }
 
     // Clear out state tables
     this.tasks.clear();
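[Editor's note: the 0755/0700 split above keeps the shared taskTracker tree traversable by task users while ttprivate (job.xml, job tokens) stays readable by the TT account only. A self-contained sketch of the same mkdirs calls; the local dir is hypothetical.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    // Sketch: recreate the two trees with the permissions initialize() uses.
    // The local dir is hypothetical.
    public class PrivateDirSetup {
      public static void main(String[] args) throws Exception {
        FileSystem localFs = FileSystem.getLocal(new Configuration());
        Path root = new Path("/tmp/mapred/local");
        localFs.mkdirs(new Path(root, "taskTracker"),
                       FsPermission.createImmutable((short) 0755));
        localFs.mkdirs(new Path(root, "ttprivate"),
                       FsPermission.createImmutable((short) 0700));
      }
    }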
@@ -677,15 +748,6 @@ public class TaskTracker 
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    Class<? extends TaskController> taskControllerClass = fConf.getClass(
-        TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
-    taskController = (TaskController) ReflectionUtils.newInstance(
-        taskControllerClass, fConf);
-
-
-    // setup and create jobcache directory with appropriate permissions
-    taskController.setup();
-
     // Initialize DistributedCache
     this.distributedCacheManager = 
         new TrackerDistributedCacheManager(this.fConf, taskController,
@@ -730,7 +792,7 @@ public class TaskTracker 
     reduceLauncher.start();
 
     // create a localizer instance
-    setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+    setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
 
     //Start up node health checker service.
     if (shouldStartHealthMonitor(this.fConf)) {
@@ -814,6 +876,9 @@ public class TaskTracker 
       List <FetchStatus> fList = new ArrayList<FetchStatus>();
       for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
         RunningJob rjob = item.getValue();
+        if (!rjob.localized) {
+          continue;
+        }
         JobID jobId = item.getKey();
         FetchStatus f;
         synchronized (rjob) {
@@ -987,33 +1052,29 @@ public class TaskTracker 
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
-
-    // Initialize the user directories if needed.
-    getLocalizer().initializeUserDirs(t.getUser());
+    InetSocketAddress ttAddr = getTaskTrackerReportAddress();
 
     synchronized (rjob) {
       if (!rjob.localized) {
-       
-        JobConf localJobConf = localizeJobFiles(t, rjob);
-        // initialize job log directory
-        initializeJobLogDir(jobId, localJobConf);
-
-        // Now initialize the job via task-controller so as to set
-        // ownership/permissions of jars, job-work-dir. Note that initializeJob
-        // should be the last call after every other directory/file to be
-        // directly under the job directory is created.
-        JobInitializationContext context = new JobInitializationContext();
-        context.jobid = jobId;
-        context.user = t.getUser();
-        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
-        taskController.initializeJob(context);
-
+        Path localJobConfPath = initializeJob(t, rjob, ttAddr);
+        JobConf localJobConf = new JobConf(localJobConfPath);
+        // to be doubly sure, overwrite the user in the config with the one the
+        // TT thinks it is
+        localJobConf.setUser(t.getUser());
+        //also reset the #tasks per jvm
+        resetNumTasksPerJvm(localJobConf);
+        //set the base jobconf path in rjob; all tasks will use
+        //this as the base path when they run
+        rjob.localizedJobConf = localJobConfPath;
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
         rjob.localized = true;
       }
     }
+    synchronized (runningJobs) {
+      runningJobs.notify(); //notify the fetcher thread
+    }
     return rjob;
   }
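[Editor's note: runningJobs.notify() moved from addTaskToJob() to after rjob.localized is set, and an earlier hunk makes getFetchStatuses() skip unlocalized jobs, so the map-completion fetcher never observes a half-initialized RunningJob. A generic sketch of that notify-after-publish pattern; the names are stand-ins, not the real TaskTracker fields.]

    // Generic notify-after-publish sketch. The monitor stands in for the
    // runningJobs map, the flag for rjob.localized; neither is the real field.
    class LocalizeThenNotify {
      private final Object runningJobs = new Object();
      private volatile boolean localized = false;

      void finishLocalization() {
        localized = true;                 // publish state first
        synchronized (runningJobs) {
          runningJobs.notify();           // then wake the fetcher
        }
      }

      void fetcherLoop() throws InterruptedException {
        synchronized (runningJobs) {
          while (!localized) {
            runningJobs.wait();           // re-check the flag on wake-up
          }
        }
        // safe to fetch map-completion events for this job now
      }
    }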
 
@@ -1032,31 +1093,32 @@ public class TaskTracker 
    * Localize the job on this tasktracker. Specifically
    * <ul>
    * <li>Cleanup and create job directories on all disks</li>
+   * <li>Download the credentials file</li>
    * <li>Download the job config file job.xml from the FS</li>
-   * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
-   * in the configuration.
-   * <li>Download the job jar file job.jar from the FS, unjar it and set jar
-   * file in the configuration.</li>
+   * <li>Invokes the {@link TaskController} to do the rest of the job 
+   * initialization</li>
    * </ul>
-   * 
+   *
    * @param t task whose job has to be localized on this TT
-   * @return the modified job configuration to be used for all the tasks of this
-   *         job as a starting point.
+   * @param rjob the {@link RunningJob}
+   * @param ttAddr the tasktracker's RPC address
+   * @return the path to the job configuration to be used for all the tasks
+   *         of this job as a starting point.
    * @throws IOException
    */
-  JobConf localizeJobFiles(Task t, RunningJob rjob)
-      throws IOException, InterruptedException {
-    JobID jobId = t.getJobID();
-    String userName = t.getUser();
-
-    // Initialize the job directories
-    FileSystem localFs = FileSystem.getLocal(fConf);
-    getLocalizer().initializeJobDirs(userName, jobId);
+  Path initializeJob(final Task t, final RunningJob rjob, 
+      final InetSocketAddress ttAddr)
+  throws IOException, InterruptedException {
+    final JobID jobId = t.getJobID();
+
+    final Path jobFile = new Path(t.getJobFile());
+    final String userName = t.getUser();
+    final Configuration conf = getJobConf();
     // save local copy of JobToken file
-    String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+    final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
 
-    Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -1064,38 +1126,87 @@ public class TaskTracker 
     for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
       rjob.ugi.addToken(token);
     }
+    FileSystem userFs = getFS(jobFile, jobId, conf);
     // Download the job.xml for this job from the system FS
-    Path localJobFile =
-        localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
+    final Path localJobFile =
+      localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
 
-    JobConf localJobConf = new JobConf(localJobFile);
-    //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
-    //AS PART OF THE TASK OBJECT
-    localJobConf.setUser(userName);
-    
-    // set the location of the token file into jobConf to transfer 
-    // the name to TaskRunner
-    localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
-        localJobTokenFile);
-    
-
-    // create the 'job-work' directory: job-specific shared directory for use as
-    // scratch space by all tasks of the same job running on this TaskTracker. 
-    Path workDir =
-        lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
-            .toString()), fConf);
-    if (!localFs.mkdirs(workDir)) {
-      throw new IOException("Mkdirs failed to create " 
-                  + workDir.toString());
-    }
-    System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
-    localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
-    // Download the job.jar for this job from the system FS
-    localizeJobJarFile(userName, jobId, localFs, localJobConf);
-    
-    return localJobConf;
+    /**
+      * Now initialize the job via task-controller to do the rest of the
+      * job-init. Do this within a doAs since the public distributed cache 
+      * is also set up here.
+      * To support potential authenticated HDFS accesses, we need the tokens
+      */
+    rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws IOException, InterruptedException {
+        try {
+          final JobConf localJobConf = new JobConf(localJobFile);
+          // Setup the public distributed cache
+          TaskDistributedCacheManager taskDistributedCacheManager =
+            getTrackerDistributedCacheManager()
+           .newTaskDistributedCacheManager(jobId, localJobConf);
+          rjob.distCacheMgr = taskDistributedCacheManager;
+          taskDistributedCacheManager.setupCache(localJobConf,
+            TaskTracker.getPublicDistributedCacheDir(),
+            TaskTracker.getPrivateDistributedCacheDir(userName));
+
+          // Set some config values
+          localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+              getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+          if (conf.get("slave.host.name") != null) {
+            localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+          }
+          resetNumTasksPerJvm(localJobConf);
+          localJobConf.setUser(t.getUser());
+
+          // write back the config (this config will have the updates that the
+          // distributed cache manager makes as well)
+          JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+          taskController.initializeJob(t.getUser(), jobId.toString(), 
+              new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+              ttAddr);
+        } catch (IOException e) {
+          LOG.warn("Exception while localization " + 
+              StringUtils.stringifyException(e));
+          throw e;
+        } catch (InterruptedException ie) {
+          LOG.warn("Exception while localization " + 
+              StringUtils.stringifyException(ie));
+          throw ie;
+        }
+        return null;
+      }
+    });
+    //search for the conf that the initializeJob created
+    //need to look up certain configs from this conf, like
+    //the distributed cache, profiling, etc. ones
+    Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+           userName, jobId.toString()), getJobConf());
+    return initializedConf;
   }
 
+  /** If certain configs are enabled, then jvm-reuse should be disabled
+   * @param localJobConf
+   */
+  static void resetNumTasksPerJvm(JobConf localJobConf) {
+    boolean debugEnabled = false;
+    if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+      return;
+    }
+    if (localJobConf.getMapDebugScript() != null ||
+        localJobConf.getReduceDebugScript() != null) {
+      debugEnabled = true;
+    }
+    String keepPattern = localJobConf.getKeepTaskFilesPattern();
+
+    if (debugEnabled || localJobConf.getProfileEnabled() ||
+        keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+      //disable jvm reuse
+      localJobConf.setNumTasksToExecutePerJvm(1);
+    }
+  }
+
+
   // Create job userlog dir.
   // Create job acls file in job log dir, if needed.
   void initializeJobLogDir(JobID jobId, JobConf localJobConf)
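[Editor's note: the doAs wrapper above follows the standard UserGroupInformation pattern: build a remote-user UGI, attach the job's tokens, then run the cache setup and taskController.initializeJob() as that user so authenticated HDFS access works. A minimal sketch of the pattern; the filesystem call inside is illustrative, not the patch's actual work.]

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch of the doAs pattern from initializeJob(): a remote-user UGI
    // (plus the job's tokens, elided here) runs filesystem work so HDFS
    // sees the job owner, not the TT principal.
    public class DoAsSketch {
      public static void run(String user, final Configuration conf)
          throws Exception {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
        // for (Token<?> t : credentials.getAllTokens()) ugi.addToken(t);
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
          public Void run() throws Exception {
            FileSystem fs = FileSystem.get(conf);   // resolved as 'user'
            fs.exists(new Path("/tmp"));            // illustrative call only
            return null;
          }
        });
      }
    }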
@@ -1163,15 +1274,15 @@ public class TaskTracker 
   /**
    * Download the job configuration file from the FS.
    * 
-   * @param t Task whose job file has to be downloaded
-   * @param jobId jobid of the task
+   * @param jobFile the original location of the configuration file
+   * @param user the user in question
+   * @param userFs the FileSystem created on behalf of the user
+   * @param jobId jobid in question
    * @return the local file system path of the downloaded file.
    * @throws IOException
    */
-  private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
-      throws IOException, InterruptedException {
-    final JobConf conf = new JobConf(getJobConf());
-    FileSystem userFs = getFS(jobFile, jobId, conf);
+  private Path localizeJobConfFile(Path jobFile, String user, 
+      FileSystem userFs, JobID jobId) throws IOException {
     // Get sizes of JobFile
     // sizes are -1 if they are not present.
     FileStatus status = null;
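[Editor's note: localizeJobConfFile() now targets the ttprivate tree but keeps the allocate-for-size-then-copy idiom. A self-contained sketch of that idiom with LocalDirAllocator; the source path and relative destination are hypothetical.]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    // Sketch of the allocate-then-copy idiom: ask the allocator for a local
    // path with enough room, then pull the file down. The source path and
    // relative destination are hypothetical.
    public class AllocateAndCopy {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        Path src = new Path("hdfs://nn/system/job_0001/job.xml");
        FileSystem srcFs = src.getFileSystem(conf);
        long size = srcFs.getFileStatus(src).getLen();
        Path dst = alloc.getLocalPathForWrite("ttprivate/job.xml", size, conf);
        srcFs.copyToLocalFile(src, dst);
      }
    }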
@@ -1183,69 +1294,25 @@ public class TaskTracker 
       jobFileSize = -1;
     }
 
-    Path localJobFile =
-        lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()),
-            jobFileSize, fConf);
+    Path localJobFile = lDirAlloc.getLocalPathForWrite(
+        getPrivateDirJobConfFile(user, jobId.toString()), jobFileSize, fConf);
 
     // Download job.xml
     userFs.copyToLocalFile(jobFile, localJobFile);
     return localJobFile;
   }
 
-  /**
-   * Download the job jar file from FS to the local file system and unjar it.
-   * Set the local jar file in the passed configuration.
-   * 
-   * @param jobId
-   * @param localFs
-   * @param localJobConf
-   * @throws IOException
-   */
-  private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs,
-      JobConf localJobConf)
-      throws IOException, InterruptedException {
-    // copy Jar file to the local FS and unjar it.
-    String jarFile = localJobConf.getJar();
-    FileStatus status = null;
-    long jarFileSize = -1;
-    if (jarFile != null) {
-      Path jarFilePath = new Path(jarFile);
-      FileSystem fs = getFS(jarFilePath, jobId, localJobConf);
-      try {
-        status = fs.getFileStatus(jarFilePath);
-        jarFileSize = status.getLen();
-      } catch (FileNotFoundException fe) {
-        jarFileSize = -1;
-      }
-      // Here we check for five times the size of jarFileSize to accommodate for
-      // unjarring the jar file in the jars directory
-      Path localJarFile =
-          lDirAlloc.getLocalPathForWrite(
-              getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
-
-      // Download job.jar
-      fs.copyToLocalFile(jarFilePath, localJarFile);
-
-      localJobConf.setJar(localJarFile.toString());
-
-      // Un-jar the parts of the job.jar that need to be added to the classpath
-      RunJar.unJar(
-        new File(localJarFile.toString()),
-        new File(localJarFile.getParent().toString()),
-        localJobConf.getJarUnpackPattern());
-    }
-  }
-
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
-      UserGroupInformation ugi) throws IOException {
+        RunningJob rjob) throws IOException {
     synchronized (tip) {
       tip.setJobConf(jobConf);
-      tip.setUGI(ugi);
-      tip.launchTask();
+      tip.setUGI(rjob.ugi);
+      tip.launchTask(rjob);
     }
   }
     
-  public synchronized void shutdown() throws IOException {
+  public synchronized void shutdown()
+      throws IOException, InterruptedException  {
     shuttingDown = true;
     close();
     if (this.server != null) {
@@ -1262,8 +1329,9 @@ public class TaskTracker 
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws InterruptedException 
    */
-  public synchronized void close() throws IOException {
+  public synchronized void close() throws IOException, InterruptedException {
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1371,8 +1439,14 @@ public class TaskTracker 
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
+    Class<? extends TaskController> taskControllerClass =
+       conf.getClass("mapred.task.tracker.task-controller",
+                      DefaultTaskController.class, TaskController.class);
+    taskController =
+      (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+    taskController.setup(localDirAllocator);
     // create task log cleanup thread
-    setTaskLogCleanupThread(new UserLogCleaner(fConf));
+    setTaskLogCleanupThread(new UserLogCleaner(fConf, taskController));
 
     UserGroupInformation.setConfiguration(fConf);
     SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
@@ -1392,7 +1466,7 @@ public class TaskTracker 
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue();
+    directoryCleanupThread = CleanupQueue.getInstance();
     // start tasklog cleanup thread
     taskLogCleanupThread.setDaemon(true);
     taskLogCleanupThread.start();
@@ -1873,7 +1947,6 @@ public class TaskTracker 
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
           myInstrumentation.timedoutTask(tip.getTask().getTaskID());
-          dumpTaskStack(tip);
           purgeTask(tip, true);
         }
       }
@@ -1881,86 +1954,6 @@ public class TaskTracker 
   }
 
   /**
-   * Builds list of PathDeletionContext objects for the given paths
-   */
-  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
-      Path[] paths) {
-    int i = 0;
-    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
-    }
-    return contexts;
-  }
-
-  /**
-   * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a 
-   * job each pointing to the job's jobLocalDir.
-   * @param fs    : FileSystem in which the dirs to be deleted
-   * @param paths : mapred-local-dirs
-   * @param id    : {@link JobID} of the job for which the local-dir needs to 
-   *                be cleaned up.
-   * @param user  : Job owner's username
-   * @param taskController : the task-controller to be used for deletion of
-   *                         jobLocalDir
-   */
-  static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
-      FileSystem fs, Path[] paths, JobID id, String user,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
-                                                               taskController);
-    }
-    return contexts;
-  } 
-  
-  /**
-   * Builds list of TaskControllerTaskPathDeletionContext objects for a task
-   * @param fs    : FileSystem in which the dirs to be deleted
-   * @param paths : mapred-local-dirs
-   * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
-   * @param isWorkDir : the dir to be deleted is workDir or taskDir
-   * @param taskController : the task-controller to be used for deletion of
-   *                         taskDir or taskWorkDir
-   */
-  static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
-      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
-                          isWorkDir, taskController);
-    }
-    return contexts;
-  }
-
-  /**
-   * Send a signal to a stuck task commanding it to dump stack traces
-   * to stderr before we kill it with purgeTask().
-   *
-   * @param tip {@link TaskInProgress} to dump stack traces.
-   */
-  private void dumpTaskStack(TaskInProgress tip) {
-    TaskRunner runner = tip.getTaskRunner();
-    if (null == runner) {
-      return; // tip is already abandoned.
-    }
-
-    JvmManager jvmMgr = runner.getJvmManager();
-    jvmMgr.dumpStack(runner);
-  }
-
-  /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
    * @throws IOException
@@ -1976,7 +1969,12 @@ public class TaskTracker 
     if (rjob == null) {
       LOG.warn("Unknown job " + jobId + " being deleted.");
     } else {
-      synchronized (rjob) {            
+      synchronized (rjob) {
+        // decrement the reference counts for the items this job references
+        rjob.distCacheMgr.release();
+        // inform the cache manager that the job is done
+        getTrackerDistributedCacheManager()
+            .deleteTaskDistributedCacheManager(jobId);
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
           tip.jobHasFinished(false);
@@ -1988,7 +1986,7 @@ public class TaskTracker 
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles) {
-          removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
+          removeJobFiles(rjob.ugi.getShortUserName(), rjob.getJobID());
         }
         // add job to taskLogCleanupThread
         long now = System.currentTimeMillis();
@@ -2016,15 +2014,25 @@ public class TaskTracker 
   /**
    * This job's files are no longer needed on this TT, remove them.
    * 
-   * @param rjob
+   * @param user User who ran the job
+   * @param jobId Remove local work dirs for this job
    * @throws IOException
    */
-  void removeJobFiles(String user, JobID jobId)
-      throws IOException {
-    PathDeletionContext[] contexts = 
-      buildTaskControllerJobPathDeletionContexts(localFs, 
-          getLocalFiles(fConf, ""), jobId, user, taskController);
-    directoryCleanupThread.addToQueue(contexts);
+  void removeJobFiles(String user, JobID jobId) throws IOException {
+    String jobDir = getLocalJobDir(user, jobId.toString());
+    PathDeletionContext jobCleanup = 
+      new TaskController.DeletionContext(getTaskController(), false, user, 
+                                         jobDir,
+                                         localdirs);
+    directoryCleanupThread.addToQueue(jobCleanup);
+    
+    for (String str : localdirs) {
+      Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+        new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+      PathDeletionContext ttPrivateJobCleanup =
+        new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+      directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+    }
   }
 
   /**
@@ -2329,12 +2337,14 @@ public class TaskTracker 
    * Start a new task.
    * All exceptions are handled locally, so that we don't mess up the
    * task tracker.
+   * @throws InterruptedException 
    */
-  void startNewTask(TaskInProgress tip) {
+  void startNewTask(TaskInProgress tip) throws InterruptedException {
     try {
       RunningJob rjob = localizeJob(tip);
+      tip.getTask().setJobFile(rjob.localizedJobConf.toString());
       // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
-      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi); 
+      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2344,8 +2354,9 @@ public class TaskTracker 
         tip.kill(true);
         tip.cleanup(true);
       } catch (IOException ie2) {
-        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
-                 StringUtils.stringifyException(ie2));          
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+      } catch (InterruptedException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
       }
         
       // Careful! 
@@ -2466,7 +2477,6 @@ public class TaskTracker 
     private TaskRunner runner;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
-    private JobConf defaultJobConf;
     private JobConf localJobConf;
     private boolean keepFailedTaskFiles;
     private boolean alwaysKeepTaskFiles;
@@ -2498,7 +2508,6 @@ public class TaskTracker 
       this.task = task;
       this.launcher = launcher;
       this.lastProgressReport = System.currentTimeMillis();
-      this.defaultJobConf = conf;
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
@@ -2516,66 +2525,9 @@ public class TaskTracker 
     }
         
     void localizeTask(Task task) throws IOException{
-
-      FileSystem localFs = FileSystem.getLocal(fConf);
-
-      // create taskDirs on all the disks.
-      getLocalizer().initializeAttemptDirs(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask());
-
-      // create the working-directory of the task 
-      Path cwd =
-          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
-              .getJobID().toString(), task.getTaskID().toString(), task
-              .isTaskCleanupTask()), defaultJobConf);
-      if (!localFs.mkdirs(cwd)) {
-        throw new IOException("Mkdirs failed to create " 
-                    + cwd.toString());
-      }
-
-      localJobConf.set(LOCAL_DIR,
-                       fConf.get(LOCAL_DIR));
-
-      if (fConf.get(TT_HOST_NAME) != null) {
-        localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
-      }
-            
-      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-
       // Do the task-type specific localization
+      //TODO: are these calls really required
       task.localizeConfiguration(localJobConf);
-      
-      List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
-      if (staticResolutions != null && staticResolutions.size() > 0) {
-        StringBuffer str = new StringBuffer();
-
-        for (int i = 0; i < staticResolutions.size(); i++) {
-          String[] hostToResolved = staticResolutions.get(i);
-          str.append(hostToResolved[0]+"="+hostToResolved[1]);
-          if (i != staticResolutions.size() - 1) {
-            str.append(',');
-          }
-        }
-        localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
-      }
-      if (task.isMapTask()) {
-        debugCommand = localJobConf.getMapDebugScript();
-      } else {
-        debugCommand = localJobConf.getReduceDebugScript();
-      }
-      String keepPattern = localJobConf.getKeepTaskFilesPattern();
-      if (keepPattern != null) {
-        alwaysKeepTaskFiles = 
-          Pattern.matches(keepPattern, task.getTaskID().toString());
-      } else {
-        alwaysKeepTaskFiles = false;
-      }
-      if (debugCommand != null || localJobConf.getProfileEnabled() ||
-          alwaysKeepTaskFiles || keepFailedTaskFiles) {
-        //disable jvm reuse
-        localJobConf.setNumTasksToExecutePerJvm(1);
-      }
       task.setConf(localJobConf);
     }
         
@@ -2598,6 +2550,18 @@ public class TaskTracker 
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
       taskTimeout = localJobConf.getLong(MRJobConfig.TASK_TIMEOUT, 
                                          10 * 60 * 1000);
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {
+        debugCommand = localJobConf.getReduceDebugScript();
+      }
+      String keepPattern = localJobConf.getKeepTaskFilesPattern();
+      if (keepPattern != null) {
+        alwaysKeepTaskFiles = 
+          Pattern.matches(keepPattern, task.getTaskID().toString());
+      } else {
+        alwaysKeepTaskFiles = false;
+      }
     }
         
     public synchronized JobConf getJobConf() {
@@ -2618,7 +2582,7 @@ public class TaskTracker 
     /**
      * Kick off the task execution
      */
-    public synchronized void launchTask() throws IOException {
+    public synchronized void launchTask(RunningJob rjob) throws IOException {
       if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
           this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
           this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
@@ -2626,7 +2590,7 @@ public class TaskTracker 
         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
           this.taskStatus.setRunState(TaskStatus.State.RUNNING);
         }
-        setTaskRunner(task.createRunner(TaskTracker.this, this));
+        setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
@@ -2886,11 +2850,6 @@ public class TaskTracker 
       } catch(Exception e){
         LOG.warn("Exception finding task's stdout/err/syslog files", e);
       }
-      File workDir = new File(lDirAlloc.getLocalPathToRead(
-          TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID()
-              .toString(), task.getTaskID().toString(), task
-              .isTaskCleanupTask())
-              + Path.SEPARATOR + MRConstants.WORKDIR, localJobConf).toString());
       // Build the command  
       File stdout = TaskLog.getTaskLogFile(task.getTaskID(), task
           .isTaskCleanupTask(), TaskLog.LogName.DEBUGOUT);
@@ -2914,13 +2873,14 @@ public class TaskTracker 
       vargs.add(taskSyslog);
       vargs.add(jobConf);
       vargs.add(program);
-      DebugScriptContext context = 
-        new TaskController.DebugScriptContext();
-      context.args = vargs;
-      context.stdout = stdout;
-      context.workDir = workDir;
-      context.task = task;
-      getTaskController().runDebugScript(context);
+      // TODO need to fix debug script
+      //DebugScriptContext context = 
+      //  new TaskController.DebugScriptContext();
+      //context.args = vargs;
+      //context.stdout = stdout;
+      //context.workDir = workDir;
+      //context.task = task;
+      //getTaskController().runDebugScript(context);
       // add the lines of debug out to diagnostics
       int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, -1);
       addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT");
@@ -3001,7 +2961,12 @@ public class TaskTracker 
             getRunState() == TaskStatus.State.UNASSIGNED ||
             getRunState() == TaskStatus.State.COMMIT_PENDING ||
             isCleaningup()) {
-          kill(wasFailure);
+          try {
+            kill(wasFailure);
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted while killing " +
+                getTask().getTaskID(), e);
+          }
         }
       }
       
@@ -3013,8 +2978,10 @@ public class TaskTracker 
      * Something went wrong and the task must be killed.
      * 
      * @param wasFailure was it a failure (versus a kill request)?
+     * @throws InterruptedException 
      */
-    public synchronized void kill(boolean wasFailure) throws IOException {
+    public synchronized void kill(boolean wasFailure
+                                  ) throws IOException, InterruptedException {
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
           isCleaningup()) {
@@ -3118,7 +3085,7 @@ public class TaskTracker 
           return;
         }
         try {
-          removeTaskFiles(needCleanup, taskId);
+          removeTaskFiles(needCleanup);
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: "
               + StringUtils.stringifyException(ie));
@@ -3130,47 +3097,26 @@ public class TaskTracker 
      * Some or all of the files from this task are no longer required. Remove
      * them via CleanupQueue.
      * 
-     * @param needCleanup
+     * @param removeOutputs remove outputs as well as output
      * @param taskId
      * @throws IOException 
      */
-    void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
-        throws IOException {
-      if (needCleanup) {
-        if (runner != null) {
-          // cleans up the output directory of the task (where map outputs
-          // and reduce inputs get stored)
-          runner.close();
-        }
-
-        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          // No jvm reuse, remove everything
-          PathDeletionContext[] contexts =
-            buildTaskControllerTaskPathDeletionContexts(localFs,
-                getLocalFiles(fConf, ""), task, false/* not workDir */,
-                taskController);
-          directoryCleanupThread.addToQueue(contexts);
+    void removeTaskFiles(boolean removeOutputs) throws IOException {
+      if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+        String user = ugi.getShortUserName();
+        String jobId = task.getJobID().toString();
+        String taskId = task.getTaskID().toString();
+        boolean cleanup = task.isTaskCleanupTask();
+        String taskDir;
+        if (!removeOutputs) {
+          taskDir = TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);
         } else {
-          // Jvm reuse. We don't delete the workdir since some other task
-          // (running in the same JVM) might be using the dir. The JVM
-          // running the tasks would clean the workdir per a task in the
-          // task process itself.
-          String localTaskDir =
-            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
-                .toString(), task.isTaskCleanupTask());
-          PathDeletionContext[] contexts = buildPathDeletionContexts(
-              localFs, getLocalFiles(defaultJobConf, localTaskDir +
-                         Path.SEPARATOR + TaskTracker.JOBFILE));
-          directoryCleanupThread.addToQueue(contexts);
-        }
-      } else {
-        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          PathDeletionContext[] contexts =
-            buildTaskControllerTaskPathDeletionContexts(localFs,
-              getLocalFiles(fConf, ""), task, true /* workDir */,
-              taskController);
-          directoryCleanupThread.addToQueue(contexts);
+          taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
         }
+        PathDeletionContext item =
+          new TaskController.DeletionContext(taskController, false, user,
+                                             taskDir, localdirs);          
+        directoryCleanupThread.addToQueue(item);
       }
     }
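[Editor's note: job, task, and (in UserLogCleaner below) log cleanup now all funnel through the same queue item type. A condensed sketch of the wiring, using the DeletionContext constructor and CleanupQueue singleton exactly as this patch defines them, so the fragment is only meaningful inside this codebase.]

    // Condensed wiring of the new deletion flow; DeletionContext and
    // CleanupQueue are the classes from this patch, so this fragment only
    // illustrates the call shape rather than standing alone.
    void queueTaskDirDeletion(TaskController taskController, String user,
                              String jobId, String taskId, boolean cleanup,
                              String[] localdirs) {
      String taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
      CleanupQueue.PathDeletionContext item =
          new TaskController.DeletionContext(taskController,
              false /* the flag the patch passes for non-log paths */,
              user, taskDir, localdirs);
      CleanupQueue.getInstance().addToQueue(item);
    }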
         
@@ -3227,7 +3173,11 @@ public class TaskTracker 
     if (rjob == null) { //kill the JVM since the job is dead
       LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
                " is dead");
-      jvmManager.killJvm(jvmId);
+      try {
+        jvmManager.killJvm(jvmId);
+      } catch (InterruptedException e) {
+        LOG.warn("Failed to kill " + jvmId, e);
+      }
       return new JvmTask(null, true);
     }
     TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
@@ -3448,12 +3398,15 @@ public class TaskTracker 
   static class RunningJob{
     private JobID jobid; 
     private JobConf jobConf;
+    private Path localizedJobConf;
     // keep this for later use
     volatile Set<TaskInProgress> tasks;
-    boolean localized;
+    volatile boolean localized;
     boolean keepJobFiles;
     UserGroupInformation ugi;
     FetchStatus f;
+    TaskDistributedCacheManager distCacheMgr;
+    
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -4118,42 +4071,54 @@ public class TaskTracker 
     return distributedCacheManager;
   }
   
-    /**
-     * Download the job-token file from the FS and save on local fs.
-     * @param user
-     * @param jobId
-     * @param jobConf
-     * @return the local file system path of the downloaded file.
-     * @throws IOException
-     */
-    private String localizeJobTokenFile(String user, JobID jobId)
-        throws IOException {
-      // check if the tokenJob file is there..
-      Path skPath = new Path(systemDirectory, 
-          jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
-      
-      FileStatus status = null;
-      long jobTokenSize = -1;
-      status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
-      jobTokenSize = status.getLen();
-      
-      Path localJobTokenFile =
-          lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, 
-              jobId.toString()), jobTokenSize, fConf);
-      String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
-      LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
-          " to " + localJobTokenFileStr);
-      
-      // Download job_token
-      systemFS.copyToLocalFile(skPath, localJobTokenFile);      
-      return localJobTokenFileStr;
-    }
-
-    JobACLsManager getJobACLsManager() {
-      return aclsManager.getJobACLsManager();
-    }
+  /**
+   * Download the job-token file from the FS and save on local fs.
+   * @param user
+   * @param jobId
+   * @param jobConf
+   * @return the local file system path of the downloaded file.
+   * @throws IOException
+   */
+  private String localizeJobTokenFile(String user, JobID jobId)
+      throws IOException {
+    // check if the tokenJob file is there..
+    Path skPath = new Path(systemDirectory, 
+        jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
     
-    ACLsManager getACLsManager() {
-      return aclsManager;
-    }
+    FileStatus status = null;
+    long jobTokenSize = -1;
+    status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+    jobTokenSize = status.getLen();
+    
+    Path localJobTokenFile =
+        lDirAlloc.getLocalPathForWrite(getPrivateDirJobTokenFile(user, 
+            jobId.toString()), jobTokenSize, fConf);
+    String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
+    LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
+        " to " + localJobTokenFileStr);
+    
+    // Download job_token
+    systemFS.copyToLocalFile(skPath, localJobTokenFile);      
+    return localJobTokenFileStr;
+  }
+
+  JobACLsManager getJobACLsManager() {
+    return aclsManager.getJobACLsManager();
+  }
+  
+  ACLsManager getACLsManager() {
+    return aclsManager;
+  }
+
+  synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
+    return runningTasks.get(tid);
+  }
+
+  @Override
+  public void updatePrivateDistributedCacheSizes(
+      org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+      throws IOException {
+    distributedCacheManager.setArchiveSizes(jobId, sizes);
+  }
+
 }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Jun  5 02:33:44 2012
@@ -161,4 +161,13 @@ public interface TaskUmbilicalProtocol e
                                                        TaskAttemptID id) 
   throws IOException;
 
+  /**
+   * The job initializer needs to report the sizes of the archive
+   * objects in the private distributed cache.
+   * @param jobId the job to update
+   * @param sizes the array of sizes that were computed
+   * @throws IOException
+   */
+  void updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                          long[] sizes) throws IOException;
 }
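[Editor's note: on the reporting side, whatever initializes the job (JobLocalizer in this patch's TaskTracker hunks) can push the measured archive sizes back over this new umbilical method. A hedged sketch of such a caller; how the proxy is obtained is elided and the sizes array is illustrative.]

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobID;

    // Sketch of a job-initializer reporting archive sizes back to the TT.
    // Obtaining the umbilical proxy (RPC) is elided; the sizes array is
    // illustrative: one entry per private dist-cache archive.
    class CacheSizeReporter {
      private final TaskUmbilicalProtocol umbilical;

      CacheSizeReporter(TaskUmbilicalProtocol umbilical) {
        this.umbilical = umbilical;
      }

      void report(JobID jobId, long[] measuredSizes) throws IOException {
        umbilical.updatePrivateDistributedCacheSizes(jobId, measuredSizes);
      }
    }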

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/UserLogCleaner.java Tue Jun  5 02:33:44 2012
@@ -32,6 +32,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
@@ -53,15 +56,19 @@ class UserLogCleaner extends Thread {
   private Map<JobID, Long> completedJobs = Collections
       .synchronizedMap(new HashMap<JobID, Long>());
   private final long threadSleepTime;
-  private MRAsyncDiskService logAsyncDisk;
   private Clock clock;
+  private TaskController taskController;
+  private CleanupQueue cleanupQueue;
+  private FileSystem localFs;
 
-  UserLogCleaner(Configuration conf) throws IOException {
+  UserLogCleaner(Configuration conf, TaskController taskController) 
+  throws IOException {
     threadSleepTime = conf.getLong(TTConfig.TT_USERLOGCLEANUP_SLEEPTIME,
         DEFAULT_THREAD_SLEEP_TIME);
-    logAsyncDisk = new MRAsyncDiskService(FileSystem.getLocal(conf), TaskLog
-        .getUserLogDir().toString());
     setClock(new Clock());
+    localFs = FileSystem.getLocal(conf);
+    this.taskController = taskController;
+    cleanupQueue = CleanupQueue.getInstance();
   }
 
   void setClock(Clock clock) {
@@ -100,7 +107,7 @@ class UserLogCleaner extends Thread {
         // see if the job is old enough
         if (entry.getValue().longValue() <= now) {
           // add the job logs directory to for delete
-          deleteLogPath(TaskLog.getJobDir(entry.getKey()).getAbsolutePath());
+          deleteLogPath(entry.getKey().toString());
           completedJobIter.remove();
         }
       }
@@ -125,16 +132,12 @@ class UserLogCleaner extends Thread {
         // add all the log dirs to taskLogsMnonitor.
         long now = clock.getTime();
         for (String logDir : logDirs) {
-          if (logDir.equals(logAsyncDisk.TOBEDELETED)) {
-            // skip this
-            continue;
-          }
           JobID jobid = null;
           try {
             jobid = JobID.forName(logDir);
           } catch (IllegalArgumentException ie) {
             // if the directory is not a jobid, delete it immediately
-            deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+            deleteLogPath(logDir);
             continue;
           }
           // add the job log directory with default retain hours, if it is not
@@ -197,6 +200,11 @@ class UserLogCleaner extends Thread {
    */
   private void deleteLogPath(String logPath) throws IOException {
     LOG.info("Deleting user log path " + logPath);
-    logAsyncDisk.moveAndDeleteAbsolutePath(logPath);
+    String logRoot = TaskLog.getUserLogDir().toString();
+    String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    PathDeletionContext item = 
+      new TaskController.DeletionContext(taskController, true, user, logPath,
+                                         null);
+    cleanupQueue.addToQueue(item);
   }
 }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Jun  5 02:33:44 2012
@@ -265,7 +265,7 @@ public interface JobContext extends MRJo
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getArchiveTimestamps();
+  public long[] getArchiveTimestamps();
 
   /**
    * Get the timestamps of the files.  Used by internal
@@ -273,7 +273,7 @@ public interface JobContext extends MRJo
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getFileTimestamps();
+  public long[] getFileTimestamps();
 
   /** 
    * Get the configured number of maximum attempts that will be made to run a

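[Editor's note: since getArchiveTimestamps() and getFileTimestamps() now return long[], callers drop the Long.parseLong() conversion they previously needed. A small before/after fragment; 'context' stands for any JobContext.]

    // Caller adaptation for the String[] -> long[] change; 'context' is
    // any JobContext and the loop body is illustrative.
    long[] stamps = context.getArchiveTimestamps();
    for (long stamp : stamps) {
      // before this patch: long stamp = Long.parseLong(stringStamps[i]);
      System.out.println("archive mtime: " + stamp);
    }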
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Tue Jun  5 02:33:44 2012
@@ -118,4 +118,4 @@ public class JobSubmissionFiles {
     return stagingArea;
   }
   
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Jun  5 02:33:44 2012
@@ -26,9 +26,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 import java.net.URI;
@@ -134,175 +131,6 @@ import java.net.URI;
 @Deprecated
 @InterfaceAudience.Private
 public class DistributedCache {
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copied from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
-   * @param conf The Configuration which contains the filesystem
-   * @param baseDir The base cache dir where you want to localize the files/archives
-   * @param fileStatus The file status on the dfs.
-   * @param isArchive if the cache is an archive or a file. In case it is an
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred is
-   *  returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static Path getLocalCache(URI cache, Configuration conf, 
-                                   Path baseDir, FileStatus fileStatus,
-                                   boolean isArchive, long confFileStamp,
-                                   Path currentWorkDir) 
-      throws IOException {
-    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
-        confFileStamp, currentWorkDir, true);
-  }
-
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copied from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
-   * @param conf The Configuration which contains the filesystem
-   * @param baseDir The base cache dir where you want to localize the files/archives
-   * @param fileStatus The file status on the dfs.
-   * @param isArchive if the cache is an archive or a file. In case it is an
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred is
-   *  returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @param honorSymLinkConf if this is false, then the symlinks are not
-   * created even if conf says so (this is required for an optimization in task
-   * launches)
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static Path getLocalCache(URI cache, Configuration conf, 
-      Path baseDir, FileStatus fileStatus,
-      boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-
-    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
-            confFileStamp, currentWorkDir, honorSymLinkConf, false);
-  }
-
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copied from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
-   * @param conf The Configuration which contains the filesystem
-   * @param baseDir The base cache dir where you want to localize the files/archives
-   * @param isArchive if the cache is an archive or a file. In case it is an 
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will 
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred 
-   *  is returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework.  
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static Path getLocalCache(URI cache, Configuration conf, 
-                                   Path baseDir, boolean isArchive,
-                                   long confFileStamp, Path currentWorkDir) 
-      throws IOException {
-    return getLocalCache(cache, conf, 
-                         baseDir, null, isArchive,
-                         confFileStamp, currentWorkDir);
-  }
-
-  /**
-   * This is the opposite of getLocalCache. When you are done
-   * using the cache, you need to release it.
-   * @param cache The cache URI to be released
-   * @param conf configuration which contains the filesystem the cache 
-   * is contained in.
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static void releaseCache(URI cache, Configuration conf)
-      throws IOException {
-    // find the timestamp of the uri
-    URI[] archives = DistributedCache.getCacheArchives(conf);
-    URI[] files = DistributedCache.getCacheFiles(conf);
-    String[] archivesTimestamps =
-          DistributedCache.getArchiveTimestamps(conf);
-    String[] filesTimestamps =
-          DistributedCache.getFileTimestamps(conf);
-    String timestamp = null;
-    if (archives != null) {
-      for (int i = 0; i < archives.length; i++) {
-        if (archives[i].equals(cache)) {
-          timestamp = archivesTimestamps[i];
-          break;
-        }
-      }
-    }
-    if (timestamp == null && files != null) {
-      for (int i = 0; i < files.length; i++) {
-        if (files[i].equals(cache)) {
-          timestamp = filesTimestamps[i];
-          break;
-        }
-      }
-    }
-    if (timestamp == null) {
-      throw new IOException("Timestamp of the URI could not be found");
-    }
-    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-           .releaseCache(cache, conf, Long.parseLong(timestamp),
-            TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
-  }
-  
-  /**
-   * Returns the relative path of the directory that this cache will be
-   * localized in. For hdfs://hostname:port/absolute_path, the relative
-   * path is hostname/absolute_path; if it is just /absolute_path, then
-   * the relative path is the hostname of the DFS this mapred cluster is
-   * running on, followed by /absolute_path.
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  @Deprecated
-  public static String makeRelative(URI cache, Configuration conf)
-      throws IOException {
-    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .makeRelative(cache, conf);
-  }
 
   /**
    * Returns mtime of a given cache file on hdfs.
@@ -361,11 +189,22 @@ public class DistributedCache {
     conf.set(MRJobConfig.CACHE_FILES, sfiles);
   }
 
+  /**
+   * Parse a list of strings into Paths.
+   * @param strs the strings to parse; may be null
+   * @return a list of Paths, the same length as strs, or null if strs is null
+   */
+  private static Path[] parsePaths(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    Path[] result = new Path[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = new Path(strs[i]);
+    }
+    return result;
+  }
+
   /**
    * Get cache archives set in the Configuration.  Used by
    * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
-   * @return A URI array of the caches set in the Configuration
+   * @return An array of the caches set in the Configuration
    * @throws IOException
    * @deprecated Use {@link JobContext#getCacheArchives()} instead
    */
@@ -378,7 +217,7 @@ public class DistributedCache {
    * Get cache files set in the Configuration.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
-   * @return A URI array of the files set in the Configuration
+   * @return An array of the files set in the Configuration
    * @throws IOException
    * @deprecated Use {@link JobContext#getCacheFiles()} instead
    */
@@ -417,30 +256,46 @@ public class DistributedCache {
   }
 
   /**
+   * Parse a list of strings into longs.
+   * @param strs the list of strings to parse
+   * @return a list of longs that were parsed; the same length as strs.
+   */
+  private static long[] parseTimestamps(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+
+  /**
    * Get the timestamps of the archives.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of timestamps 
+   * @return a long array of timestamps 
    * @throws IOException
    * @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
    */
   @Deprecated
-  public static String[] getArchiveTimestamps(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+  public static long[] getArchiveTimestamps(Configuration conf) {
+    return
+      parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
   }
 
-
   /**
    * Get the timestamps of the files.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of timestamps 
+   * @return a long array of timestamps 
    * @throws IOException
    * @deprecated Use {@link JobContext#getFileTimestamps()} instead
    */
   @Deprecated
-  public static String[] getFileTimestamps(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+  public static long[] getFileTimestamps(Configuration conf) {
+    return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
   }
 
   /**
@@ -511,8 +366,8 @@ public class DistributedCache {
   @Deprecated
   public static void addCacheArchive(URI uri, Configuration conf) {
     String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
-    conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
-             : archives + "," + uri.toString());
+    conf.set(MRJobConfig.CACHE_ARCHIVES,
+        archives == null ? uri.toString() : archives + "," + uri.toString());
   }
   
   /**
@@ -525,8 +380,32 @@ public class DistributedCache {
   @Deprecated
   public static void addCacheFile(URI uri, Configuration conf) {
     String files = conf.get(MRJobConfig.CACHE_FILES);
-    conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + ","
-             + uri.toString());
+    conf.set(MRJobConfig.CACHE_FILES,
+        files == null ? uri.toString() : files + "," + uri.toString());
+  }
+  
+  /**
+   * Add an archive that has been localized to the conf.  Used
+   * by internal DistributedCache code.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma-separated list of local archives
+   */
+  public static void addLocalArchives(Configuration conf, String str) {
+    String archives = conf.get(MRJobConfig.CACHE_LOCALARCHIVES);
+    conf.set(MRJobConfig.CACHE_LOCALARCHIVES,
+        archives == null ? str : archives + "," + str);
+  }
+
+  /**
+   * Add a file that has been localized to the conf.  Used
+   * by internal DistributedCache code.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma-separated list of local files
+   */
+  public static void addLocalFiles(Configuration conf, String str) {
+    String files = conf.get(MRJobConfig.CACHE_LOCALFILES);
+    conf.set(MRJobConfig.CACHE_LOCALFILES,
+        files == null ? str : files + "," + str);
   }
 
   /**
@@ -541,8 +420,8 @@ public class DistributedCache {
   public static void addFileToClassPath(Path file, Configuration conf)
     throws IOException {
     String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
-    conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
-             : classpath + "," + file.toString());
+    conf.set(MRJobConfig.CLASSPATH_FILES,
+        classpath == null ? file.toString() : classpath + "," + file);
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(file).toUri();
 
@@ -654,17 +533,4 @@ public class DistributedCache {
   public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
     return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
   }
-
-  /**
-   * Clear the entire contents of the cache and delete the backing files. This
-   * should only be used when the server is reinitializing, because the users
-   * are going to lose their files.
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .purgeCache();
-  }
 }
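
The timestamp accessors above now return long[] while the stored property remains a
comma-separated string, so existing String[] callers must switch to numeric handling.
A small standalone sketch of the round trip, with made-up timestamp values (the class
name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class TimestampRoundTrip {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Stored form is unchanged: a comma-separated string property.
        conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS,
                 "1338860024000,1338860025000");
        // Read-back form is new: longs, ready for numeric comparison
        // against FileStatus.getModificationTime().
        long[] stamps = DistributedCache.getArchiveTimestamps(conf);
        System.out.println(stamps[0] < stamps[1]);  // prints: true
      }
    }

Note also that the new addLocalArchives/addLocalFiles helpers follow the same
append-with-comma convention as the existing addCacheArchive/addCacheFile.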

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Jun  5 02:33:44 2012
@@ -30,8 +30,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -48,10 +51,9 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Private
 public class TaskDistributedCacheManager {
   private final TrackerDistributedCacheManager distributedCacheManager;
-  private final Configuration taskConf;
   private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
   private final List<String> classPaths = new ArrayList<String>();
- 
+  
   private boolean setupCalled = false;
 
   /**
@@ -75,9 +77,9 @@ public class TaskDistributedCacheManager
     boolean localized = false;
     /** The owner of the localized file. Relevant only on the tasktrackers */
     final String owner;
-
-    private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
-        boolean classPath) throws IOException {
+    private CacheStatus status;
+    CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
+              boolean classPath) throws IOException {
       this.uri = uri;
       this.type = type;
       this.isPublic = isPublic;
@@ -88,12 +90,28 @@ public class TaskDistributedCacheManager
     }
 
     /**
+     * Set the status for this cache file.
+     * @param status the localization status to associate with this file
+     */
+    public void setStatus(CacheStatus status) {
+      this.status = status;
+    }
+    
+    /**
+     * Get the status for this cache file.
+     * @return the status object
+     */
+    public CacheStatus getStatus() {
+      return status;
+    }
+
+    /**
      * Converts the scheme used by DistributedCache to serialize what files to
      * cache in the configuration into CacheFile objects that represent those 
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, String cacheVisibilities[], Path[] paths,
+        long[] timestamps, boolean cacheVisibilities[], Path[] paths, 
         FileType type) throws IOException {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
@@ -109,9 +127,8 @@ public class TaskDistributedCacheManager
         for (int i = 0; i < uris.length; ++i) {
           URI u = uris[i];
           boolean isClassPath = (null != classPaths.get(u.getPath()));
-          long t = Long.parseLong(timestamps[i]);
-          ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
-              t, isClassPath));
+          ret.add(new CacheFile(u, type, cacheVisibilities[i],
+              timestamps[i], isClassPath));
         }
       }
       return ret;
@@ -130,7 +147,6 @@ public class TaskDistributedCacheManager
       TrackerDistributedCacheManager distributedCacheManager,
       Configuration taskConf) throws IOException {
     this.distributedCacheManager = distributedCacheManager;
-    this.taskConf = taskConf;
     
     this.cacheFiles.addAll(
         CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
@@ -147,36 +163,42 @@ public class TaskDistributedCacheManager
   }
 
   /**
-   * Retrieve files into the local cache and updates the task configuration 
-   * (which has been passed in via the constructor).
+   * Retrieve public distributed cache files into the local cache and update
+   * the task configuration (which has been passed in via the constructor).
+   * For the private distributed cache, only the paths where the
+   * files/archives should go are decided here; the actual localization is
+   * done by {@link JobLocalizer}.
    * 
    * It is the caller's responsibility to re-write the task configuration XML
    * file, if necessary.
    */
-  public void setup(LocalDirAllocator lDirAlloc, File workDir, 
-      String privateCacheSubdir, String publicCacheSubDir) throws IOException {
+  public void setupCache(Configuration taskConf, String publicCacheSubdir, 
+      String privateCacheSubdir) throws IOException {
     setupCalled = true;
       
-    if (cacheFiles.isEmpty()) {
-      return;
-    }
-
     ArrayList<Path> localArchives = new ArrayList<Path>();
     ArrayList<Path> localFiles = new ArrayList<Path>();
-    Path workdirPath = new Path(workDir.getAbsolutePath());
 
     for (CacheFile cacheFile : cacheFiles) {
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-      String cacheSubdir = publicCacheSubDir;
-      if (!cacheFile.isPublic) {
-        cacheSubdir = privateCacheSubdir;
-      }
-      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
-          cacheSubdir, fileStatus, 
-          cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
+      Path p;
+      try {
+        if (cacheFile.isPublic) {
+          p = distributedCacheManager.getLocalCache(uri, taskConf,
+              publicCacheSubdir, fileStatus, 
+              cacheFile.type == CacheFile.FileType.ARCHIVE,
+              cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+        } else {
+          p = distributedCacheManager.getLocalCache(uri, taskConf,
+              privateCacheSubdir, fileStatus, 
+              cacheFile.type == CacheFile.FileType.ARCHIVE,
+              cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+        }
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted localizing cache file", e);
+      }
       cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
@@ -191,10 +213,14 @@ public class TaskDistributedCacheManager
 
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
+      // TODO verify
+//      DistributedCache.addLocalArchives(taskConf, 
       TrackerDistributedCacheManager.setLocalArchives(taskConf, 
         stringifyPathList(localArchives));
     }
     if (!localFiles.isEmpty()) {
+      // TODO verify
+//      DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
       TrackerDistributedCacheManager.setLocalFiles(taskConf,
         stringifyPathList(localFiles));
     }
@@ -239,9 +265,18 @@ public class TaskDistributedCacheManager
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      if (c.getLocalized()) {
-        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
-            c.owner);
+      if (c.getLocalized() && c.status != null) {
+        distributedCacheManager.releaseCache(c.status);
+      }
+    }
+  }
+
+  public void setSizes(long[] sizes) throws IOException {
+    int i = 0;
+    for (CacheFile c : cacheFiles) {
+      if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE &&
+          c.status != null) {
+        distributedCacheManager.setSize(c.status, sizes[i++]);
       }
     }
   }
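
setSizes() above walks cacheFiles and sizes[] in lockstep, but only a private archive
with a recorded CacheStatus consumes an entry from sizes[]. A standalone mirror of
that pairing rule (all names here are hypothetical stand-ins, not the Hadoop classes):

    import java.util.Arrays;
    import java.util.List;

    public class SetSizesSketch {
      static class Entry {
        final boolean isPublic, isArchive;
        final Object status;  // stands in for a localized CacheStatus
        Entry(boolean isPublic, boolean isArchive, Object status) {
          this.isPublic = isPublic;
          this.isArchive = isArchive;
          this.status = status;
        }
      }

      // Only private, localized archives consume an entry from sizes[],
      // mirroring the filter in TaskDistributedCacheManager.setSizes().
      static void setSizes(List<Entry> files, long[] sizes) {
        int i = 0;
        for (Entry e : files) {
          if (!e.isPublic && e.isArchive && e.status != null) {
            System.out.println("recording size " + sizes[i++]);
          }
        }
      }

      public static void main(String[] args) {
        List<Entry> files = Arrays.asList(
            new Entry(true, true, new Object()),    // public archive: skipped
            new Entry(false, true, new Object()),   // private archive: sizes[0]
            new Entry(false, false, new Object())); // private file: skipped
        setSizes(files, new long[] { 4096L });
      }
    }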


