hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077679 [3/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/core/org/apache/hadoop/fs/ src/core/org/apa...
Date: Fri, 04 Mar 2011 04:43:35 GMT
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Fri Mar  4 04:43:33 2011
@@ -21,6 +21,9 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.DateFormat;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -33,8 +36,6 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskController;
-import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -44,7 +45,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.RunJar;
@@ -63,6 +67,11 @@ public class TrackerDistributedCacheMana
   // cacheID to cacheStatus mapping
   private TreeMap<String, CacheStatus> cachedArchives = 
     new TreeMap<String, CacheStatus>();
+  private Map<JobID, TaskDistributedCacheManager> jobArchives =
+    new HashMap<JobID, TaskDistributedCacheManager>();
+  private final TaskController taskController;
+  private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
+    FsPermission.createImmutable((short) 0755);
 
   // For holding the properties of each cache directory
   static class CacheDir {
@@ -84,19 +93,17 @@ public class TrackerDistributedCacheMana
   private final LocalFileSystem localFs;
   
   private LocalDirAllocator lDirAllocator;
-  
-  private TaskController taskController;
-  
+    
   private Configuration trackerConf;
   
   private Random random = new Random();
 
   public TrackerDistributedCacheManager(Configuration conf,
-      TaskController taskController) throws IOException {
+                                        TaskController controller
+                                        ) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
-    this.taskController = taskController;
 
     // setting the cache size to a default of 10GB
     this.allowedCacheSize = conf.getLong
@@ -105,6 +112,7 @@ public class TrackerDistributedCacheMana
     this.allowedCacheSubdirs = conf.getLong
       ("mapreduce.tasktracker.local.cache.numberdirectories",
        DEFAULT_CACHE_SUBDIR_LIMIT);
+    this.taskController = controller;
   }
 
   /**
@@ -125,13 +133,6 @@ public class TrackerDistributedCacheMana
    *  In case of a file, the path to the file is returned
    * @param confFileStamp this is the hdfs file modification timestamp to verify
    * that the file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create
-   * symlinks for the locally cached files/archives
-   * @param honorSymLinkConf if this is false, then the symlinks are not
-   * created even if conf says so (this is required for an optimization in task
-   * launches
-   * NOTE: This is effectively always on since r696957, since there is no code
-   * path that does not use this.
    * @param isPublic to know the cache file is accessible to public or private
    * @return the path to directory where the archives are unjarred in case of
    * archives, the path to the file where the file is copied locally
@@ -140,12 +141,14 @@ public class TrackerDistributedCacheMana
   Path getLocalCache(URI cache, Configuration conf,
       String subDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
+      boolean isPublic)
       throws IOException {
     String key;
-    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
+    String user = getLocalizedCacheOwner(isPublic);
+    key = getKey(cache, conf, confFileStamp, user);
     CacheStatus lcacheStatus;
     Path localizedPath = null;
+    Path localPath = null;
     synchronized (cachedArchives) {
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -156,10 +159,12 @@ public class TrackerDistributedCacheMana
              + "_" + (confFileStamp % Integer.MAX_VALUE));
         String cachePath = new Path (subDir, 
           new Path(uniqueString, makeRelative(cache, conf))).toString();
-        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+        localPath = lDirAllocator.getLocalPathForWrite(cachePath,
           fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
-          cachePath, "")), localPath, new Path(subDir), uniqueString);
+        lcacheStatus = 
+          new CacheStatus(new Path(localPath.toString().replace(cachePath, "")), 
+                          localPath, new Path(subDir), uniqueString, 
+                          isPublic ? null : user);
         cachedArchives.put(key, lcacheStatus);
       }
 
@@ -172,15 +177,20 @@ public class TrackerDistributedCacheMana
       // do the localization, after releasing the global lock
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
-          localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive, isPublic);
+          if (isPublic) {
+            localizedPath = localizePublicCacheObject(conf, 
+                cache, 
+                confFileStamp,
+                lcacheStatus, fileStatus, 
+                isArchive);
+          } else {
+            localizedPath = localPath;
+          }
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
-              lcacheStatus, fileStatus, isArchive);
+              lcacheStatus, fileStatus, isArchive);            
         }
-        createSymlink(conf, cache, lcacheStatus, isArchive,
-            currentWorkDir, honorSymLinkConf);
       }
 
       // try deleting stuff if you can
@@ -240,6 +250,21 @@ public class TrackerDistributedCacheMana
     }
   }
 
+  void setSize(URI cache, Configuration conf, long timeStamp,
+               String owner, long size) throws IOException {
+    String key = getKey(cache, conf, timeStamp, owner);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(key);
+      if (lcacheStatus == null) {
+        LOG.warn("Cannot find localized cache: " + cache + 
+                 " (key: " + key + ") in setSize!");
+        return;
+      }
+      lcacheStatus.size = size;
+      addCacheInfoUpdate(lcacheStatus);
+    }
+  }
+
   /*
    * This method is called from unit tests. 
    */
@@ -284,7 +309,6 @@ public class TrackerDistributedCacheMana
           it.hasNext();) {
         String cacheId = it.next();
         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        
         // if reference count is zero 
         // mark the cache for deletion
         if (lcacheStatus.refcount == 0) {
@@ -299,22 +323,28 @@ public class TrackerDistributedCacheMana
     // do the deletion, after releasing the global lock
     for (CacheStatus lcacheStatus : deleteList) {
       synchronized (lcacheStatus) {
-        FileSystem localFS = FileSystem.getLocal(conf);
-
         Path potentialDeletee = lcacheStatus.localizedLoadPath;
+        Path localizedDir = lcacheStatus.getLocalizedUniqueDir();
+        if (lcacheStatus.user == null) {
 
-        localFS.delete(potentialDeletee, true);
+          localFs.delete(potentialDeletee, true);
 
-        // Update the maps baseDirSize and baseDirNumberSubDir
-        LOG.info("Deleted path " + potentialDeletee);
+          // Update the maps baseDirSize and baseDirNumberSubDir
+          LOG.info("Deleted path " + potentialDeletee);
 
-        try {
-          localFS.delete(lcacheStatus.getLocalizedUniqueDir(), true);
-        } catch (IOException e) {
-          LOG.warn("Could not delete distributed cache empty directory "
-                   + lcacheStatus.getLocalizedUniqueDir());
+          try {
+            localFs.delete(localizedDir, true);
+          } catch (IOException e) {
+            LOG.warn("Could not delete distributed cache empty directory "
+                     + localizedDir);
+          }
+        } else {
+          int userDir = TaskTracker.getUserDir(lcacheStatus.user).length();
+          taskController.deleteAsUser(lcacheStatus.user,
+                                      potentialDeletee.toString().substring(userDir));
+          taskController.deleteAsUser(lcacheStatus.user,
+                                      localizedDir.toString().substring(userDir));          
         }
-
         deleteCacheInfoUpdate(lcacheStatus);
       }
     }
@@ -409,51 +439,54 @@ public class TrackerDistributedCacheMana
     return false;
   }
 
-  private void createSymlink(Configuration conf, URI cache,
-      CacheStatus cacheStatus, boolean isArchive,
-      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
-    if(cache.getFragment() == null) {
-      doSymlink = false;
-    }
-
-    String link = 
-      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (doSymlink){
-      if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
-      }
-    }
-  }
-
-  //the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  Path localizeCache(Configuration conf,
-                                      URI cache, long confFileStamp,
-                                      CacheStatus cacheStatus,
-                                      FileStatus fileStatus,
-                                      boolean isArchive, boolean isPublic)
-      throws IOException {
-    FileSystem fs = FileSystem.get(cache, conf);
+  /**
+   * Download a given path to the local file system.
+   * @param conf the job's configuration
+   * @param source the source to copy from
+   * @param destination where to copy the file; must be on the local file system
+   * @param desiredTimestamp the required modification timestamp of the source
+   * @param isArchive is this an archive that should be expanded
+   * @param permission the desired permissions of the file.
+   * @return for archives, the number of bytes in the unpacked directory
+   * @throws IOException
+   */
+  public static long downloadCacheObject(Configuration conf,
+                                         URI source,
+                                         Path destination,
+                                         long desiredTimestamp,
+                                         boolean isArchive,
+                                         FsPermission permission
+                                         ) throws IOException {
+    FileSystem sourceFs = FileSystem.get(source, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
+    
+    Path sourcePath = new Path(source.getPath());
+    long modifiedTime = 
+      sourceFs.getFileStatus(sourcePath).getModificationTime();
+    if (modifiedTime != desiredTimestamp) {
+      DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT, 
+                                                     DateFormat.SHORT);
+      throw new IOException("The distributed cache object " + source + 
+                            " changed during the job from " + 
+                            df.format(new Date(desiredTimestamp)) + " to " +
+                            df.format(new Date(modifiedTime)));
+    }
+    
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localizedLoadPath,
-        new Path(cacheStatus.localizedLoadPath.getName()));
+      parchive = new Path(destination,
+        new Path(destination.getName()));
     } else {
-      parchive = cacheStatus.localizedLoadPath;
+      parchive = destination;
     }
-
-    if (!localFs.mkdirs(parchive.getParent())) {
+    if (!localFs.mkdirs(destination.getParent())) {
       throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localizedLoadPath.toString());
+                            destination);
     }
 
-    String cacheId = cache.getPath();
-    fs.copyToLocalFile(new Path(cacheId), parchive);
+    sourceFs.copyToLocalFile(sourcePath, parchive);
     if (isArchive) {
-      String tmpArchive = parchive.toString().toLowerCase();
+      String tmpArchive = destination.toString().toLowerCase();
       File srcFile = new File(parchive.toString());
       File destDir = new File(parchive.getParent().toString());
       LOG.info(String.format("Extracting %s to %s",
@@ -472,46 +505,37 @@ public class TrackerDistributedCacheMana
         // and copy the file into the dir as it is
       }
     }
-    long cacheSize =
+    // set proper permissions for the localized directory
+    localFs.setPermission(destination, permission);
+
+    LOG.info(String.format("Cached %s as %s",
+             source.toString(), destination.toString()));
+    long cacheSize = 
       FileUtil.getDU(new File(parchive.getParent().toString()));
-    cacheStatus.size = cacheSize;
+    return cacheSize;
+  }
+
+  // the method that actually copies the cache objects locally, unjars/unzips
+  // them, and chmods the files
+  Path localizePublicCacheObject(Configuration conf,
+                                 URI cache, long confFileStamp,
+                                 CacheStatus cacheStatus,
+                                 FileStatus fileStatus,
+                                 boolean isArchive) throws IOException {
+    long size = downloadCacheObject(conf, cache, cacheStatus.localizedLoadPath,
+                                    confFileStamp, isArchive, 
+                                    PUBLIC_CACHE_OBJECT_PERM);
+    cacheStatus.size = size;
     
     // Increase the size and sub directory count of the cache
     // from baseDirSize and baseDirNumberSubDir.
     addCacheInfoUpdate(cacheStatus);
 
-    // set proper permissions for the localized directory
-    setPermissions(conf, cacheStatus, isPublic);
-
-    // update cacheStatus to reflect the newly cached file
-    cacheStatus.mtime = DistributedCache.getTimestamp(conf, cache);
-
     LOG.info(String.format("Cached %s as %s",
              cache.toString(), cacheStatus.localizedLoadPath));
     return cacheStatus.localizedLoadPath;
   }
 
-  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
-      boolean isPublic) throws IOException {
-    if (isPublic) {
-      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
-      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
-      try {
-        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
-      } catch (InterruptedException e) {
-        LOG.warn("Exception in chmod" + e.toString());
-        throw new IOException(e);
-      }
-    } else {
-      // invoke taskcontroller to set permissions
-      DistributedCacheFileContext context = new DistributedCacheFileContext(
-          conf.get("user.name"), new File(cacheStatus.localizedBaseDir
-              .toString()), cacheStatus.localizedBaseDir,
-          cacheStatus.uniqueString);
-      taskController.initializeDistributedCacheFile(context);
-    }
-  }
-
   private static boolean isTarFile(String filename) {
     return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
            filename.endsWith(".tar"));
@@ -537,9 +561,6 @@ public class TrackerDistributedCacheMana
       " has changed on HDFS since job started");
     }
 
-    if (dfsFileStamp != lcacheStatus.mtime) {
-      return false;
-    }
     return true;
   }
 
@@ -595,9 +616,6 @@ public class TrackerDistributedCacheMana
     // number of instances using this cache
     int refcount;
 
-    // the cache-file modification time
-    long mtime;
-
     // is it initialized ?
     boolean inited = false;
 
@@ -607,17 +625,20 @@ public class TrackerDistributedCacheMana
     
     // unique string used in the construction of local load path
     String uniqueString;
+    
+    // The user that owns the cache entry or null if it is public
+    final String user;
 
     public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
-        String uniqueString) {
+                       String uniqueString, String user) {
       super();
       this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
-      this.mtime = -1;
       this.localizedBaseDir = baseDir;
       this.size = 0;
       this.subDir = subDir;
       this.uniqueString = uniqueString;
+      this.user = user;
     }
     
     Path getBaseDir(){
@@ -657,11 +678,29 @@ public class TrackerDistributedCacheMana
     }
   }
 
-  public TaskDistributedCacheManager newTaskDistributedCacheManager(
-      Configuration taskConf) throws IOException {
-    return new TaskDistributedCacheManager(this, taskConf);
+  public TaskDistributedCacheManager 
+  newTaskDistributedCacheManager(JobID jobId,
+                                 Configuration taskConf) throws IOException {
+    TaskDistributedCacheManager result = 
+      new TaskDistributedCacheManager(this, taskConf);
+    jobArchives.put(jobId, result);
+    return result;
+  }
+
+  public void releaseJob(JobID jobId) throws IOException {
+    TaskDistributedCacheManager mgr = jobArchives.get(jobId);
+    if (mgr != null) {
+      mgr.release();
+      jobArchives.remove(jobId);
+    }
   }
 
+  public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
+    TaskDistributedCacheManager mgr = jobArchives.get(jobId);
+    if (mgr != null) {
+      mgr.setSizes(sizes);
+    }
+  }
 
   /**
    * Determines timestamps of files to be cached, and stores those
@@ -744,25 +783,37 @@ public class TrackerDistributedCacheMana
     }
   }
   
+  private static boolean[] parseBooleans(String[] strs) {
+    if (null == strs) {
+      return null;
+    }
+    boolean[] result = new boolean[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Boolean.parseBoolean(strs[i]);
+    }
+    return result;
+  }
+
   /**
    * Get the booleans on whether the files are public or not.  Used by 
    * internal DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of booleans 
+   * @return array of booleans 
    * @throws IOException
    */
-  static String[] getFileVisibilities(Configuration conf) {
-    return conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES);
+  public static boolean[] getFileVisibilities(Configuration conf) {
+    return parseBooleans(conf.getStrings(JobContext.CACHE_FILE_VISIBILITIES));
   }
 
   /**
    * Get the booleans on whether the archives are public or not.  Used by 
    * internal DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of booleans 
+   * @return array of booleans 
    */
-  static String[] getArchiveVisibilities(Configuration conf) {
-    return conf.getStrings(JobContext.CACHE_ARCHIVES_VISIBILITIES);
+  public static boolean[] getArchiveVisibilities(Configuration conf) {
+    return parseBooleans(conf.getStrings(JobContext.
+                                           CACHE_ARCHIVES_VISIBILITIES));
   }
 
   /**
@@ -837,8 +888,6 @@ public class TrackerDistributedCacheMana
 
     Path thisSubject = null;
 
-    String thisCategory = DistributedCache.CACHE_ARCHIVES;
-
     if (archiveStrings != null && fileStrings != null) {
       final Set<Path> archivesSet = new HashSet<Path>();
 
@@ -846,8 +895,6 @@ public class TrackerDistributedCacheMana
         archivesSet.add(coreLocation(archiveString, conf));
       }
 
-      thisCategory = DistributedCache.CACHE_FILES;
-
       for (String fileString : fileStrings) {
         thisSubject = coreLocation(fileString, conf);
 

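For context, here is a minimal sketch (not part of the commit) of how a caller might use the new static downloadCacheObject API introduced above. The source URI and destination path are hypothetical, and the timestamp is fetched inline only for illustration; in the real flow it is recorded in the job configuration at submission time, and downloadCacheObject throws an IOException if the object has changed since then.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class CacheDownloadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI source = new URI("hdfs://namenode:8020/apps/libs/util.jar");  // hypothetical
        Path destination = new Path("/disk1/mapred/distcache/util.jar");  // hypothetical
        // Stand-in for the modification time recorded at job submission.
        long recordedStamp = FileSystem.get(source, conf)
            .getFileStatus(new Path(source.getPath())).getModificationTime();
        long size = TrackerDistributedCacheManager.downloadCacheObject(
            conf, source, destination, recordedStamp,
            false /* not an archive */,
            FsPermission.createImmutable((short) 0755));
        System.out.println("Localized " + size + " bytes to " + destination);
      }
    }
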
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Child.java Fri Mar  4 04:43:33 2011
@@ -24,19 +24,27 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -55,6 +63,7 @@ class Child {
 
   static volatile TaskAttemptID taskid = null;
   static volatile boolean isCleanup;
+  static String cwd;
 
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
@@ -69,6 +78,12 @@ class Child {
     int jvmIdInt = Integer.parseInt(args[4]);
     JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
     String prefix = firstTaskid.isMap() ? "MapTask" : "ReduceTask";
+    
+    cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);
+    if (cwd == null) {
+      throw new IOException("Environment variable " + 
+                             TaskRunner.HADOOP_WORK_DIR + " is not set");
+    }
 
     // file name is passed thru env
     String jobTokenFile = 
@@ -171,10 +186,6 @@ class Child {
         isCleanup = task.isTaskCleanupTask();
         // reset the statistics for the task
         FileSystem.clearStatistics();
-
-        //create the index file so that the log files 
-        //are viewable immediately
-        TaskLog.syncLogs(logLocation, taskid, isCleanup);
         
         // Create the job-conf and set credentials
         final JobConf job = new JobConf(task.getJobFile());
@@ -187,12 +198,19 @@ class Child {
         // setup the child's mapred-local-dir. The child is now sandboxed and
         // can only see files down and under attemptdir.
         TaskRunner.setupChildMapredLocalDirs(task, job);
+        
+        // setup the child's attempt directories
+        localizeTask(task, job, logLocation);
 
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
-        TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
-
+        TaskRunner.setupWorkDir(job, new File(cwd));
+        
+        //create the index file so that the log files 
+        //are viewable immediately
+        TaskLog.syncLogs(logLocation, taskid, isCleanup);
+        
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);
 
@@ -219,6 +237,10 @@ class Child {
               taskFinal.run(job, umbilical);             // run the task
             } finally {
               TaskLog.syncLogs(logLocation, taskid, isCleanup);
+              TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
+              trunc.truncateLogs(new JVMInfo(
+                  TaskLog.getAttemptDir(taskFinal.getTaskID(),
+                    taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
             }
 
             return null;
@@ -288,4 +310,19 @@ class Child {
     DefaultMetricsSystem.INSTANCE.shutdown();
   }
 
+  static void localizeTask(Task task, JobConf jobConf, String logLocation) 
+  throws IOException{
+    
+    // Do the task-type specific localization
+    task.localizeConfiguration(jobConf);
+    
+    //write the localized task jobconf
+    LocalDirAllocator lDirAlloc = 
+      new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    Path localTaskFile =
+      lDirAlloc.getLocalPathForWrite(TaskTracker.JOBFILE, jobConf);
+    JobLocalizer.writeLocalJobFile(localTaskFile, jobConf);
+    task.setJobFile(localTaskFile.toString());
+    task.setConf(jobConf);
+  }
 }

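The Child changes above make the task's working directory come from an environment variable exported by the parent TaskTracker, failing fast when it is missing. A minimal sketch (not part of the commit) of that pattern, assuming the variable name HADOOP_WORK_DIR:

    import java.io.IOException;

    public class EnvCheckSketch {
      static String requireEnv(String name) throws IOException {
        String value = System.getenv(name);
        if (value == null) {
          // Fail early with a clear message rather than hitting a confusing
          // error later, when the working directory is first used.
          throw new IOException("Environment variable " + name + " is not set");
        }
        return value;
      }

      public static void main(String[] args) throws IOException {
        String cwd = requireEnv("HADOOP_WORK_DIR"); // assumed variable name
        System.out.println("Task working directory: " + cwd);
      }
    }
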
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Mar  4 04:43:33 2011
@@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 class CleanupQueue {
@@ -54,24 +54,23 @@ class CleanupQueue {
    * Contains info related to the path of the file/dir to be deleted
    */
   static class PathDeletionContext {
-    String fullPath;// full path of file or dir
-    FileSystem fs;
+    Path fullPath;// full path of file or dir
+    Configuration conf;
 
-    public PathDeletionContext(FileSystem fs, String fullPath) {
-      this.fs = fs;
+    public PathDeletionContext(Path fullPath, Configuration conf) {
       this.fullPath = fullPath;
+      this.conf = conf;
     }
     
-    protected String getPathForCleanup() {
+    protected Path getPathForCleanup() {
       return fullPath;
     }
 
     /**
-     * Makes the path(and its subdirectories recursively) fully deletable
+     * Deletes the path (and its subdirectories recursively)
      */
-    protected void enablePathForCleanup() throws IOException {
-      // Do nothing by default.
-      // Subclasses can override to provide enabling for deletion.
+    protected void deletePath() throws IOException {
+      fullPath.getFileSystem(conf).delete(fullPath, true);
     }
   }
 
@@ -82,19 +81,6 @@ class CleanupQueue {
     cleanupThread.addToQueue(contexts);
   }
 
-  protected static boolean deletePath(PathDeletionContext context)
-            throws IOException {
-    context.enablePathForCleanup();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to delete " + context.fullPath);
-    }
-    if (context.fs.exists(new Path(context.fullPath))) {
-      return context.fs.delete(new Path(context.fullPath), true);
-    }
-    return true;
-  }
-
   // currently used by tests only
   protected boolean isQueueEmpty() {
     return (cleanupThread.queue.size() == 0);
@@ -128,11 +114,9 @@ class CleanupQueue {
       while (true) {
         try {
           context = queue.take();
+          context.deletePath();
           // delete the path.
-          if (!deletePath(context)) {
-            LOG.warn("CleanupThread:Unable to delete path " + context.fullPath);
-          }
-          else if (LOG.isDebugEnabled()) {
+          if (LOG.isDebugEnabled()) {
             LOG.debug("DELETED " + context.fullPath);
           }
         } catch (InterruptedException t) {

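With the rework above, PathDeletionContext carries a Path plus a Configuration, and deletePath() resolves the owning FileSystem itself, so callers no longer hand over a FileSystem. A minimal sketch (not part of the commit) of the new call shape; CleanupQueue is package-private, so this assumes code in the same package, and the path is hypothetical:

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class CleanupQueueSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Path staleDir = new Path("/mapred/system/job_201103040443_0001"); // hypothetical
        // The delete happens asynchronously on the queue's cleanup thread.
        new CleanupQueue().addToQueue(
            new CleanupQueue.PathDeletionContext(staleDir, conf));
      }
    }
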
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 04:43:33 2011
@@ -18,20 +18,23 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.ProcessTree;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 
 /**
  * The default implementation for controlling tasks.
@@ -48,147 +51,188 @@ public class DefaultTaskController exten
 
   private static final Log LOG = 
       LogFactory.getLog(DefaultTaskController.class);
-  /**
-   * Launch a new JVM for the task.
-   * 
-   * This method launches the new JVM for the task by executing the
-   * the JVM command using the {@link Shell.ShellCommandExecutor}
-   */
-  void launchTaskJVM(TaskController.TaskControllerContext context) 
-                                      throws IOException {
-    initializeTask(context);
-
-    JvmEnv env = context.env;
-    List<String> wrappedCommand = 
-      TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true);
-    ShellCommandExecutor shexec = 
-        new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
-                                  env.workDir, env.env);
-    // set the ShellCommandExecutor for later use.
-    context.shExec = shexec;
-    shexec.execute();
-  }
-    
-  /**
-   * Initialize the task environment.
-   * 
-   * Since tasks are launched as the tasktracker user itself, this
-   * method has no action to perform.
-   */
-  void initializeTask(TaskController.TaskControllerContext context) {
-    // The default task controller does not need to set up
-    // any permissions for proper execution.
-    // So this is a dummy method.
-    return;
-  }
-
-  /*
-   * No need to do anything as we don't need to do as we dont need anything
-   * extra from what TaskTracker has done.
-   */
+  private FileSystem fs;
   @Override
-  void initializeJob(JobInitializationContext context) {
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    try {
+      fs = FileSystem.getLocal(conf).getRaw();
+    } catch (IOException ie) {
+      throw new RuntimeException("Failed getting LocalFileSystem", ie);
+    }
   }
 
+  /**
+   * Creates all of the directories for the task and launches the child JVM.
+   * @param user the user name
+   * @param attemptId the attempt id
+   * @throws IOException
+   */
   @Override
-  void terminateTask(TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-    if (shexec != null) {
-      Process process = shexec.getProcess();
-      if (Shell.WINDOWS) {
-        // Currently we don't use setsid on WINDOWS. 
-        //So kill the process alone.
-        if (process != null) {
-          process.destroy();
-        }
+  public int launchTask(String user, 
+                                  String jobId,
+                                  String attemptId,
+                                  List<String> setup,
+                                  List<String> jvmArguments,
+                                  File currentWorkDirectory,
+                                  String stdout,
+                                  String stderr) throws IOException {
+    
+    ShellCommandExecutor shExec = null;
+    try {
+      FileSystem localFs = FileSystem.getLocal(getConf());
+      
+      //create the attempt dirs
+      new Localizer(localFs, 
+          getConf().getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)).
+          initializeAttemptDirs(user, jobId, attemptId);
+      
+      // create the working-directory of the task 
+      if (!currentWorkDirectory.mkdir()) {
+        throw new IOException("Mkdirs failed to create " 
+                    + currentWorkDirectory.toString());
       }
-      else { // In addition to the task JVM, kill its subprocesses also.
-        String pid = context.pid;
-        if (pid != null) {
-          if(ProcessTree.isSetsidAvailable) {
-            ProcessTree.terminateProcessGroup(pid);
-          }else {
-            ProcessTree.terminateProcess(pid);
-          }
-        }
+      
+      //mkdir the loglocation
+      String logLocation = TaskLog.getAttemptDir(jobId, attemptId).toString();
+      if (!localFs.mkdirs(new Path(logLocation))) {
+        throw new IOException("Mkdirs failed to create " 
+                   + logLocation);
       }
-    }
-  }
-  
-  @Override
-  void killTask(TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-    if (shexec != null) {
-      if (Shell.WINDOWS) {
-        //We don't do send kill process signal in case of windows as 
-        //already we have done a process.destroy() in termintateTaskJVM()
-        return;
-      }
-      String pid = context.pid;
-      if (pid != null) {
-        if(ProcessTree.isSetsidAvailable) {
-          ProcessTree.killProcessGroup(pid);
-        }else {
-          ProcessTree.killProcess(pid);
-        }
+      // get a raw view of the local file system for writing the command file
+      FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+      long logSize = 0; //TODO: Ref BUG:2854624
+      // get the JVM command line.
+      String cmdLine = 
+        TaskLog.buildCommandLine(setup, jvmArguments,
+            new File(stdout), new File(stderr), logSize, true);
+
+      // write the command to a file in the
+      // task specific cache directory
+      // TODO copy to user dir
+      Path p = new Path(allocator.getLocalPathForWrite(
+          TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+          getConf()), COMMAND_FILE);
+
+      String commandFile = writeCommand(cmdLine, rawFs, p);
+      rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
+      shExec = new ShellCommandExecutor(new String[]{
+          "bash", "-c", commandFile},
+          currentWorkDirectory);
+      shExec.execute();
+    } catch (Exception e) {
+      if (shExec == null) {
+        return -1;
       }
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      LOG.info("Output from DefaultTaskController's launchTask follows:");
+      logOutput(shExec.getOutput());
+      return exitCode;
     }
+    return 0;
   }
-
+    
   /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
-  @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-         throws IOException {
-    enablePathForCleanup(context);
+   * This routine initializes the local file system for running a job.
+   * Details:
+   * <ul>
+   * <li>Copies the credentials file from the TaskTracker's private space to
+   * the job's private space </li>
+   * <li>Creates the job work directory and sets 
+   * {@link TaskTracker#JOB_LOCAL_DIR} in the configuration</li>
+   * <li>Downloads the job.jar, unjars it, and updates the configuration to 
+   * reflect the localized path of the job.jar</li>
+   * <li>Creates a base JobConf in the job's private space</li>
+   * <li>Sets up the distributed cache</li>
+   * <li>Sets up the user logs directory for the job</li>
+   * </ul>
+   * This method must be invoked in the access control context of the job-owner
+   * user, because the distributed cache is also set up here and access to the
+   * HDFS files requires authentication tokens when security is enabled.
+   * @param user the user in question (the job owner)
+   * @param jobid the ID of the job in question
+   * @param credentials the path to the credentials file that the TaskTracker
+   * downloaded
+   * @param jobConf the path to the job configuration file that the TaskTracker
+   * downloaded
+   * @param taskTracker the connection to the task tracker
+   * @throws IOException
+   */
+  @Override
+  public void initializeJob(String user, String jobid, 
+                            Path credentials, Path jobConf, 
+                            TaskUmbilicalProtocol taskTracker
+                            ) throws IOException {
+    final LocalDirAllocator lDirAlloc = allocator;
+    FileSystem localFs = FileSystem.getLocal(getConf());
+    JobLocalizer localizer = new JobLocalizer((JobConf)getConf(), user, jobid);
+    localizer.createLocalDirs();
+    localizer.createUserDirs();
+    localizer.createJobDirs();
+
+    JobConf jConf = new JobConf(jobConf);
+    localizer.createWorkDir(jConf);
+    //copy the credential file
+    Path localJobTokenFile = lDirAlloc.getLocalPathForWrite(
+        TaskTracker.getLocalJobTokenFile(user, jobid), getConf());
+    FileUtil.copy(
+        localFs, credentials, localFs, localJobTokenFile, false, getConf());
+
+
+    //setup the user logs dir
+    localizer.initializeJobLogDir();
+
+    // Download the job.jar for this job from the system FS
+    // setup the distributed cache
+    // write job acls
+    // write localized config
+    localizer.localizeJobFiles(JobID.forName(jobid), jConf, localJobTokenFile, 
+                               taskTracker);
+  }
+
+  @Override
+  public void signalTask(String user, int taskPid, Signal signal) {
+    if (ProcessTree.isSetsidAvailable) {
+      ProcessTree.killProcessGroup(Integer.toString(taskPid), signal);
+    } else {
+      ProcessTree.killProcess(Integer.toString(taskPid), signal);      
+    }
   }
-  
+
   /**
-   * Enables the job for cleanup by changing permissions of the specified path
-   * in the local filesystem
+   * Delete the user's files under all of the task tracker root directories.
+   * @param user the user name
+   * @param subDir the path relative to the user's subdirectory under
+   *        the task tracker root directories.
+   * @throws IOException
    */
   @Override
-  void enableJobForCleanup(PathDeletionContext context)
-         throws IOException {
-    enablePathForCleanup(context);
+  public void deleteAsUser(String user, 
+                           String subDir) throws IOException {
+    String dir = TaskTracker.getUserDir(user) + Path.SEPARATOR + subDir;
+    for(Path fullDir: allocator.getAllLocalPathsToRead(dir, getConf())) {
+      fs.delete(fullDir, true);
+    }
   }
   
   /**
-   * Enables the path for cleanup by changing permissions of the specified path
-   * in the local filesystem
+   * Delete the user's files under the userlogs directory.
+   * @param user the user to work as
+   * @param subDir the path under the userlogs directory.
+   * @throws IOException
    */
-  private void enablePathForCleanup(PathDeletionContext context)
-         throws IOException {
-    try {
-      FileUtil.chmod(context.fullPath, "u+rwx", true);
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while setting permissions for " + context.fullPath +
-          " for deletion.");
-    } catch(IOException ioe) {
-      LOG.warn("Unable to change permissions of " + context.fullPath);
-    }
-  }
-
   @Override
-  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException {
-    Path localizedUniqueDir = context.getLocalizedUniqueDir();
-    try {
-      // Setting recursive execute permission on localized dir
-      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
-      FileUtil.chmod(localizedUniqueDir.toString(), "+x", true);
-    } catch (InterruptedException ie) {
-      LOG.warn("Exception in doing chmod on" + localizedUniqueDir, ie);
-      throw new IOException(ie);
-    }
+  public void deleteLogAsUser(String user, 
+                              String subDir) throws IOException {
+    Path dir = new Path(TaskLog.getUserLogDir().getAbsolutePath(), subDir);
+    fs.delete(dir, true);
   }
 
   @Override
-  public void initializeUser(InitializationContext context) {
-    // Do nothing.
+  public void setup(LocalDirAllocator allocator) {
+    this.allocator = allocator;
   }
   
 }

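The new deleteAsUser walks every configured mapred.local.dir root and removes <root>/taskTracker/<user>/<subDir>. A minimal sketch (not part of the commit) of that resolution, with hypothetical roots and names; the real code builds the user prefix with TaskTracker.getUserDir:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class DeleteAsUserSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setStrings("mapred.local.dir", "/disk1/mapred", "/disk2/mapred");
        FileSystem rawFs = FileSystem.getLocal(conf).getRaw();
        LocalDirAllocator allocator = new LocalDirAllocator("mapred.local.dir");
        String user = "alice";                              // hypothetical
        String subDir = "jobcache/job_201103040443_0001";   // hypothetical
        String dir = "taskTracker/" + user + Path.SEPARATOR + subDir;
        // getAllLocalPathsToRead yields the matching path under every root
        // that actually contains it; delete each one recursively.
        for (Path fullDir : allocator.getAllLocalPathsToRead(dir, conf)) {
          rawFs.delete(fullDir, true);
        }
      }
    }
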
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar  4 04:43:33 2011
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -116,6 +117,13 @@ public class IsolationRunner {
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
+
+    @Override
+    public void 
+    updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                       long[] sizes){
+      // NOTHING
+    }
   }
   
   private ClassLoader makeClassLoader(JobConf conf, 
@@ -176,9 +184,17 @@ public class IsolationRunner {
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    Path workDirName;
+    boolean workDirExists = lDirAlloc.ifExists(MRConstants.WORKDIR, conf);
+    if (workDirExists) {
+      workDirName = TaskRunner.formWorkDir(lDirAlloc, conf);
+    } else {
+      workDirName = lDirAlloc.getLocalPathForWrite(MRConstants.WORKDIR, 
+                                                       conf);
+    }
 
-    File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
     local.setWorkingDirectory(new Path(workDirName.toString()));
+    
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
     
     // set up a classloader with the right classpath

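IsolationRunner now reuses the task's work directory when one of the local dirs already contains it, and only allocates a fresh one otherwise. A minimal sketch (not part of the commit) of that allocate-or-reuse pattern with LocalDirAllocator; the commit itself resolves the existing case through TaskRunner.formWorkDir, so getLocalPathToRead is used here as a stand-in:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    public class WorkDirSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setStrings("mapred.local.dir", "/disk1/mapred"); // hypothetical root
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        String workSubDir = "work"; // stands in for MRConstants.WORKDIR
        Path workDir = alloc.ifExists(workSubDir, conf)
            ? alloc.getLocalPathToRead(workSubDir, conf)
            : alloc.getLocalPathForWrite(workSubDir, conf);
        System.out.println("Using work directory " + workDir);
      }
    }
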
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:43:33 2011
@@ -3140,8 +3140,7 @@ public class JobInProgress {
         }
 
         Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-        new CleanupQueue().addToQueue(new PathDeletionContext(
-            jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
+        new CleanupQueue().addToQueue(new PathDeletionContext(tempDir, conf)); 
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
       }

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java?rev=1077679&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobLocalizer.java Fri Mar  4 04:43:33 2011
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Internal class responsible for initializing the job, not intended for users.
+ * Creates the following hierarchy:
+ * <ul>
+ *   <li>$mapred.local.dir/taskTracker/$user</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/work</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jars/job.jar</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/job.xml</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/jobcache/$jobid/jobToken</li>
+ *   <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+ * </ul>
+ */
+public class JobLocalizer {
+
+  static final Log LOG = LogFactory.getLog(JobLocalizer.class);
+
+  private static final FsPermission urwx =
+    FsPermission.createImmutable((short) 0700);
+  private static final FsPermission urwx_gx =
+    FsPermission.createImmutable((short) 0710);
+  private static final FsPermission urw_gr =
+    FsPermission.createImmutable((short) 0640);
+
+  private final String user;
+  private final String jobid;
+  private final FileSystem lfs;
+  private final List<Path> localDirs;
+  private final LocalDirAllocator lDirAlloc;
+  private final JobConf ttConf;
+
+  private final String JOBDIR;
+  private final String DISTDIR;
+  private final String WORKDIR;
+  private final String JARDST;
+  private final String JOBCONF;
+  private final String JOBTOKEN;
+  private static final String JOB_LOCAL_CTXT = "mapred.job.local.dir";
+
+  public JobLocalizer(JobConf ttConf, String user, String jobid)
+      throws IOException {
+    this(ttConf, user, jobid,
+        ttConf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+  }
+
+  public JobLocalizer(JobConf ttConf, String user, String jobid,
+      String... localDirs) throws IOException {
+    if (null == user) {
+      throw new IOException("Cannot initialize for null user");
+    }
+    this.user = user;
+    if (null == jobid) {
+      throw new IOException("Cannot initialize for null jobid");
+    }
+    this.jobid = jobid;
+    this.ttConf = ttConf;
+    lfs = FileSystem.getLocal(ttConf).getRaw();
+    this.localDirs = createPaths(user, localDirs);
+    ttConf.setStrings(JOB_LOCAL_CTXT, localDirs);
+    Collections.shuffle(this.localDirs);
+    lDirAlloc = new LocalDirAllocator(JOB_LOCAL_CTXT);
+    JOBDIR = TaskTracker.JOBCACHE + Path.SEPARATOR + jobid;
+    DISTDIR = JOBDIR + "/" + TaskTracker.DISTCACHEDIR;
+    WORKDIR = JOBDIR + "/work";
+    JARDST = JOBDIR + "/" + TaskTracker.JARSDIR + "/job.jar";
+    JOBCONF = JOBDIR + "/" + TaskTracker.JOBFILE;
+    JOBTOKEN = JOBDIR + "/" + TaskTracker.JOB_TOKEN_FILE;
+  }
+
+  private static List<Path> createPaths(String user, final String[] str)
+      throws IOException {
+    if (null == str || 0 == str.length) {
+      throw new IOException("mapred.local.dir contains no entries");
+    }
+    final List<Path> ret = new ArrayList<Path>(str.length);
+    for (int i = 0; i < str.length; ++i) {
+      final Path p = new Path(str[i], TaskTracker.getUserDir(user));
+      ret.add(p);
+      str[i] = p.toString();
+    }
+    return ret;
+  }
+
+  public void createLocalDirs() throws IOException {
+    boolean userDirStatus = false;
+    // create all directories as rwx------
+    for (Path localDir : localDirs) {
+      // create $mapred.local.dir/taskTracker/$user
+      if (!lfs.mkdirs(localDir, urwx)) {
+        LOG.warn("Unable to create the user directory : " + localDir);
+        continue;
+      }
+      userDirStatus = true;
+    }
+    if (!userDirStatus) {
+      throw new IOException("Not able to initialize user directories "
+          + "in any of the configured local directories for user " + user);
+    }
+  }
+
+  /**
+   * Initialize the local directories for a particular user on this TT. This
+   * involves creation and setting permissions of the following directories
+   * <ul>
+   * <li>$mapred.local.dir/taskTracker/$user</li>
+   * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+   * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+   * </ul>
+   */
+  public void createUserDirs() throws IOException {
+    LOG.info("Initializing user " + user + " on this TT.");
+
+    boolean jobCacheDirStatus = false;
+    boolean distributedCacheDirStatus = false;
+
+    // create all directories as rwx------
+    for (Path localDir : localDirs) {
+      // create $mapred.local.dir/taskTracker/$user/jobcache
+      final Path jobDir =
+        new Path(localDir, TaskTracker.JOBCACHE);
+      if (!lfs.mkdirs(jobDir, urwx)) {
+        LOG.warn("Unable to create job cache directory : " + jobDir);
+      } else {
+        jobCacheDirStatus = true;
+      }
+      // create $mapred.local.dir/taskTracker/$user/distcache
+      final Path distDir =
+        new Path(localDir, TaskTracker.DISTCACHEDIR);
+      if (!lfs.mkdirs(distDir, urwx)) {
+        LOG.warn("Unable to create distributed-cache directory : " + distDir);
+      } else {
+        distributedCacheDirStatus = true;
+      }
+    }
+    if (!jobCacheDirStatus) {
+      throw new IOException("Not able to initialize job-cache directories "
+          + "in any of the configured local directories for user " + user);
+    }
+    if (!distributedCacheDirStatus) {
+      throw new IOException(
+          "Not able to initialize distributed-cache directories "
+              + "in any of the configured local directories for user "
+              + user);
+    }
+  }
+
+  /**
+   * Prepare the job directories for a given job. To be called by the job
+   * localization code, only if the job is not already localized.
+   * <br>
+   * Here, we set 700 permissions on the job directories created on all disks.
+   * This we do so as to avoid any misuse by other users till the time
+   * {@link TaskController#initializeJob} is run at a
+   * later time to set proper private permissions on the job directories. <br>
+   */
+  public void createJobDirs() throws IOException {
+    boolean initJobDirStatus = false;
+    for (Path localDir : localDirs) {
+      Path fullJobDir = new Path(localDir, JOBDIR);
+      if (lfs.exists(fullJobDir)) {
+        // this will happen on a partial execution of localizeJob. Sometimes
+        // copying job.xml to the local disk succeeds but copying job.jar might
+        // throw out an exception. We should clean up and then try again.
+        lfs.delete(fullJobDir, true);
+      }
+      // create $mapred.local.dir/taskTracker/$user/jobcache/$jobid
+      if (!lfs.mkdirs(fullJobDir, urwx)) {
+        LOG.warn("Not able to create job directory " + fullJobDir.toString());
+      } else {
+        initJobDirStatus = true;
+      }
+    }
+    if (!initJobDirStatus) {
+      throw new IOException("Not able to initialize job directories "
+          + "in any of the configured local directories for job "
+          + jobid.toString());
+    }
+  }
+
+  /**
+   * Create job log directory and set appropriate permissions for the directory.
+   */
+  public void initializeJobLogDir() throws IOException {
+    Path jobUserLogDir = new Path(TaskLog.getJobDir(jobid).toURI().toString());
+    if (!lfs.mkdirs(jobUserLogDir, urwx_gx)) {
+      throw new IOException(
+          "Could not create job user log directory: " + jobUserLogDir);
+    }
+  }
+
+  /**
+   * Download the job jar file from FS to the local file system and unjar it.
+   * Set the local jar file in the passed configuration.
+   *
+   * @param localJobConf
+   * @throws IOException
+   */
+  private void localizeJobJarFile(JobConf localJobConf) throws IOException {
+    // copy Jar file to the local FS and unjar it.
+    String jarFile = localJobConf.getJar();
+    FileStatus status = null;
+    long jarFileSize = -1;
+    if (jarFile != null) {
+      Path jarFilePath = new Path(jarFile);
+      FileSystem userFs = jarFilePath.getFileSystem(localJobConf);
+      try {
+        status = userFs.getFileStatus(jarFilePath);
+        jarFileSize = status.getLen();
+      } catch (FileNotFoundException fe) {
+        jarFileSize = -1;
+      }
+      // Ask for five times the jar's size to leave room for unjarring it
+      // in the jars directory
+      Path localJarFile =
+        lDirAlloc.getLocalPathForWrite(JARDST, 5 * jarFileSize, ttConf);
+
+      //Download job.jar
+      userFs.copyToLocalFile(jarFilePath, localJarFile);
+      localJobConf.setJar(localJarFile.toString());
+      // Also un-jar job.jar so that classes inside sub-directories,
+      // e.g. lib/ and classes/, are available on the classpath
+      RunJar.unJar(new File(localJarFile.toString()),
+          new File(localJarFile.getParent().toString()));
+    }
+  }
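+
+  // Example (paths assumed for illustration): a job.jar staged at
+  //   hdfs://nn:8020/user/alice/.staging/job_201103040443_0001/job.jar
+  // is copied under the job's local jars directory and unpacked there,
+  // after which localJobConf.getJar() points at the local copy.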
+
+  /**
+   * The permissions to use for the private distributed cache objects.
+   * They are already protected by the enclosing user directory, so leave the
+   * group and other bits unchanged; that lets LocalFileSystem set the
+   * permissions with the plain java.io.File methods.
+   */
+  private static final FsPermission privateCachePerms =
+    FsPermission.createImmutable((short) 0755);
+  
+  /**
+   * Given a list of objects, download each one.
+   * @param conf the job's configuration
+   * @param sources the list of objects to download
+   * @param dests the list of paths to download them to
+   * @param times the desired modification times
+   * @param isPublic are the objects in the public cache?
+   * @param isArchive are these archive files?
+   * @throws IOException
+   * @return the size of each downloaded object; slots for public objects stay 0.
+   */
+  private static long[] downloadPrivateCacheObjects(Configuration conf,
+                                             URI[] sources,
+                                             Path[] dests,
+                                             long[] times,
+                                             boolean[] isPublic,
+                                             boolean isArchive
+                                             ) throws IOException {
+    if (null == sources || null == dests || null == times || null == isPublic) {
+      return new long[0];
+    }
+    if (sources.length != dests.length ||
+        sources.length != times.length ||
+        sources.length != isPublic.length) {
+      throw new IOException("Distributed cache entry arrays have different " +
+                            "lengths: " + sources.length + ", " + dests.length +
+                            ", " + times.length + ", " + isPublic.length);
+    }
+    long[] result = new long[sources.length];
+    for(int i=0; i < sources.length; i++) {
+      // public objects are already downloaded by the Task Tracker, we
+      // only need to handle the private ones here
+      if (!isPublic[i]) {
+        result[i] = 
+          TrackerDistributedCacheManager.downloadCacheObject(conf, sources[i], 
+                                                             dests[i], 
+                                                             times[i], 
+                                                             isArchive, 
+                                                             privateCachePerms);
+      }
+    }
+    return result;
+  }
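+
+  // The four arrays above are parallel, one slot per cache entry. For
+  // example (URIs and timestamps made up for illustration): a job with one
+  // private file and one public file might pass
+  //   sources  = { hdfs://nn/user/alice/lookup.dat, hdfs://nn/share/dict.txt }
+  //   dests    = { <private local path>,            <public local path> }
+  //   times    = { 1299211415000L,                  1299211416000L }
+  //   isPublic = { false,                           true }
+  // and only slot 0 is downloaded here; slot 1 stays 0 in the result.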
+
+  /**
+   * Download the parts of the distributed cache that are private.
+   * @param conf the job's configuration
+   * @throws IOException
+   * @return the size of the archive objects
+   */
+  public static long[] downloadPrivateCache(Configuration conf) throws IOException {
+    downloadPrivateCacheObjects(conf,
+                                DistributedCache.getCacheFiles(conf),
+                                DistributedCache.getLocalCacheFiles(conf),
+                                DistributedCache.getFileTimestamps(conf),
+                                TrackerDistributedCacheManager.
+                                  getFileVisibilities(conf),
+                                false);
+    return 
+      downloadPrivateCacheObjects(conf,
+                                  DistributedCache.getCacheArchives(conf),
+                                  DistributedCache.getLocalCacheArchives(conf),
+                                  DistributedCache.getArchiveTimestamps(conf),
+                                  TrackerDistributedCacheManager.
+                                    getArchiveVisibilities(conf),
+                                  true);
+  }
+
+  public void localizeJobFiles(JobID jobid, JobConf jConf,
+      Path localJobTokenFile, TaskUmbilicalProtocol taskTracker)
+      throws IOException {
+    localizeJobFiles(jobid, jConf,
+        lDirAlloc.getLocalPathForWrite(JOBCONF, ttConf), localJobTokenFile,
+        taskTracker);
+  }
+
+  public void localizeJobFiles(JobID jobid, JobConf jConf,
+      Path localJobFile, Path localJobTokenFile,
+      TaskUmbilicalProtocol taskTracker) throws IOException {
+    // Download the job.jar for this job from the system FS
+    localizeJobJarFile(jConf);
+
+    //update the config some more
+    jConf.set(TokenCache.JOB_TOKENS_FILENAME, localJobTokenFile.toString());
+    jConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, 
+        ttConf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+    TaskTracker.resetNumTasksPerJvm(jConf);
+
+    //setup the distributed cache: download the private objects once and
+    //report their sizes back to the TaskTracker
+    jConf.set(JOB_LOCAL_CTXT, ttConf.get(JOB_LOCAL_CTXT));
+    long[] sizes = downloadPrivateCache(jConf);
+    taskTracker.updatePrivateDistributedCacheSizes(jobid, sizes);
+
+    // Create job-acls.xml file in job userlog dir and write the needed
+    // info for authorization of users for viewing task logs of this job.
+    writeJobACLs(jConf, new Path(TaskLog.getJobDir(jobid).toURI().toString()));
+
+    //write the updated jobConf file in the job directory
+    JobLocalizer.writeLocalJobFile(localJobFile, jConf);
+  }
+
+  /**
+   *  Creates job-acls.xml under the given directory logDir and writes the
+   *  job-view-acl, the queue-admins-acl, the job owner's name, and the queue
+   *  name into it.
+   *  The queue name is the queue to which the job was submitted; the
+   *  queue-admins-acl is the administrators ACL of that queue.
+   * @param conf   job configuration
+   * @param logDir job userlog dir
+   * @throws IOException
+   */
+  private void writeJobACLs(JobConf conf, Path logDir) throws IOException {
+    JobConf aclConf = new JobConf(false);
+
+    // set the job view acl in aclConf
+    String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
+    aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
+
+    // set the job queue name in aclConf
+    String queue = conf.getQueueName();
+    aclConf.setQueueName(queue);
+
+    // set the queue admins acl in aclConf
+    String qACLName = QueueManager.toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    String queueAdminsACL = conf.get(qACLName, " ");
+    aclConf.set(qACLName, queueAdminsACL);
+
+    // set jobOwner as user.name in aclConf
+    aclConf.set("user.name", user);
+
+    OutputStream out = null;
+    Path aclFile = new Path(logDir, TaskTracker.jobACLsFile);
+    try {
+      out = lfs.create(aclFile);
+      aclConf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+    lfs.setPermission(aclFile, urw_gr);
+  }
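+
+  // Illustrative job-acls.xml payload (user and ACL values assumed): with
+  // the 0.20-security property keys this file carries, e.g.,
+  //   mapreduce.job.acl-view-job               = "webadmins"
+  //   mapred.queue.default.acl-administer-jobs = "ops"
+  //   mapred.job.queue.name                    = "default"
+  //   user.name                                = "alice"
+  // which is later consulted to authorize viewing of this job's task logs.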
+
+  public void createWorkDir(JobConf jConf) throws IOException {
+    // create $mapred.local.dir/taskTracker/$user/jobcache/$jobid/work
+    final Path workDir = lDirAlloc.getLocalPathForWrite(WORKDIR, ttConf);
+    if (!lfs.mkdirs(workDir)) {
+      throw new IOException("Mkdirs failed to create "
+          + workDir.toString());
+    }
+    jConf.set(TaskTracker.JOB_LOCAL_DIR, workDir.toUri().getPath());
+  }
+
+  public Path findCredentials() throws IOException {
+    return lDirAlloc.getLocalPathToRead(JOBTOKEN, ttConf);
+  }
+
+  public int runSetup(String user, String jobid, Path localJobTokenFile,
+                      TaskUmbilicalProtocol taskTracker) throws IOException {
+    // load user credentials, configuration
+    // ASSUME
+    // let $x = $mapred.local.dir
+    // forall $x, exists $x/$user
+    // exists $x/$user/jobcache/$jobid/job.xml
+    // exists $x/$user/jobcache/$jobid/jobToken
+    // exists $logdir/userlogs/$jobid
+    final Path localJobFile = lDirAlloc.getLocalPathToRead(JOBCONF, ttConf);
+    final JobConf cfgJob = new JobConf(localJobFile);
+    createWorkDir(ttConf);
+    localizeJobFiles(JobID.forName(jobid), cfgJob, localJobFile,
+        localJobTokenFile, taskTracker);
+
+    // $mapred.local.dir/taskTracker/$user/distcache
+    return 0;
+  }
+
+  public static void main(String[] argv)
+      throws IOException, InterruptedException {
+    // $logdir
+    // let $x = $root/tasktracker for some $mapred.local.dir
+    //   create $x/$user/jobcache/$jobid/work
+    //   fetch  $x/$user/jobcache/$jobid/jars/job.jar
+    //   setup  $x/$user/distcache
+    //   verify $x/distcache
+    //   write  $x/$user/jobcache/$jobid/job.xml
+    final String user = argv[0];
+    final String jobid = argv[1];
+    final InetSocketAddress ttAddr = 
+      new InetSocketAddress(argv[2], Integer.parseInt(argv[3]));
+    final String uid = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (!user.equals(uid)) {
+      LOG.warn("Localization running as " + uid + " not " + user);
+    }
+
+    // Pull in user's tokens to complete setup
+    final JobConf conf = new JobConf();
+    final JobLocalizer localizer =
+      new JobLocalizer(conf, user, jobid);
+    final Path jobTokenFile = localizer.findCredentials();
+    final Credentials creds = TokenCache.loadTokens(
+        jobTokenFile.toUri().toString(), conf);
+    LOG.debug("Loaded tokens from " + jobTokenFile);
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
+      ugi.addToken(token);
+    }
+    final TaskUmbilicalProtocol taskTracker =
+      (TaskUmbilicalProtocol) RPC.getProxy(TaskUmbilicalProtocol.class,
+                                           TaskUmbilicalProtocol.versionID,
+                                           ttAddr, conf);
+    System.exit(
+      ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+        public Integer run() {
+          try {
+            return localizer.runSetup(user, jobid, jobTokenFile, taskTracker);
+          } catch (Throwable e) {
+            e.printStackTrace(System.out);
+            return -1;
+          }
+        }
+      }));
+  }
+
+  /**
+   * Write the task specific job-configuration file.
+   * @throws IOException
+   */
+  public static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(jobFile, true);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+}
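
For readers tracing the flow end-to-end, a minimal sketch of how this entry
point is driven (the package org.apache.hadoop.mapred and all argument values
below are assumptions for illustration; per main() above, the argv order is
user, jobid, umbilical host, umbilical port):

    // illustrative only: a launcher running as the job owner would do the
    // equivalent of
    final String[] argv = { "alice", "job_201103040443_0001",
                            "tt.example.com", "50050" };
    org.apache.hadoop.mapred.JobLocalizer.main(argv);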

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Mar  4 04:43:33 2011
@@ -30,12 +30,12 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.JVMInfo;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.JvmFinishedEvent;
 import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 class JvmManager {
@@ -49,8 +49,8 @@ class JvmManager {
   
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
-      Map<String,String> env, JobConf conf) {
-    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
+      JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,conf);
   }
   
   public JvmManager(TaskTracker tracker) {
@@ -103,7 +103,7 @@ class JvmManager {
   }
   
   
-  public void stop() {
+  public void stop() throws IOException, InterruptedException {
     mapJvmManager.stop();
     reduceJvmManager.stop();
   }
@@ -116,7 +116,8 @@ class JvmManager {
     }
   }
 
-  public void launchJvm(TaskRunner t, JvmEnv env) {
+  public void launchJvm(TaskRunner t, JvmEnv env
+                        ) throws IOException, InterruptedException {
     if (t.getTask().isMapTask()) {
       mapJvmManager.reapJvm(t, env);
     } else {
@@ -140,7 +141,8 @@ class JvmManager {
     }
   }
 
-  public void taskKilled(TaskRunner tr) {
+  public void taskKilled(TaskRunner tr
+                         ) throws IOException, InterruptedException {
     if (tr.getTask().isMapTask()) {
       mapJvmManager.taskKilled(tr);
     } else {
@@ -148,7 +150,7 @@ class JvmManager {
     }
   }
 
-  public void killJvm(JVMId jvmId) {
+  public void killJvm(JVMId jvmId) throws IOException, InterruptedException {
     if (jvmId.isMap) {
       mapJvmManager.killJvm(jvmId);
     } else {
@@ -161,15 +163,19 @@ class JvmManager {
    * asynchronous deletion of work dir.
    * @param tracker taskTracker
    * @param task    the task whose work dir needs to be deleted
-   * @throws IOException
    */
-  static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+  static void deleteWorkDir(TaskTracker tracker, Task task) {
+    String user = task.getUser();
+    String jobid = task.getJobID().toString();
+    String taskid = task.getTaskID().toString();
+    String workDir = TaskTracker.getTaskWorkDir(user, jobid, taskid, 
+                                                task.isTaskCleanupTask());
+    String userDir = TaskTracker.getUserDir(user);
     tracker.getCleanupThread().addToQueue(
-        TaskTracker.buildTaskControllerTaskPathDeletionContexts(
-          tracker.getLocalFileSystem(),
-          tracker.getLocalFiles(tracker.getJobConf(), ""),
-          task, true /* workDir */,
-          tracker.getTaskController()));
+     new TaskController.DeletionContext(tracker.getTaskController(), false,
+                                        user, 
+                                        workDir.substring(userDir.length())));
+                                           
   }
 
   static class JvmManagerForType {
@@ -188,8 +194,13 @@ class JvmManager {
     
     int maxJvms;
     boolean isMap;
+    private final long sleeptimeBeforeSigkill;
     
     Random rand = new Random(System.currentTimeMillis());
+    private static final String DELAY_BEFORE_KILL_KEY =
+      "mapred.tasktracker.tasks.sleeptime-before-sigkill";
+    // number of milliseconds to wait between TERM and KILL.
+    private static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 250;
     private TaskTracker tracker;
 
     public JvmManagerForType(int maxJvms, boolean isMap, 
@@ -197,6 +208,9 @@ class JvmManager {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
       this.tracker = tracker;
+      sleeptimeBeforeSigkill =
+        tracker.getJobConf().getLong(DELAY_BEFORE_KILL_KEY,
+                                     DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -216,25 +230,6 @@ class JvmManager {
         JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
         Task task = taskRunner.getTaskInProgress().getTask();
 
-        // Initialize task dirs
-        TaskControllerContext context =
-            new TaskController.TaskControllerContext();
-        context.env = jvmRunner.env;
-        context.task = task;
-        // If we are returning the same task as which the JVM was launched
-        // we don't initialize task once again.
-        if (!jvmRunner.env.conf.get("mapred.task.id").equals(
-            task.getTaskID().toString())) {
-          try {
-            tracker.getTaskController().initializeTask(context);
-          } catch (IOException e) {
-            LOG.warn("Failed to initialize the new task "
-                + task.getTaskID().toString() + " to be given to JVM with id "
-                + jvmId);
-            throw e;
-          }
-        }
-
         jvmRunner.taskGiven(task);
         return taskRunner.getTaskInProgress();
 
@@ -257,7 +252,9 @@ class JvmManager {
       }
     }
 
-    synchronized public void taskKilled(TaskRunner tr) {
+    synchronized public void taskKilled(TaskRunner tr
+                                        ) throws IOException,
+                                                 InterruptedException {
       JVMId jvmId = runningTaskToJvm.remove(tr);
       if (jvmId != null) {
         jvmToRunningTask.remove(jvmId);
@@ -265,14 +262,15 @@ class JvmManager {
       }
     }
 
-    synchronized public void killJvm(JVMId jvmId) {
+    synchronized public void killJvm(JVMId jvmId) throws IOException, 
+                                                         InterruptedException {
       JvmRunner jvmRunner;
       if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
         killJvmRunner(jvmRunner);
       }
     }
     
-    synchronized public void stop() {
+    synchronized public void stop() throws IOException, InterruptedException {
       //since the kill() method invoked later on would remove
       //an entry from the jvmIdToRunner map, we create a
       //copy of the values and iterate over it (if we don't
@@ -285,7 +283,9 @@ class JvmManager {
       }
     }
 
-    private synchronized void killJvmRunner(JvmRunner jvmRunner) {
+    private synchronized void killJvmRunner(JvmRunner jvmRunner
+                                            ) throws IOException,
+                                                     InterruptedException {
       jvmRunner.kill();
       removeJvm(jvmRunner.jvmId);
     }
@@ -295,7 +295,7 @@ class JvmManager {
       jvmIdToPid.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        TaskRunner t, JvmEnv env) {
+        TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
       if (t.getTaskInProgress().wasKilled()) {
         //the task was killed in-flight
         //no need to do the rest of the operations
@@ -384,7 +384,7 @@ class JvmManager {
 
     private void spawnNewJvm(JobID jobId, JvmEnv env,  
         TaskRunner t) {
-      JvmRunner jvmRunner = new JvmRunner(env,jobId);
+      JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
       //spawn the JVM in a new thread. Note that there will be very little
       //extra overhead of launching the new thread for a new JVM since
@@ -420,8 +420,7 @@ class JvmManager {
       JVMId jvmId;
       volatile boolean busy = true;
       private ShellCommandExecutor shexec; // shell terminal for running the task
-      //context used for starting JVM
-      private TaskControllerContext initalContext;
+      private Task firstTask;
 
       private List<Task> tasksGiven = new ArrayList<Task>();
 
@@ -429,65 +428,99 @@ class JvmManager {
         tasksGiven.add(task);
       }
 
-      public JvmRunner(JvmEnv env, JobID jobId) {
+      public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
         this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
+        this.firstTask = firstTask;
         LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
       }
+
+      @Override
       public void run() {
-        runChild(env);
-        jvmFinished();
+        try {
+          runChild(env);
+        } catch (InterruptedException ie) {
+          return;
+        } catch (IOException e) {
+          LOG.warn("Caught IOException in JVMRunner", e);
+        } catch (Throwable e) {
+          LOG.error("Caught Throwable in JVMRunner. Aborting TaskTracker.", e);
+          System.exit(1);
+        } finally {
+          jvmFinished();
+        }
       }
 
-      public void runChild(JvmEnv env) {
-        initalContext = new TaskControllerContext();
+      public void runChild(JvmEnv env) throws IOException, InterruptedException{
+        int exitCode = 0;
         try {
           env.vargs.add(Integer.toString(jvmId.getId()));
           //Launch the task controller to run task JVM
-          initalContext.task = jvmToRunningTask.get(jvmId).getTask();
-          initalContext.env = env;
-          tracker.getTaskController().launchTaskJVM(initalContext);
+          String user = jvmToRunningTask.get(jvmId).getTask().getUser();
+          TaskAttemptID taskAttemptId = 
+            jvmToRunningTask.get(jvmId).getTask().getTaskID();
+          String taskAttemptIdStr = 
+            jvmToRunningTask.get(jvmId).getTask().isTaskCleanupTask() ? 
+                (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
+                  taskAttemptId.toString(); 
+          exitCode = tracker.getTaskController().launchTask(user,
+              jvmId.jobId.toString(), taskAttemptIdStr, env.setup,
+              env.vargs, env.workDir, env.stdout.toString(),
+              env.stderr.toString());
         } catch (IOException ioe) {
           // do nothing
           // error and output are appropriately redirected
         } finally { // handle the exit code
-          shexec = initalContext.shExec;
-          if (shexec == null) {
-            return;
-          }
-
+          // although the process has exited before we get here,
+          // make sure the entire process group has also been killed.
           kill();
-
-          int exitCode = shexec.getExitCode();
           updateOnJvmExit(jvmId, exitCode);
           LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
               + ". Number of tasks it ran: " + numTasksRan);
+          deleteWorkDir(tracker, firstTask);
+        }
+      }
+
+      private class DelayedProcessKiller extends Thread {
+        private final String user;
+        private final int pid;
+        private final long delay;
+        private final Signal signal;
+        DelayedProcessKiller(String user, int pid, long delay, Signal signal) {
+          this.user = user;
+          this.pid = pid;
+          this.delay = delay;
+          this.signal = signal;
+          setName("Task killer for " + pid);
+          setDaemon(false);
+        }
+        @Override
+        public void run() {
           try {
-            // In case of jvm-reuse,
-            //the task jvm cleans up the common workdir for every 
-            //task at the beginning of each task in the task JVM.
-            //For the last task, we do it here.
-            if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              deleteWorkDir(tracker, initalContext.task);
-            }
-          } catch (IOException ie){}
+            Thread.sleep(delay);
+            tracker.getTaskController().signalTask(user, pid, signal);
+          } catch (InterruptedException e) {
+            return;
+          } catch (IOException e) {
+            LOG.warn("Exception when killing task " + pid, e);
+          }
         }
       }
 
-      synchronized void kill() {
+      synchronized void kill() throws IOException, InterruptedException {
         if (!killed) {
           TaskController controller = tracker.getTaskController();
           // Check initial context before issuing a kill to prevent situations
           // where kill is issued before task is launched.
-          if (initalContext != null && initalContext.env != null) {
-            initalContext.pid = jvmIdToPid.get(jvmId);
-            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
-              .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
-                  ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-            // Destroy the task jvm
-            controller.destroyTaskJVM(initalContext);
+          String pidStr = jvmIdToPid.get(jvmId);
+          if (pidStr != null) {
+            String user = env.conf.getUser();
+            int pid = Integer.parseInt(pidStr);
+            // kill the process immediately; the delayed TERM-then-KILL
+            // path below is currently disabled
+            //new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill, 
+            //                         Signal.KILL).start();
+            controller.signalTask(user, pid, Signal.KILL);
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId
                 .toString()));
@@ -498,9 +531,9 @@ class JvmManager {
 
       // Post-JVM-exit logs processing. inform user log manager
       private void jvmFinished() {
-        Task firstTask = initalContext.task;
         JvmFinishedEvent jfe = new JvmFinishedEvent(new JVMInfo(
-            TaskLog.getAttemptDir(firstTask.getTaskID(), firstTask.isTaskCleanupTask()),
+            TaskLog.getAttemptDir(firstTask.getTaskID(), 
+                                  firstTask.isTaskCleanupTask()),
             tasksGiven));
         tracker.getUserLogManager().addLogEvent(jfe);
       }
@@ -531,15 +564,13 @@ class JvmManager {
     JobConf conf;
     Map<String, String> env;
 
-    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
-        File stderr, long logSize, File workDir, Map<String,String> env,
-        JobConf conf) {
+    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
+        File stderr, long logSize, File workDir, JobConf conf) {
       this.setup = setup;
       this.vargs = vargs;
       this.stdout = stdout;
       this.stderr = stderr;
       this.workDir = workDir;
-      this.env = env;
       this.conf = conf;
     }
   }
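
The TERM-to-KILL grace period wired in above is read once per
JvmManagerForType from mapred.tasktracker.tasks.sleeptime-before-sigkill and
defaults to 250 milliseconds (though the delayed-kill path is still disabled
in this revision). A sketch of raising it on a TaskTracker, with the 5000 ms
value illustrative only:

    // in the TaskTracker's JobConf, before the JvmManager is constructed
    JobConf ttConf = new JobConf();
    ttConf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 5000L);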


