hadoop-mapreduce-commits mailing list archives

From s..@apache.org
Subject svn commit: r1346214 [6/7] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/streaming/src/java/org/apache/hadoop/strea...
Date Tue, 05 Jun 2012 02:33:47 GMT
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Jun  5 02:33:44 2012
@@ -20,7 +20,11 @@ package org.apache.hadoop.mapreduce.file
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.text.DateFormat;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -31,13 +35,9 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskController;
-import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -46,10 +46,16 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.RunJar;
-import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Manages a single machine's instance of a cross-job
@@ -63,6 +69,11 @@ public class TrackerDistributedCacheMana
   // cacheID to cacheStatus mapping
   private TreeMap<String, CacheStatus> cachedArchives = 
     new TreeMap<String, CacheStatus>();
+  private Map<JobID, TaskDistributedCacheManager> jobArchives =
+    Collections.synchronizedMap(
+        new HashMap<JobID, TaskDistributedCacheManager>());
+  private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
+    FsPermission.createImmutable((short) 0755);
 
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
@@ -76,24 +87,20 @@ public class TrackerDistributedCacheMana
   private final LocalFileSystem localFs;
   
   private LocalDirAllocator lDirAllocator;
-  
-  private TaskController taskController;
-  
+
   private Configuration trackerConf;
   
-  private Random random = new Random();
+  private static final Random random = new Random();
 
   private MRAsyncDiskService asyncDiskService;
 
   BaseDirManager baseDirManager = new BaseDirManager();
   CleanupThread cleanupThread;
 
-  public TrackerDistributedCacheManager(Configuration conf,
-      TaskController taskController) throws IOException {
+  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
-    this.taskController = taskController;
       // setting the cache size to a default of 10GB
     this.allowedCacheSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
           DEFAULT_CACHE_SIZE);
@@ -111,7 +118,7 @@ public class TrackerDistributedCacheMana
   public TrackerDistributedCacheManager(Configuration conf,
       TaskController taskController, MRAsyncDiskService asyncDiskService)
       throws IOException {
-    this(conf, taskController);
+    this(conf);
     this.asyncDiskService = asyncDiskService;
   }
 
@@ -145,15 +152,15 @@ public class TrackerDistributedCacheMana
    * archives, the path to the file where the file is copied locally
    * @throws IOException
    */
-  Path getLocalCache(URI cache, Configuration conf,
-      String subDir, FileStatus fileStatus,
-      boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
-      throws IOException {
-    String key;
-    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
+  Path getLocalCache(URI cache, Configuration conf, String subDir,
+      FileStatus fileStatus, boolean isArchive, long confFileStamp,
+      boolean isPublic, CacheFile file)
+      throws IOException, InterruptedException {
+    String user = getLocalizedCacheOwner(isPublic);
+    String key = getKey(cache, conf, confFileStamp, user);
     CacheStatus lcacheStatus;
     Path localizedPath = null;
+    Path localPath = null;
     synchronized (cachedArchives) {
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -161,44 +168,59 @@ public class TrackerDistributedCacheMana
         String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir, 
           new Path(uniqueString, makeRelative(cache, conf))).toString();
-        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
-          fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
-          cachePath, "")), localPath, new Path(subDir), uniqueString);
+        localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+          fileStatus.getLen(), trackerConf, isPublic);
+        lcacheStatus = 
+          new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
+                          localPath, new Path(subDir), uniqueString, 
+                          isPublic ? null : user);
         cachedArchives.put(key, lcacheStatus);
       }
 
-      //mark the cache for use. 
-      lcacheStatus.refcount++;
+      //mark the cache for use.
+      file.setStatus(lcacheStatus);
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount++;
+      }
     }
     
-    boolean initSuccessful = false;
     try {
       // do the localization, after releasing the global lock
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
-          FileSystem fs = FileSystem.get(cache, conf);
-          checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
-              lcacheStatus, fileStatus);
-          localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, isArchive, isPublic);
+          if (isPublic) {
+            // TODO verify covered
+            //checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
+            //    lcacheStatus, fileStatus);
+            localizedPath = localizePublicCacheObject(conf, cache,
+                confFileStamp, lcacheStatus, fileStatus, isArchive);
+          } else {
+            localizedPath = localPath;
+            if (!isArchive) {
+              //for private archives, the lengths come over RPC from the 
+              //JobLocalizer since the JobLocalizer is the one who expands
+              //archives and gets the total length
+              lcacheStatus.size = fileStatus.getLen();
+
+              // Increase the size and sub directory count of the cache
+              // from baseDirSize and baseDirNumberSubDir.
+              baseDirManager.addCacheUpdate(lcacheStatus);
+            }
+          }
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
               lcacheStatus, fileStatus, isArchive);
         }
-        createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
-            honorSymLinkConf);
       }
-      initSuccessful = true;
-      return localizedPath;
-    } finally {
-      if (!initSuccessful) {
-        synchronized (cachedArchives) {
-          lcacheStatus.refcount--;
-        }
+    } catch (IOException ie) {
+      synchronized (lcacheStatus) {
+        // release this cache
+        lcacheStatus.refcount -= 1;
+        throw ie;
       }
     }
+    return localizedPath;
   }
 
   /**
@@ -211,37 +233,30 @@ public class TrackerDistributedCacheMana
    * is contained in.
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        LOG.warn("Cannot find localized cache: " + cache + 
-                 " (key: " + key + ") in releaseCache!");
-        return;
+  void releaseCache(CacheStatus status) throws IOException {
+    synchronized (status) {
+      status.refcount--;
+    }
+  }
+
+  void setSize(CacheStatus status, long size) throws IOException {
+    if (size != 0) {
+      synchronized (status) {
+        status.size = size;
+        baseDirManager.addCacheUpdate(status);
       }
-      
-      // decrement ref count 
-      lcacheStatus.refcount--;
     }
   }
 
   /*
    * This method is called from unit tests. 
    */
-  int getReferenceCount(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        throw new IOException("Cannot find localized cache: " + cache);
-      }
-      return lcacheStatus.refcount;
+  int getReferenceCount(CacheStatus status) throws IOException {
+    synchronized (status) {
+      return status.refcount;
     }
   }
-  
+
   /**
    * Get the user who should "own" the localized distributed cache file.
    * If the cache is public, the tasktracker user is the owner. If private,
@@ -266,6 +281,7 @@ public class TrackerDistributedCacheMana
    */
   private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
       LocalFileSystem fs, Path path) throws IOException {
+    // TODO need to make asyncDiskService use taskController
     boolean deleted = false;
     if (asyncDiskService != null) {
       // Try to delete using asyncDiskService
@@ -419,53 +435,72 @@ public class TrackerDistributedCacheMana
     return cacheStatus.localizedLoadPath;
   }
   
-  private void createSymlink(Configuration conf, URI cache,
-      CacheStatus cacheStatus, boolean isArchive,
-      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
-    if(cache.getFragment() == null) {
-      doSymlink = false;
-    }
-    String link = 
-      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (doSymlink){
-      if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
-      }
-    }
+  private static Path createRandomPath(Path base) throws IOException {
+    return new Path(base.toString() + "-work-" + random.nextLong());
   }
-  
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  Path localizeCache(Configuration conf,
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    boolean isArchive, boolean isPublic)
-  throws IOException {
-    FileSystem fs = FileSystem.get(cache, conf);
+
+  /**
+   * Download a given path to the local file system.
+   * @param conf the job's configuration
+   * @param source the source to copy from
+   * @param destination where to copy the file; must be on the local fs
+   * @param desiredTimestamp the required modification timestamp of the source
+   * @param isArchive is this an archive that should be expanded
+   * @param permission the desired permissions of the file.
+   * @return for archives, the number of bytes in the unpacked directory
+   * @throws IOException
+   */
+  public static long downloadCacheObject(Configuration conf,
+                                         URI source,
+                                         Path destination,
+                                         long desiredTimestamp,
+                                         boolean isArchive,
+                                         FsPermission permission
+                                         ) throws IOException,
+                                                  InterruptedException {
+    FileSystem sourceFs = FileSystem.get(source, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
+    
+    Path sourcePath = new Path(source.getPath());
+    long modifiedTime = 
+      sourceFs.getFileStatus(sourcePath).getModificationTime();
+    if (modifiedTime != desiredTimestamp) {
+      DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT, 
+                                                     DateFormat.SHORT);
+      throw new IOException("The distributed cache object " + source + 
+                            " changed during the job from " + 
+                            df.format(new Date(desiredTimestamp)) + " to " +
+                            df.format(new Date(modifiedTime)));
+    }
+    
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localizedLoadPath,
-        new Path(cacheStatus.localizedLoadPath.getName()));
+      parchive = new Path(destination, destination.getName());
     } else {
-      parchive = cacheStatus.localizedLoadPath;
+      parchive = destination;
     }
-
-    if (!localFs.mkdirs(parchive.getParent())) {
-      throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localizedLoadPath.toString());
-    }
-
-    String cacheId = cache.getPath();
-    fs.copyToLocalFile(new Path(cacheId), parchive);
+    // if the file already exists, we are done
+    if (localFs.exists(parchive)) {
+      return 0;
+    }
+    // the final directory for the object
+    Path finalDir = parchive.getParent();
+    // the work directory for the object
+    Path workDir = createRandomPath(finalDir);
+    LOG.info("Creating " + destination.getName() + " in " + workDir + " with " + 
+            permission);
+    if (!localFs.mkdirs(workDir, permission)) {
+      throw new IOException("Mkdirs failed to create directory " + workDir);
+    }
+    Path workFile = new Path(workDir, parchive.getName());
+    sourceFs.copyToLocalFile(sourcePath, workFile);
+    localFs.setPermission(workFile, permission);
     if (isArchive) {
-      String tmpArchive = parchive.toString().toLowerCase();
-      File srcFile = new File(parchive.toString());
-      File destDir = new File(parchive.getParent().toString());
+      String tmpArchive = workFile.getName().toLowerCase();
+      File srcFile = new File(workFile.toString());
+      File destDir = new File(workDir.toString());
       LOG.info(String.format("Extracting %s to %s",
-          srcFile.toString(), destDir.toString()));
+               srcFile.toString(), destDir.toString()));
       if (tmpArchive.endsWith(".jar")) {
         RunJar.unJar(srcFile, destDir);
       } else if (tmpArchive.endsWith(".zip")) {
@@ -479,47 +514,48 @@ public class TrackerDistributedCacheMana
        // else will not do anything
         // and copy the file into the dir as it is
       }
+      FileUtil.chmod(destDir.toString(), "ugo+rx", true);
+    }
+    // promote the output to the final location
+    if (!localFs.rename(workDir, finalDir)) {
+      localFs.delete(workDir, true);
+      if (!localFs.exists(finalDir)) {
+        throw new IOException("Failed to promote distributed cache object " +
+                              workDir + " to " + finalDir);
+      }
+      // someone else promoted first
+      return 0;
     }
 
+    LOG.info(String.format("Cached %s as %s",
+             source.toString(), destination.toString()));
     long cacheSize = 
       FileUtil.getDU(new File(parchive.getParent().toString()));
-    cacheStatus.size = cacheSize;
+    return cacheSize;
+  }
+
+  //the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  Path localizePublicCacheObject(Configuration conf,
+                                 URI cache, long confFileStamp,
+                                 CacheStatus cacheStatus,
+                                 FileStatus fileStatus,
+                                 boolean isArchive
+                                 ) throws IOException, InterruptedException {
+    long size = downloadCacheObject(conf, cache, cacheStatus.localizedLoadPath,
+                                    confFileStamp, isArchive, 
+                                    PUBLIC_CACHE_OBJECT_PERM);
+    cacheStatus.size = size;
+
     // Increase the size and sub directory count of the cache
     // from baseDirSize and baseDirNumberSubDir.
     baseDirManager.addCacheUpdate(cacheStatus);
 
-    // set proper permissions for the localized directory
-    setPermissions(conf, cacheStatus, isPublic);
-
-    // update cacheStatus to reflect the newly cached file
-    cacheStatus.mtime = getTimestamp(conf, cache);
-
     LOG.info(String.format("Cached %s as %s",
              cache.toString(), cacheStatus.localizedLoadPath));
     return cacheStatus.localizedLoadPath;
   }
 
-  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
-      boolean isPublic) throws IOException {
-    if (isPublic) {
-      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
-      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
-      try {
-        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
-      } catch (InterruptedException e) {
-        LOG.warn("Exception in chmod" + e.toString());
-        throw new IOException(e);
-      }
-    } else {
-      // invoke taskcontroller to set permissions
-      DistributedCacheFileContext context = new DistributedCacheFileContext(
-          conf.get(MRJobConfig.USER_NAME), new File(cacheStatus.localizedBaseDir
-              .toString()), cacheStatus.localizedBaseDir,
-          cacheStatus.uniqueString);
-      taskController.initializeDistributedCacheFile(context);
-    }
-  }
-
   private static boolean isTarFile(String filename) {
     return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
            filename.endsWith(".tar"));
@@ -553,12 +589,18 @@ public class TrackerDistributedCacheMana
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)
   throws IOException {
-    long dfsFileStamp = checkStampSinceJobStarted(conf, fs, cache,
-        confFileStamp, lcacheStatus, fileStatus);
-    if (dfsFileStamp != lcacheStatus.mtime) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
+    } else {
+      dfsFileStamp = getTimestamp(conf, cache);
     }
 
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache +
+                            " has changed on HDFS since job started");
+    }
     return true;
   }
 
@@ -607,7 +649,6 @@ public class TrackerDistributedCacheMana
     // individual cacheStatus lock.
     //
     long size;              //the size of this cache.
-    long mtime;             // the cache-file modification time
     boolean inited = false; // is it initialized ?
 
     //
@@ -618,19 +659,21 @@ public class TrackerDistributedCacheMana
     final Path subDir;
     // unique string used in the construction of local load path
     final String uniqueString;
+    // The user that owns the cache entry or null if it is public
+    final String user;
     // the local load path of this cache
     final Path localizedLoadPath;
     //the base dir where the cache lies
     final Path localizedBaseDir;
 
     public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
-        String uniqueString) {
+        String uniqueString, String user) {
       super();
       this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
-      this.mtime = -1;
       this.localizedBaseDir = baseDir;
       this.size = 0;
+      this.user = user;
       this.subDir = subDir;
       this.uniqueString = uniqueString;
     }
@@ -673,8 +716,22 @@ public class TrackerDistributedCacheMana
   }
 
   public TaskDistributedCacheManager newTaskDistributedCacheManager(
-      Configuration taskConf) throws IOException {
-    return new TaskDistributedCacheManager(this, taskConf);
+      JobID jobId, Configuration taskConf) throws IOException {
+    TaskDistributedCacheManager result =
+      new TaskDistributedCacheManager(this, taskConf);
+    jobArchives.put(jobId, result);
+    return result;
+  }
+ 
+  public void deleteTaskDistributedCacheManager(JobID jobId) {
+    jobArchives.remove(jobId);
+  }
+  
+  public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
+    TaskDistributedCacheManager mgr = jobArchives.get(jobId);
+    if (mgr != null) {
+      mgr.setSizes(sizes);
+    }
   }
 
   /**
@@ -787,6 +844,17 @@ public class TrackerDistributedCacheMana
     }
   }
   
+  private static boolean[] parseBooleans(String[] strs) {
+    if (null == strs) {
+      return null;
+    }
+    boolean[] result = new boolean[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Boolean.parseBoolean(strs[i]);
+    }
+    return result;
+  }
+
   /**
    * Get the booleans on whether the files are public or not.  Used by 
    * internal DistributedCache and MapReduce code.
@@ -794,8 +862,8 @@ public class TrackerDistributedCacheMana
    * @return a string array of booleans 
    * @throws IOException
    */
-  static String[] getFileVisibilities(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+  public static boolean[] getFileVisibilities(Configuration conf) {
+    return parseBooleans(conf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES));
   }
 
   /**
@@ -804,8 +872,8 @@ public class TrackerDistributedCacheMana
    * @param conf The configuration which stored the timestamps
    * @return a string array of booleans 
    */
-  static String[] getArchiveVisibilities(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+  public static boolean[] getArchiveVisibilities(Configuration conf) {
+    return parseBooleans(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES));
   }
 
   /**

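The rewritten downloadCacheObject localizes into a randomly named "-work-" sibling directory and renames it over the final location, so concurrent localizers race benignly: if the rename fails but the final directory exists, another thread promoted first and the method returns 0. A minimal standalone sketch of that stage-and-promote idiom (stageAndPromote and the Copier callback are hypothetical names, not part of this commit):

    import java.io.IOException;
    import java.util.Random;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class StageAndPromote {
      private static final Random RANDOM = new Random();

      interface Copier {                  // supplies the slow download step
        void copyTo(Path workDir) throws IOException;
      }

      /** Returns true if this caller promoted the object, false if it lost the race. */
      static boolean stageAndPromote(FileSystem localFs, Path finalDir,
          Copier copier) throws IOException {
        // stage into a sibling work directory with a random suffix
        Path workDir = new Path(finalDir.toString() + "-work-" + RANDOM.nextLong());
        if (!localFs.mkdirs(workDir)) {
          throw new IOException("Mkdirs failed to create " + workDir);
        }
        copier.copyTo(workDir);           // download/unpack off to the side
        if (localFs.rename(workDir, finalDir)) {
          return true;                    // promoted atomically
        }
        localFs.delete(workDir, true);    // lost the race: discard our copy
        if (!localFs.exists(finalDir)) {
          throw new IOException("Failed to promote " + workDir + " to " + finalDir);
        }
        return false;                     // someone else promoted first
      }
    }
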
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Tue Jun  5 02:33:44 2012
@@ -131,7 +131,7 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
-  public String[] getArchiveTimestamps() {
+  public long[] getArchiveTimestamps() {
     return base.getArchiveTimestamps();
   }
 
@@ -162,7 +162,7 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
-  public String[] getFileTimestamps() {
+  public long[] getFileTimestamps() {
     return base.getFileTimestamps();
   }
 

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Tue Jun  5 02:33:44 2012
@@ -124,7 +124,7 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
-  public String[] getArchiveTimestamps() {
+  public long[] getArchiveTimestamps() {
     return base.getArchiveTimestamps();
   }
 
@@ -155,7 +155,7 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
-  public String[] getFileTimestamps() {
+  public long[] getFileTimestamps() {
     return base.getFileTimestamps();
   }
 

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Tue Jun  5 02:33:44 2012
@@ -133,7 +133,7 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
-    public String[] getArchiveTimestamps() {
+    public long[] getArchiveTimestamps() {
       return mapContext.getArchiveTimestamps();
     }
 
@@ -164,7 +164,7 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
-    public String[] getFileTimestamps() {
+    public long[] getFileTimestamps() {
       return mapContext.getFileTimestamps();
     }
 

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Tue Jun  5 02:33:44 2012
@@ -126,7 +126,7 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
-    public String[] getArchiveTimestamps() {
+    public long[] getArchiveTimestamps() {
       return reduceContext.getArchiveTimestamps();
     }
 
@@ -157,7 +157,7 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
-    public String[] getFileTimestamps() {
+    public long[] getFileTimestamps() {
       return reduceContext.getFileTimestamps();
     }
 

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Tue Jun  5 02:33:44 2012
@@ -176,7 +176,7 @@ public class TokenCache {
    * @throws IOException
    */
   @InterfaceAudience.Private
-  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
+  public static Credentials loadTokens(String jobTokenFile, Configuration conf) 
   throws IOException {
     Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
 

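Widening loadTokens from JobConf to Configuration lets callers outside the old mapred API (for instance the JobLocalizer path referenced earlier in this diff) read the job token file directly. A hypothetical call site:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;

    class LoadTokensSketch {
      // any Configuration now works here, not just a JobConf
      static Credentials load(String jobTokenFile) throws IOException {
        return TokenCache.loadTokens(jobTokenFile, new Configuration());
      }
    }
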
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Tue Jun  5 02:33:44 2012
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.mapreduce.server.tasktracker;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,13 +27,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.mapreduce.JobID;
 
 @InterfaceAudience.Private
@@ -45,19 +42,16 @@ public class Localizer {
 
   private FileSystem fs;
   private String[] localDirs;
-  private TaskController taskController;
 
   /**
    * Create a Localizer instance
    * 
    * @param fileSys
    * @param lDirs
-   * @param tc
    */
-  public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+  public Localizer(FileSystem fileSys, String[] lDirs) {
     fs = fileSys;
     localDirs = lDirs;
-    taskController = tc;
   }
 
   // Data-structure for synchronizing localization of user directories.
@@ -162,13 +156,6 @@ public class Localizer {
                 + user);
       }
 
-      // Now, run the task-controller specific code to initialize the
-      // user-directories.
-      InitializationContext context = new InitializationContext();
-      context.user = user;
-      context.workDir = null;
-      taskController.initializeUser(context);
-
       // Localization of the user is done
       localizedUser.set(true);
     }
@@ -181,7 +168,7 @@ public class Localizer {
    * <br>
    * Here, we set 700 permissions on the job directories created on all disks.
    * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+   * {@link TaskController#initializeJob} is run at a
    * later time to set proper private permissions on the job directories. <br>
    * 
    * @param user
@@ -228,16 +215,15 @@ public class Localizer {
    * @param user
    * @param jobId
    * @param attemptId
-   * @param isCleanupAttempt
    * @throws IOException
    */
   public void initializeAttemptDirs(String user, String jobId,
-      String attemptId, boolean isCleanupAttempt)
+      String attemptId)
       throws IOException {
 
     boolean initStatus = false;
     String attemptDirPath =
-        TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+        TaskTracker.getLocalTaskDir(user, jobId, attemptId);
 
     for (String localDir : localDirs) {
       Path localAttemptDir = new Path(localDir, attemptDirPath);

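As the javadoc above notes, the job directories are created with 700 permissions and stay that way until TaskController#initializeJob runs; with the FileSystem API that interim state is one mkdirs call, sketched here with hypothetical localDir/jobDirPath arguments:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    class JobDirSketch {
      // create a per-job directory readable only by its owner (rwx------)
      static void createPrivateJobDir(FileSystem fs, String localDir,
          String jobDirPath) throws IOException {
        FsPermission ownerOnly = new FsPermission((short) 0700);
        if (!fs.mkdirs(new Path(localDir, jobDirPath), ownerOnly)) {
          throw new IOException("Failed to create " + jobDirPath);
        }
      }
    }
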
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Tue Jun  5 02:33:44 2012
@@ -334,7 +334,8 @@ public class JobContextImpl implements J
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getArchiveTimestamps() {
+  @Override
+  public long[] getArchiveTimestamps() {
     return DistributedCache.getArchiveTimestamps(conf);
   }
 
@@ -344,7 +345,8 @@ public class JobContextImpl implements J
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getFileTimestamps() {
+  @Override
+  public long[] getFileTimestamps() {
     return DistributedCache.getFileTimestamps(conf);
   }
 

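getArchiveTimestamps and getFileTimestamps now return long[] in JobContextImpl and in the wrapped/chained contexts earlier in this commit, while the values still travel through the configuration as comma-separated strings; the conversion presumably mirrors parseBooleans from the TrackerDistributedCacheManager hunk. A hedged sketch with a hypothetical parseTimestamps helper (the real conversion sits behind DistributedCache):

    // hypothetical helper mirroring parseBooleans above
    static long[] parseTimestamps(String[] strs) {
      if (strs == null) {
        return null;
      }
      long[] result = new long[strs.length];
      for (int i = 0; i < strs.length; ++i) {
        result[i] = Long.parseLong(strs[i]);
      }
      return result;
    }
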
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Jun  5 02:33:44 2012
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.util.AsyncDiskService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,6 +58,8 @@ public class MRAsyncDiskService {
   
   AsyncDiskService asyncDiskService;
   
+  TaskController taskController;
+  
   public static final String TOBEDELETED = "toBeDeleted";
   
   /**
@@ -64,14 +67,18 @@ public class MRAsyncDiskService {
    * root directories).
    * 
    * The AsyncDiskServices uses one ThreadPool per volume to do the async disk
-   * operations.
+   * operations. A {@link TaskController} is passed that will be used to do
+   * the deletes.
    * 
    * @param localFileSystem The localFileSystem used for deletions.
+   * @param taskController The taskController that should be used for the 
+   * delete operations
    * @param nonCanonicalVols The roots of the file system volumes, which may
+   * be absolute paths, or paths relative to the ${user.dir} system property
    * ("cwd").
    */
-  public MRAsyncDiskService(FileSystem localFileSystem,
+  public MRAsyncDiskService(FileSystem localFileSystem, 
+      TaskController taskController,
       String... nonCanonicalVols) throws IOException {
     
     this.localFileSystem = localFileSystem;
@@ -84,6 +91,8 @@ public class MRAsyncDiskService {
     
     asyncDiskService = new AsyncDiskService(this.volumes);
     
+    this.taskController = taskController;
+    
     // Create one ThreadPool per volume
     for (int v = 0 ; v < volumes.length; v++) {
       // Create the root for file deletion
@@ -109,13 +118,31 @@ public class MRAsyncDiskService {
               + " because it's outside of " + volumes[v]);
         }
         DeleteTask task = new DeleteTask(volumes[v], absoluteFilename,
-            relative);
+            relative, files[f].getOwner());
         execute(volumes[v], task);
       }
     }
   }
   
   /**
+   * Create a AsyncDiskServices with a set of volumes (specified by their
+   * root directories).
+   * 
+   * The AsyncDiskServices uses one ThreadPool per volume to do the async disk
+   * operations.
+   * 
+   * @param localFileSystem The localFileSystem used for deletions.
+   * @param nonCanonicalVols The roots of the file system volumes, which may
+   * be absolute paths, or paths relative to the ${user.dir} system property
+   * ("cwd").
+   */ 
+  public MRAsyncDiskService(FileSystem localFileSystem, 
+      String... nonCanonicalVols) throws IOException {
+    this(localFileSystem, null, nonCanonicalVols);
+  }
+  
+  
+  /**
    * Initialize MRAsyncDiskService based on conf.
    * @param conf  local file system and local dirs will be read from conf 
    */
@@ -174,6 +201,8 @@ public class MRAsyncDiskService {
     String originalPath;
     /** The file name after the move */
     String pathToBeDeleted;
+    /** The owner of the file */
+    String owner;
     
     /**
      * Delete a file/directory (recursively if needed).
@@ -181,11 +210,14 @@ public class MRAsyncDiskService {
      * @param originalPath  The original name, relative to volume root.
      * @param pathToBeDeleted  The name after the move, relative to volume root,
      *                         containing TOBEDELETED.
+     * @param owner         The owner of the file
      */
-    DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
+    DeleteTask(String volume, String originalPath, String pathToBeDeleted, 
+        String owner) {
       this.volume = volume;
       this.originalPath = originalPath;
       this.pathToBeDeleted = pathToBeDeleted;
+      this.owner = owner;
     }
     
     @Override
@@ -201,7 +233,12 @@ public class MRAsyncDiskService {
       Exception e = null;
       try {
         Path absolutePathToBeDeleted = new Path(volume, pathToBeDeleted);
-        success = localFileSystem.delete(absolutePathToBeDeleted, true);
+        if (taskController != null && owner != null) {
+          taskController.deleteAsUser(owner, 
+                                      absolutePathToBeDeleted.toString());
+        } else {
+          success = localFileSystem.delete(absolutePathToBeDeleted, true);
+        }
       } catch (Exception ex) {
         e = ex;
       }
@@ -262,8 +299,9 @@ public class MRAsyncDiskService {
       // Return false in case that the file is not found.  
       return false;
     }
-
-    DeleteTask task = new DeleteTask(volume, pathName, newPathName);
+    FileStatus status = localFileSystem.getFileStatus(target);
+    DeleteTask task = new DeleteTask(volume, pathName, newPathName, 
+                                     status.getOwner());
     execute(volume, task);
     return true;
   }
@@ -371,5 +409,31 @@ public class MRAsyncDiskService {
     throw new IOException("Cannot delete " + absolutePathName
         + " because it's outside of all volumes.");
   }
-  
+  /**
+   * Move the path name to a temporary location and then delete it.
+   * 
+   * Note that if there is no volume that contains this path, the path
+   * will stay as it is, and the function will return false.
+   *  
+   * This functions returns when the moves are done, but not necessarily all
+   * deletions are done. This is usually good enough because applications 
+   * won't see the path name under the old name anyway after the move. 
+   * 
+   * @param volume              The disk volume
+   * @param absolutePathName    The path name from root "/"
+   * @throws IOException        If the move failed
+   * @return   false if we are unable to move the path name
+   */
+  public boolean moveAndDeleteAbsolutePath(String volume, 
+                                           String absolutePathName)
+  throws IOException {
+    String relative = getRelativePathName(absolutePathName, volume);
+    if (relative == null) {
+      // This should never happen
+      throw new IOException("Cannot delete " + absolutePathName
+          + " because it's outside of " + volume);
+    }
+    return moveAndDeleteRelativePath(volume, relative);
+  }
+
 }

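DeleteTask now records the file owner when it is enqueued and, when both a TaskController and an owner are known, deletes through taskController.deleteAsUser so files created as another user can actually be removed; otherwise it falls back to a plain local delete. A minimal sketch of that dispatch, with Deleter and LocalDelete as hypothetical stand-ins for the TaskController and LocalFileSystem roles:

    import java.io.IOException;

    class OwnerAwareDelete {
      interface Deleter {      // stand-in for TaskController.deleteAsUser
        void deleteAsUser(String user, String path) throws IOException;
      }
      interface LocalDelete {  // stand-in for LocalFileSystem.delete
        boolean delete(String path) throws IOException;
      }

      static boolean delete(Deleter controller, LocalDelete localFs,
          String owner, String absolutePath) throws IOException {
        if (controller != null && owner != null) {
          // delete with the owner's privileges via the task controller
          controller.deleteAsUser(owner, absolutePath);
          return true;
        }
        // no controller (or unknown owner): delete directly
        return localFs.delete(absolutePath);
      }
    }
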
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java Tue Jun  5 02:33:44 2012
@@ -35,15 +35,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
 
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class ProcfsBasedProcessTree extends ProcessTree {
+public class ProcfsBasedProcessTree {
 
   static final Log LOG = LogFactory
       .getLog(ProcfsBasedProcessTree.class);
@@ -91,20 +94,19 @@ public class ProcfsBasedProcessTree exte
   // to a test directory.
   private String procfsDir;
   
-  private Integer pid = -1;
+  private final Integer pid;
   private Long cpuTime = 0L;
   private boolean setsidUsed = false;
-  private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
 
-  private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
+  private Map<Integer, ProcessInfo> processTree =
+    new HashMap<Integer, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
-    this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+    this(pid, false);
   }
 
-  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
-                                long sigkillInterval) {
-    this(pid, setsidUsed, sigkillInterval, PROCFS);
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed) {
+    this(pid, setsidUsed, PROCFS);
   }
 
   /**
@@ -115,29 +117,14 @@ public class ProcfsBasedProcessTree exte
    * 
    * @param pid root of the process tree
    * @param setsidUsed true, if setsid was used for the root pid
-   * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL 
-   *                        when killing a process tree
    * @param procfsDir the root of a proc file system - only used for testing. 
    */
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
-                                long sigkillInterval, String procfsDir) {
+      String procfsDir) {
     this.pid = getValidPID(pid);
     this.setsidUsed = setsidUsed;
-    sleeptimeBeforeSigkill = sigkillInterval;
     this.procfsDir = procfsDir;
   }
-  
-  /**
-   * Sets SIGKILL interval
-   * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
-   *                  String, boolean, long)} instead
-   * @param interval The time to wait before sending SIGKILL
-   *                 after sending SIGTERM
-   */
-  @Deprecated
-  public void setSigKillInterval(long interval) {
-    sleeptimeBeforeSigkill = interval;
-  }
 
   /**
    * Checks if the ProcfsBasedProcessTree is available on this system.
@@ -238,112 +225,49 @@ public class ProcfsBasedProcessTree exte
 
   /**
    * Is the root-process alive?
-   * 
    * @return true if the root-process is alive, false otherwise.
    */
-  public boolean isAlive() {
-    if (pid == -1) {
-      return false;
-    } else {
-      return isAlive(pid.toString());
-    }
+  boolean isAlive(int pid, TaskController taskController) {
+    try {
+      return taskController.signalTask(null, pid, Signal.NULL);
+    } catch (IOException ignored) { }
+    return false;
+  }
+
+  boolean isAlive(TaskController taskController) {
+    return isAlive(pid, taskController);
   }
 
   /**
    * Is any of the subprocesses in the process-tree alive?
-   * 
    * @return true if any of the processes in the process-tree is
    *           alive, false otherwise.
    */
-  public boolean isAnyProcessInTreeAlive() {
+  boolean isAnyProcessInTreeAlive(TaskController taskController) {
     for (Integer pId : processTree.keySet()) {
-      if (isAlive(pId.toString())) {
+      if (isAlive(pId, taskController)) {
         return true;
       }
     }
     return false;
   }
 
+
   /** Verify that the given process id is same as its process group id.
    * @param pidStr Process id of the to-be-verified-process
    * @param procfsDir  Procfs root dir
    */
-  static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
-    Integer pId = Integer.parseInt(pidStr);
-    // Get information for this process
-    ProcessInfo pInfo = new ProcessInfo(pId);
-    pInfo = constructProcessInfo(pInfo, procfsDir);
-    if (pInfo == null) {
-      // process group leader may have finished execution, but we still need to
-      // kill the subProcesses in the process group.
-      return true;
-    }
-
-    //make sure that pId and its pgrpId match
-    if (!pInfo.getPgrpId().equals(pId)) {
-      LOG.warn("Unexpected: Process with PID " + pId +
-               " is not a process group leader.");
-      return false;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(pId + " is a process group leader, as expected.");
-    }
-    return true;
+  public boolean checkPidPgrpidForMatch() {
+    return checkPidPgrpidForMatch(pid, PROCFS);
   }
 
-  /** Make sure that the given pid is a process group leader and then
-   * destroy the process group.
-   * @param pgrpId   Process group id of to-be-killed-processes
-   * @param interval The time to wait before sending SIGKILL
-   *                 after sending SIGTERM
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
-                       boolean inBackground)
-         throws IOException {
-    // Make sure that the pid given is a process group leader
-    if (!checkPidPgrpidForMatch(pgrpId, PROCFS)) {
-      throw new IOException("Process with PID " + pgrpId  +
-                          " is not a process group leader.");
-    }
-    destroyProcessGroup(pgrpId, interval, inBackground);
-  }
-
-  /**
-   * Destroy the process-tree.
-   */
-  public void destroy() {
-    destroy(true);
-  }
-  
-  /**
-   * Destroy the process-tree.
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public void destroy(boolean inBackground) {
-    LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
-    if (pid == -1) {
-      return;
-    }
-    if (isAlive(pid.toString())) {
-      if (isSetsidAvailable && setsidUsed) {
-        // In this case, we know that pid got created using setsid. So kill the
-        // whole processGroup.
-        try {
-          assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
-                              inBackground);
-        } catch (IOException e) {
-          LOG.warn(StringUtils.stringifyException(e));
-        }
-      }
-      else {
-        //TODO: Destroy all the processes in the subtree in this case also.
-        // For the time being, killing only the root process.
-        destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
-      }
-    }
+  static boolean checkPidPgrpidForMatch(int _pid, String procfs) {
+    // Get information for this process
+    ProcessInfo pInfo = new ProcessInfo(_pid);
+    pInfo = constructProcessInfo(pInfo, procfs);
+    // null if process group leader finished execution; issue no warning
+    // make sure that pid and its pgrpId match
+    return pInfo == null || pInfo.getPgrpId().equals(_pid);
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =

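The reworked isAlive delegates to TaskController.signalTask with Signal.NULL, the classic kill(pid, 0) probe: signal 0 delivers nothing, but the kernel still verifies that the pid exists and may be signalled, which is presumably why the check is routed through the task controller (it runs as a user allowed to signal the task). A standalone sketch of the bare probe, assuming a POSIX kill binary on the PATH:

    import java.io.IOException;

    class PidProbe {
      // exit status 0 from "kill -0 <pid>" means the process exists
      static boolean isProcessAlive(int pid) {
        try {
          Process p =
            new ProcessBuilder("kill", "-0", Integer.toString(pid)).start();
          return p.waitFor() == 0;
        } catch (IOException e) {
          return false;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
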
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java Tue Jun  5 02:33:44 2012
@@ -38,14 +38,20 @@ public class ProcfsBasedProcessTree exte
     super(pid);
   }
 
+  /**
+   * @param sigkillInterval Has no effect
+   */
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
       long sigkillInterval) {
-    super(pid, setsidUsed, sigkillInterval);
+    super(pid, setsidUsed);
   }
 
+  /**
+   * @param sigkillInterval Has no effect
+   */
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
       long sigkillInterval, String procfsDir) {
-    super(pid, setsidUsed, sigkillInterval, procfsDir);
+    super(pid, setsidUsed, procfsDir);
   }
 
   public ProcfsBasedProcessTree getProcessTree() {

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Jun  5 02:33:44 2012
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -80,41 +81,23 @@ public class ClusterWithLinuxTaskControl
         + "/task-controller";
     
     @Override
-    public void setup() throws IOException {
+    public void setup(LocalDirAllocator allocator) throws IOException {
       getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup);
 
       // write configuration file
       configurationFile = createTaskControllerConf(System
           .getProperty(TASKCONTROLLER_PATH), getConf());
-      super.setup();
+      super.setup(allocator);
     }
 
     @Override
-    protected String getTaskControllerExecutablePath() {
-      return new File(taskControllerExePath).getAbsolutePath();
+    protected String getTaskControllerExecutablePath(Configuration conf) {
+      return taskControllerExePath;
     }
 
     void setTaskControllerExe(String execPath) {
       this.taskControllerExePath = execPath;
     }
-
-    volatile static int attemptedSigQuits = 0;
-    volatile static int failedSigQuits = 0;
-
-    /** Work like LinuxTaskController, but also count the number of
-      * attempted and failed SIGQUIT sends via the task-controller
-      * executable.
-      */
-    @Override
-    void dumpTaskStack(TaskControllerContext context) {
-      attemptedSigQuits++;
-      try {
-        signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
-      } catch (Exception e) {
-        LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
-        failedSigQuits++;
-      }
-    }
   }
 
   // cluster instances which sub classes can use
@@ -275,7 +258,7 @@ public class ClusterWithLinuxTaskControl
       if (ugi.indexOf(",") > 1) {
         return true;
       }
-      LOG.info("Invalid taskcontroller-ugi : " + ugi); 
+      LOG.info("Invalid taskcontroller-ugi (requires \"user,group\"): " + ugi); 
       return false;
     }
     LOG.info("Invalid taskcontroller-ugi : " + ugi);

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java Tue Jun  5 02:33:44 2012
@@ -35,7 +35,9 @@ import static org.junit.Assert.*;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Ignore;
 
+@Ignore("The debug script is broken in the current build.")
 public class TestDebugScript {
 
   // base directory which is used by the debug script

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Jun  5 02:33:44 2012
@@ -23,7 +23,6 @@ import java.security.PrivilegedException
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -108,33 +107,4 @@ public class TestJobExecutionAsDifferent
     });
   }
 
-  /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
-   * if a task times out.
-   */
-  public void testTimeoutStackTrace() throws Exception {
-    if (!shouldRun()) {
-      return;
-    }
-
-    // Run a job that should timeout and trigger a SIGQUIT.
-    startCluster();
-    jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
-      public Object run() throws Exception {
-        JobConf conf = getClusterConf();
-        conf.setInt(JobContext.TASK_TIMEOUT, 10000);
-        conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
-        SleepJob sleepJob = new SleepJob();
-        sleepJob.setConf(conf);
-        Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
-        job.setMaxMapAttempts(1);
-        int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
-        job.waitForCompletion(true);
-        assertTrue("Did not detect a new SIGQUIT!",
-            prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
-        assertEquals("A SIGQUIT attempt failed!", 0,
-            MyLinuxTaskController.failedSigQuits);
-        return null;
-      }
-    });
-  }
 }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Jun  5 02:33:44 2012
@@ -18,20 +18,12 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedReader;
-import java.io.FileInputStream;
 import java.io.File;
-import java.io.InputStreamReader;
 import java.io.IOException;
 
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.SleepJob;
 
 /**
  * A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -39,96 +31,39 @@ import org.apache.hadoop.mapreduce.Sleep
  */
 public class TestJobKillAndFail extends TestCase {
 
-  static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
 
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  /**
-   * TaskController instance that just sets a flag when a stack dump
-   * is performed in a child thread.
-   */
-  static class MockStackDumpTaskController extends DefaultTaskController {
 
-    static volatile int numStackDumps = 0;
-
-    static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
-
-    public MockStackDumpTaskController() {
-      LOG.info("Instantiated MockStackDumpTC");
-    }
-
-    @Override
-    void dumpTaskStack(TaskControllerContext context) {
-      LOG.info("Got stack-dump request in TaskController");
-      MockStackDumpTaskController.numStackDumps++;
-      super.dumpTaskStack(context);
-    }
-
-  }
-
-  /** If a task was killed, then dumpTaskStack() should have been
-    * called. Test whether or not the counter was incremented
-    * and succeed/fail based on this. */
-  private void checkForStackDump(boolean expectDump, int lastNumDumps) {
-    int curNumDumps = MockStackDumpTaskController.numStackDumps;
-
-    LOG.info("curNumDumps=" + curNumDumps + "; lastNumDumps=" + lastNumDumps
-        + "; expect=" + expectDump);
-
-    if (expectDump) {
-      assertTrue("No stack dump recorded!", lastNumDumps < curNumDumps);
-    } else {
-      assertTrue("Stack dump happened anyway!", lastNumDumps == curNumDumps);
-    }
-  }
-
-  public void testJobFailAndKill() throws Exception {
+  public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
       JobConf jtConf = new JobConf();
       jtConf.set("mapred.jobtracker.instrumentation", 
           JTInstrumentation.class.getName());
-      jtConf.set("mapreduce.tasktracker.taskcontroller",
-          MockStackDumpTaskController.class.getName());
       mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
       JTInstrumentation instr = (JTInstrumentation) 
         mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-      conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
       
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
-      RunningJob runningJob = UtilsForTests.runJobFail(conf, inDir, outDir);
+      RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
       // Checking that the Job got failed
-      assertEquals(runningJob.getJobState(), JobStatus.FAILED);
+      assertEquals(JobStatus.FAILED, job.getJobState());
       assertTrue(instr.verifyJob());
       assertEquals(1, instr.failed);
       instr.reset();
 
-      int prevNumDumps = MockStackDumpTaskController.numStackDumps;
-      runningJob = UtilsForTests.runJobKill(conf, inDir, outDir);
+      job = UtilsForTests.runJobKill(conf, inDir, outDir);
       // Checking that the Job got killed
-      assertTrue(runningJob.isComplete());
-      assertEquals(runningJob.getJobState(), JobStatus.KILLED);
+      assertTrue(job.isComplete());
+      assertEquals(JobStatus.KILLED, job.getJobState());
       assertTrue(instr.verifyJob());
       assertEquals(1, instr.killed);
-      // check that job kill does not put a stacktrace in task logs.
-      checkForStackDump(false, prevNumDumps);
-
-      // Test that a task that times out does have a stack trace
-      conf = mr.createJobConf();
-      conf.setInt(JobContext.TASK_TIMEOUT, 10000);
-      conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
-      SleepJob sleepJob = new SleepJob();
-      sleepJob.setConf(conf);
-      Job job = sleepJob.createJob(1, 0, 30000, 1,0, 0);
-      job.setMaxMapAttempts(1);
-      prevNumDumps = MockStackDumpTaskController.numStackDumps;
-      job.waitForCompletion(true);
-      checkForStackDump(true, prevNumDumps);
     } finally {
       if (mr != null) {
         mr.shutdown();

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Tue Jun  5 02:33:44 2012
@@ -194,7 +194,7 @@ public class TestJobRetire extends TestC
     }
     
     @Override
-    public synchronized void shutdown() throws IOException {
+    public synchronized void shutdown() throws IOException, InterruptedException {
       alive = false;
       super.shutdown();
     }
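
The InterruptedException added to this override compiles only because the
overridden shutdown() now declares it as well: a Java override may narrow,
but never widen, the checked exceptions of the method it overrides. A
compile-checkable sketch (class names are illustrative):

    class Base {
      void shutdown() throws java.io.IOException, InterruptedException { }
    }

    class Derived extends Base {
      @Override
      void shutdown() throws java.io.IOException, InterruptedException {
        // OK: same checked exceptions as Base.shutdown(). Declaring an
        // exception that Base.shutdown() lacks would be a compile error.
      }
    }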

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestJvmManager.java Tue Jun  5 02:33:44 2012
@@ -31,12 +31,19 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
+import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import static org.junit.Assert.*;
 import org.junit.Before;
@@ -52,6 +59,8 @@ public class TestJvmManager {
   private TaskTracker tt;
   private JvmManager jvmManager;
   private JobConf ttConf;
+  private boolean threadCaughtException = false;
+  private String user;
 
   @Before
   public void setUp() {
@@ -64,15 +73,23 @@ public class TestJvmManager {
   }
 
   public TestJvmManager() throws Exception {
+    user = UserGroupInformation.getCurrentUser().getShortUserName();
     tt = new TaskTracker();
     ttConf = new JobConf();
     ttConf.setLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, 2000);
     tt.setConf(ttConf);
     tt.setMaxMapSlots(MAP_SLOTS);
     tt.setMaxReduceSlots(REDUCE_SLOTS);
-    tt.setTaskController(new DefaultTaskController());
+    TaskController dtc = new DefaultTaskController();
+    tt.setTaskController(dtc);
+    Configuration conf = new Configuration();
+    dtc.setConf(conf);
+    LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+    tt.getTaskController().setup(ldirAlloc);
+    JobID jobId = new JobID("test", 0);
     jvmManager = new JvmManager(tt);
     tt.setJvmManagerInstance(jvmManager);
+    tt.setCleanupThread(new InlineCleanupQueue());
   }
 
   // write a shell script to execute the command.
@@ -107,16 +124,22 @@ public class TestJvmManager {
     // launch a jvm
     JobConf taskConf = new JobConf(ttConf);
     TaskAttemptID attemptID = new TaskAttemptID("test", 0, TaskType.MAP, 0, 0);
-    Task task = new MapTask(null, attemptID, 0, null, 1);
+    Task task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+    task.setUser(user);
     task.setConf(taskConf);
     TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
     File pidFile = new File(TEST_DIR, "pid");
-    final TaskRunner taskRunner = task.createRunner(tt, tip);
+    RunningJob rjob = new RunningJob(attemptID.getJobID());
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(ttConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(ttConf).
+      newTaskDistributedCacheManager(attemptID.getJobID(), taskConf);
+    final TaskRunner taskRunner = task.createRunner(tt, tip, rjob);
     // launch a jvm which sleeps for 60 seconds
     final Vector<String> vargs = new Vector<String>(2);
     vargs.add(writeScript("SLEEP", "sleep 60\n", pidFile).getAbsolutePath());
     final File workDir = new File(TEST_DIR, "work");
-    workDir.mkdir();
     final File stdout = new File(TEST_DIR, "stdout");
     final File stderr = new File(TEST_DIR, "stderr");
 
@@ -125,10 +148,13 @@ public class TestJvmManager {
       public void run() {
         try {
           taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
-              workDir, null);
+              workDir);
         } catch (InterruptedException e) {
           e.printStackTrace();
           return;
+        } catch (IOException e) {
+          e.printStackTrace();
+          setThreadCaughtException();
         }
       }
     };
@@ -156,7 +182,14 @@ public class TestJvmManager {
     final JvmRunner jvmRunner = mapJvmManager.jvmIdToRunner.get(jvmid);
     Thread killer = new Thread() {
       public void run() {
-        jvmRunner.kill();
+        try {
+          jvmRunner.kill();
+        } catch (IOException e) {
+          e.printStackTrace();
+          setThreadCaughtException();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
       }
     };
     killer.start();
@@ -171,23 +204,27 @@ public class TestJvmManager {
 
     // launch another jvm and see it finishes properly
     attemptID = new TaskAttemptID("test", 0, TaskType.MAP, 0, 1);
-    task = new MapTask(null, attemptID, 0, null, 1);
+    task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+    task.setUser(user);
     task.setConf(taskConf);
     tip = tt.new TaskInProgress(task, taskConf);
-    TaskRunner taskRunner2 = task.createRunner(tt, tip);
+    TaskRunner taskRunner2 = task.createRunner(tt, tip, rjob);
     // build dummy vargs to call ls
     Vector<String> vargs2 = new Vector<String>(1);
     vargs2.add(writeScript("LS", "ls", pidFile).getAbsolutePath());
     File workDir2 = new File(TEST_DIR, "work2");
-    workDir.mkdir();
     File stdout2 = new File(TEST_DIR, "stdout2");
     File stderr2 = new File(TEST_DIR, "stderr2");
-    taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2,
-        null);
+    taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2);
     // join all the threads
     killer.join();
     jvmRunner.join();
     launcher.join();
+    assertFalse("Thread caught unexpected IOException", 
+                 threadCaughtException);
+  }
+
+  private void setThreadCaughtException() {
+    threadCaughtException = true;
   }
 
 
@@ -198,6 +235,8 @@ public class TestJvmManager {
    */
   @Test
   public void testForRaces() throws Exception {
+    fail("TODO: re-enable test after 2178 merge");
+    /*
     JvmManagerForType mapJvmManager = jvmManager
         .getJvmManagerForType(TaskType.MAP);
 
@@ -248,6 +287,7 @@ public class TestJvmManager {
     if (failed.get() != null) {
       throw new RuntimeException(failed.get());
     }
+  */
   }
 
   /**
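
The threadCaughtException flag introduced above follows a standard JUnit
idiom: an exception thrown on a helper thread cannot fail the test on its
own, so it is recorded and asserted after join(). A minimal sketch (names
are illustrative):

    private volatile boolean threadFailed = false;

    Thread worker = new Thread() {
      public void run() {
        try {
          // ... work that may throw ...
        } catch (Exception e) {
          e.printStackTrace();
          threadFailed = true; // JUnit never sees a throw from here
        }
      }
    };
    worker.start();
    worker.join();
    assertFalse("Worker thread threw an exception", threadFailed);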

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Tue Jun  5 02:33:44 2012
@@ -33,10 +33,10 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
 
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -151,6 +151,8 @@ public class TestKillSubProcesses extend
         break;
       }
     }
+    final TaskController tc =
+      mr.getTaskTrackerRunner(0).getTaskTracker().getTaskController();
 
     pid = null;
     jobClient = new JobClient(conf);
@@ -195,7 +197,7 @@ public class TestKillSubProcesses extend
     }
 
     // Checking if the descendant processes of map task are alive
-    if(ProcessTree.isSetsidAvailable) {
+    if(TaskController.isSetsidAvailable) {
       String childPid = TestProcfsBasedProcessTree.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + 0);
       while(childPid == null) {
@@ -243,11 +245,11 @@ public class TestKillSubProcesses extend
     }
 
     // Checking if the map task got killed or not
-    assertTrue(!ProcessTree.isAlive(pid));
+    assertTrue(!isAlive(pid));
     LOG.info("The map task is not alive after Job is completed, as expected.");
 
     // Checking if the descendant processes of map task are killed properly
-    if(ProcessTree.isSetsidAvailable) {
+    if(TaskController.isSetsidAvailable) {
       for(int i=0; i <= numLevelsOfSubProcesses; i++) {
         String childPid = TestProcfsBasedProcessTree.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + i);
@@ -310,9 +312,10 @@ public class TestKillSubProcesses extend
       return;
     }
     
-    JobConf conf=null;
     try {
-      mr = new MiniMRCluster(1, "file:///", 1);
+      JobConf conf = new JobConf();
+      conf.setLong(JvmManager.JvmManagerForType.DELAY_BEFORE_KILL_KEY, 0L);
+      mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
 
       // run the TCs
       conf = mr.createJobConf();
@@ -354,7 +357,7 @@ public class TestKillSubProcesses extend
    * Runs a recursive shell script to create a chain of subprocesses
    */
   private static void runChildren(JobConf conf) throws IOException {
-    if (ProcessTree.isSetsidAvailable) {
+    if (TaskController.isSetsidAvailable) {
       FileSystem fs = FileSystem.getLocal(conf);
 
       if (fs.exists(scriptDir)) {
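
The isAlive(pid) helper used above replaces the old ProcessTree call. One
conventional way for a test to probe process liveness on Unix is signal 0,
e.g. via the Shell utilities already imported in this file; a sketch (not
necessarily how the local helper is implemented):

    import org.apache.hadoop.util.Shell.ShellCommandExecutor;

    // "kill -0" delivers no signal; it exits 0 iff the process exists and
    // may be signalled. A permission error also yields a non-zero exit,
    // so treat this as a heuristic.
    static boolean isAliveSketch(String pid) {
      try {
        new ShellCommandExecutor(new String[] { "kill", "-0", pid }).execute();
        return true;
      } catch (Exception e) {
        return false;
      }
    }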

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLinuxTaskController.java Tue Jun  5 02:33:44 2012
@@ -22,22 +22,27 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
+
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.hadoop.mapred.LinuxTaskController.ResultCode.*;
+
 import junit.framework.TestCase;
 
+@Ignore("Negative test relies on properties fixed during TC compilation")
 public class TestLinuxTaskController extends TestCase {
-  private static int INVALID_TASKCONTROLLER_PERMISSIONS = 24;
   private static File testDir = new File(System.getProperty("test.build.data",
       "/tmp"), TestLinuxTaskController.class.getName());
-  private static String taskControllerPath = System
-      .getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+  private static String taskControllerPath =
+    System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
 
   @Before
   protected void setUp() throws Exception {
@@ -51,9 +56,8 @@ public class TestLinuxTaskController ext
 
   public static class MyLinuxTaskController extends LinuxTaskController {
     String taskControllerExePath = taskControllerPath + "/task-controller";
-
     @Override
-    protected String getTaskControllerExecutablePath() {
+    protected String getTaskControllerExecutablePath(Configuration conf) {
       return taskControllerExePath;
     }
   }
@@ -64,16 +68,18 @@ public class TestLinuxTaskController ext
       // task controller setup should fail validating permissions.
       Throwable th = null;
       try {
-        controller.setup();
+        controller.setup(new LocalDirAllocator("mapred.local.dir"));
       } catch (IOException ie) {
         th = ie;
       }
       assertNotNull("No exception during setup", th);
-      assertTrue("Exception message does not contain exit code"
-          + INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
-          "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
+      assertTrue("Exception message \"" + th.getMessage() +
+            "\" does not contain exit code " +
+            INVALID_TASKCONTROLLER_PERMISSIONS.getValue(),
+          th.getMessage().contains(
+            "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS.getValue()));
     } else {
-      controller.setup();
+      controller.setup(new LocalDirAllocator("mapred.local.dir"));
     }
 
   }
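
The reworked assertion above includes the actual message in the failure text,
which makes a mismatch diagnosable from the test report alone. The general
pattern (setupThatShouldFail() and expectedCode are illustrative):

    Throwable th = null;
    try {
      setupThatShouldFail();
    } catch (IOException ie) {
      th = ie;
    }
    assertNotNull("No exception during setup", th);
    assertTrue("Exception message \"" + th.getMessage()
          + "\" does not contain exit code " + expectedCode,
        th.getMessage().contains("with exit code " + expectedCode));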

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Tue Jun  5 02:33:44 2012
@@ -45,8 +45,10 @@ import org.apache.hadoop.mapred.lib.Iden
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
 
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -85,6 +87,17 @@ import static org.junit.Assert.assertFal
  *
  **********************************************************/
 public class TestMapRed extends Configured implements Tool {
+
+  static final Path TESTDIR =
+    new Path(System.getProperty("test.build.data", "/tmp"),
+        TestMapRed.class.getSimpleName());
+
+  @Before
+  public void removeTestdir() throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(new Configuration()).getRaw();
+    rfs.delete(TESTDIR, true);
+  }
+
   /**
    * Modified to make it a junit test.
    * The RandomGen Job does the actual work of creating
@@ -370,7 +383,8 @@ public class TestMapRed extends Configur
                                 boolean includeCombine
                                 ) throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
-    Path testdir = new Path("build/test/test.mapred.compress");
+    Path testdir = new Path(System.getProperty("test.build.data", "/tmp"),
+        "test.mapred.compress");
     Path inDir = new Path(testdir, "in");
     Path outDir = new Path(testdir, "out");
     FileSystem fs = FileSystem.get(conf);
@@ -461,7 +475,8 @@ public class TestMapRed extends Configur
     // Write the answer key to a file.  
     //
     FileSystem fs = FileSystem.get(conf);
-    Path testdir = new Path("mapred.loadtest");
+    final Path testdir = new Path(System.getProperty("test.build.data", "/tmp"),
+        "mapred.loadtest");
     if (!fs.mkdirs(testdir)) {
       throw new IOException("Mkdirs failed to create " + testdir.toString());
     }
@@ -723,7 +738,8 @@ public class TestMapRed extends Configur
   public void runJob(int items) {
     try {
       JobConf conf = new JobConf(TestMapRed.class);
-      Path testdir = new Path("build/test/test.mapred.spill");
+      Path testdir = new Path(System.getProperty("build.test.data", "/tmp"),
+          "test.mapred.spill");
       Path inDir = new Path(testdir, "in");
       Path outDir = new Path(testdir, "out");
       FileSystem fs = FileSystem.get(conf);
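
Each of the relocated paths in this file follows the same convention: root
test output under the test.build.data system property and fall back to /tmp
when it is unset. A minimal sketch:

    import org.apache.hadoop.fs.Path;

    // Honors -Dtest.build.data=<dir> from the build, else uses /tmp.
    Path testRoot = new Path(
        System.getProperty("test.build.data", "/tmp"), "my.test.subdir");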

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Jun  5 02:33:44 2012
@@ -134,7 +134,6 @@ public class TestMiniMRWithDFS extends T
           .isDirectory());
       LOG.info("Verifying contents of " + MRConfig.LOCAL_DIR + " "
           + localDir.getAbsolutePath());
-
       // Verify contents(user-dir) of tracker-sub-dir
       File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
       if (trackerSubDir.isDirectory()) {

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Tue Jun  5 02:33:44 2012
@@ -20,7 +20,10 @@ package org.apache.hadoop.mapred;
 
 import java.io.*;
 import java.util.*;
-import junit.framework.TestCase;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 import org.apache.commons.logging.*;
 
@@ -28,17 +31,26 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 
-public class TestSequenceFileInputFormat extends TestCase {
+public class TestSequenceFileInputFormat {
   private static final Log LOG = FileInputFormat.LOG;
 
   private static int MAX_LENGTH = 10000;
   private static Configuration conf = new Configuration();
+  static final Path TESTDIR =
+    new Path(System.getProperty("test.build.data", "/tmp"),
+        TestSequenceFileInputFormat.class.getSimpleName());
+
+  @Before
+  public void removeTestdir() throws IOException {
+    final FileSystem rfs = FileSystem.getLocal(new Configuration()).getRaw();
+    rfs.delete(TESTDIR, true);
+  }
 
+  @Test
   public void testFormat() throws Exception {
     JobConf job = new JobConf(conf);
     FileSystem fs = FileSystem.getLocal(conf);
-    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
-    Path file = new Path(dir, "test.seq");
+    Path file = new Path(TESTDIR, "test.seq").makeQualified(fs);
     
     Reporter reporter = Reporter.NULL;
     
@@ -46,9 +58,7 @@ public class TestSequenceFileInputFormat
     //LOG.info("seed = "+seed);
     Random random = new Random(seed);
 
-    fs.delete(dir, true);
-
-    FileInputFormat.setInputPaths(job, dir);
+    FileInputFormat.setInputPaths(job, TESTDIR);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
@@ -108,6 +118,7 @@ public class TestSequenceFileInputFormat
         assertEquals("Some keys in no partition.", length, bits.cardinality());
       }
 
+      fs.delete(TESTDIR, true);
     }
   }
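
Deleting the test directory in an @Before method, as this class now does,
gives every test method a clean slate regardless of how an earlier run
ended. A minimal sketch of the idiom:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Before;

    static final Path TESTDIR = new Path(
        System.getProperty("test.build.data", "/tmp"), "ExampleTest");

    @Before
    public void removeTestdir() throws java.io.IOException {
      // getRaw() returns the underlying filesystem without checksum
      // bookkeeping, so CRC files are removed along with the data.
      FileSystem rfs = FileSystem.getLocal(new Configuration()).getRaw();
      rfs.delete(TESTDIR, true); // recursive; a no-op if absent
    }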
 

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskCommit.java Tue Jun  5 02:33:44 2012
@@ -161,6 +161,13 @@ public class TestTaskCommit extends Hado
         throws IOException {
       return 0;
     }
+
+    @Override
+    public void 
+    updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                       long[] sizes) throws IOException {
+      // NOTHING
+    }
   }
   
   /**

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Jun  5 02:33:44 2012
@@ -34,10 +34,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
@@ -47,19 +48,19 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 
 import junit.framework.TestCase;
+import org.junit.Ignore;
 
 /**
  * Test to verify localization of a job and localization of a task on a
  * TaskTracker.
  * 
  */
+@Ignore // test relies on deprecated functionality/lifecycle
 public class TestTaskTrackerLocalization extends TestCase {
 
   private static File TEST_ROOT_DIR = 
@@ -181,7 +182,14 @@ public class TestTaskTrackerLocalization
     // Set up the TaskTracker
     tracker = new TaskTracker();
     tracker.setConf(trackerFConf);
-    tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf));
+    // setup task controller
+    taskController = createTaskController();
+    taskController.setConf(trackerFConf);
+    taskController.setup(lDirAlloc);
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs));
+    tracker.setTaskLogCleanupThread(new UserLogCleaner(trackerFConf, 
+                                        taskController));
     initializeTracker();
   }
 
@@ -203,13 +211,6 @@ public class TestTaskTrackerLocalization
     tracker.setTaskTrackerInstrumentation(
         TaskTracker.createInstrumentation(tracker, trackerFConf));
 
-    // setup task controller
-    taskController = createTaskController();
-    taskController.setConf(trackerFConf);
-    taskController.setup();
-    tracker.setTaskController(taskController);
-    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
-        taskController));
   }
 
   protected TaskController createTaskController() {
@@ -642,13 +643,20 @@ public class TestTaskTrackerLocalization
         + " is not created in any of the configured dirs!!",
         attemptWorkDir != null);
 
-    TaskRunner runner = task.createRunner(tracker, tip);
+    RunningJob rjob = new RunningJob(jobId);
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(trackerFConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(trackerFConf).
+      newTaskDistributedCacheManager(jobId, trackerFConf);
+
+    TaskRunner runner = task.createRunner(tracker, tip, rjob);
     tip.setTaskRunner(runner);
 
     // /////// Few more methods being tested
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
-        localizedJobConf);
+        localizedJobConf, false);
     attemptLogFiles = runner.prepareLogFiles(task.getTaskID(),
         task.isTaskCleanupTask());
 
@@ -666,16 +674,6 @@ public class TestTaskTrackerLocalization
     TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
     // ///////
 
-    // Initialize task via TaskController
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
-    // ///////////
   }
 
   protected void checkTaskLocalization()
@@ -734,13 +732,13 @@ public class TestTaskTrackerLocalization
     out.writeBytes("dummy input");
     out.close();
     // no write permission for subDir and subDir/file
+    int ret = 0;
     try {
-      int ret = 0;
       if((ret = FileUtil.chmod(subDir.toUri().getPath(), "a=rx", true)) != 0) {
         LOG.warn("chmod failed for " + subDir + ";retVal=" + ret);
       }
-    } catch(InterruptedException e) {
-      LOG.warn("Interrupted while doing chmod for " + subDir);
+    } catch (InterruptedException e) {
+      throw new IOException("chmod interrupted", e);
     }
   }
 
@@ -772,7 +770,7 @@ public class TestTaskTrackerLocalization
     InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
     tracker.setCleanupThread(cleanupQueue);
 
-    tip.removeTaskFiles(needCleanup, taskId);
+    tip.removeTaskFiles(needCleanup);
 
     if (jvmReuse) {
       // work dir should still exist and cleanup queue should be empty
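
The chmod change above stops swallowing InterruptedException and surfaces it
as an IOException instead. When converting this way it is also conventional
to restore the thread's interrupt status first; a sketch (the path variable
is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileUtil;

    try {
      FileUtil.chmod(path, "a=rx", true);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status
      throw new IOException("chmod interrupted", e);
    }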


