hadoop-mapreduce-commits mailing list archives

From ste...@apache.org
Subject svn commit: r903227 [15/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri...
Date Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Jan 26 14:02:53 2010
@@ -21,6 +21,7 @@
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.jar.JarOutputStream;
@@ -36,15 +37,18 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 
 import junit.framework.TestCase;
 
@@ -55,9 +59,14 @@
  */
 public class TestTaskTrackerLocalization extends TestCase {
 
-  private File TEST_ROOT_DIR;
+  private static File TEST_ROOT_DIR = 
+    new File(System.getProperty("test.build.data", "/tmp"));
   private File ROOT_MAPRED_LOCAL_DIR;
   private File HADOOP_LOG_DIR;
+  private static File PERMISSION_SCRIPT_DIR;
+  private static File PERMISSION_SCRIPT_FILE;
+  private static final String PERMISSION_SCRIPT_CONTENT = "ls -l -d $1 | " +
+      "awk '{print $1\":\"$3\":\"$4}'";
 
   private int numLocalDirs = 6;
   private static final Log LOG =
@@ -78,36 +87,20 @@
   protected File[] attemptLogFiles;
   protected JobConf localizedTaskConf;
 
-  class InlineCleanupQueue extends CleanupQueue {
-    List<Path> stalePaths = new ArrayList<Path>();
-
-    public InlineCleanupQueue() {
-      // do nothing
-    }
-
-    @Override
-    public void addToQueue(FileSystem fs, Path... paths) {
-      // delete in-line
-      for (Path p : paths) {
-        try {
-          LOG.info("Trying to delete the path " + p);
-          if (!fs.delete(p, true)) {
-            LOG.warn("Stale path " + p.toUri().getPath());
-            stalePaths.add(p);
-          }
-        } catch (IOException e) {
-          LOG.warn("Caught exception while deleting path "
-              + p.toUri().getPath());
-          LOG.info(StringUtils.stringifyException(e));
-          stalePaths.add(p);
-        }
-      }
-    }
+  /**
+   * Dummy method in this base class. Derived classes override it to decide
+   * whether a test can run.
+   */
+  protected boolean canRun() {
+    return true;
   }
 
   @Override
   protected void setUp()
       throws Exception {
+    if (!canRun()) {
+      return;
+    }
     TEST_ROOT_DIR =
         new File(System.getProperty("test.build.data", "/tmp"), getClass()
             .getSimpleName());
@@ -147,7 +140,8 @@
     tracker.setConf(trackerFConf);
 
     // for test case system FS is the local FS
-    tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
+    tracker.systemFS = FileSystem.getLocal(trackerFConf);
+    tracker.setLocalFileSystem(tracker.systemFS);
     tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
     
     taskTrackerUGI = UserGroupInformation.login(trackerFConf);
@@ -158,7 +152,7 @@
     taskId =
         new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
     task =
-        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
     task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
 
     // create jobTokens file
@@ -169,11 +163,40 @@
     taskController.setConf(trackerFConf);
     taskController.setup();
 
-    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
   }
 
   /**
+   * Static block setting up the permission script used by
+   * checkFilePermissions.
+   */
+  static {
+    PERMISSION_SCRIPT_DIR = new File(TEST_ROOT_DIR, "permission_script_dir");
+    PERMISSION_SCRIPT_FILE = new File(PERMISSION_SCRIPT_DIR, "getperms.sh");
+    
+    if(PERMISSION_SCRIPT_FILE.exists()) {
+      PERMISSION_SCRIPT_FILE.delete();
+    }
+    
+    if(PERMISSION_SCRIPT_DIR.exists()) {
+      PERMISSION_SCRIPT_DIR.delete();
+    }
+    
+    PERMISSION_SCRIPT_DIR.mkdir();
+    
+    try {
+      PrintWriter writer = new PrintWriter(PERMISSION_SCRIPT_FILE);
+      writer.write(PERMISSION_SCRIPT_CONTENT);
+      writer.close();
+    } catch (FileNotFoundException fe) {
+      fail();
+    }
+    PERMISSION_SCRIPT_FILE.setExecutable(true, true);
+  }
+
+  /**
    * @param job
    * @throws IOException
    * @throws FileNotFoundException
@@ -222,10 +245,10 @@
     if(!dir.exists())
       assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
     
-    File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+    File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME);
     FileOutputStream fos = new FileOutputStream(jobTokenFile);
     java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
-    JobTokens jt = new JobTokens();
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
     jt.write(out); // writing empty file, we don't need the keys for this test
     out.close();
   }
@@ -233,15 +256,31 @@
   @Override
   protected void tearDown()
       throws Exception {
+    if (!canRun()) {
+      return;
+    }
     FileUtil.fullyDelete(TEST_ROOT_DIR);
   }
 
   protected static String[] getFilePermissionAttrs(String path)
       throws IOException {
-    String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
+    String[] command = {"bash",PERMISSION_SCRIPT_FILE.getAbsolutePath(), path};
+    String output=Shell.execCommand(command);
     return output.split(":|\n");
   }
 
+
+  /**
+   * Utility method to check the permissions of a given path. Requires the
+   * permission script to be set up before calling.
+   *
+   * @param path
+   * @param expectedPermissions
+   * @param expectedOwnerUser
+   * @param expectedOwnerGroup
+   * @throws IOException
+   */
   static void checkFilePermissions(String path, String expectedPermissions,
       String expectedOwnerUser, String expectedOwnerGroup)
       throws IOException {
@@ -264,6 +303,9 @@
    */
   public void testTaskControllerSetup()
       throws IOException {
+    if (!canRun()) {
+      return;
+    }
     // Task-controller is already set up in the test's setup method. Now verify.
     for (String localDir : localDirs) {
 
@@ -287,7 +329,9 @@
    */
   public void testUserLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     // /////////// The main method being tested
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     // ///////////
@@ -341,7 +385,7 @@
       // Verify the distributed cache dir.
       File distributedCacheDir =
           new File(localDir, TaskTracker
-              .getDistributedCacheDir(task.getUser()));
+              .getPrivateDistributedCacheDir(task.getUser()));
       assertTrue("distributed cache dir " + distributedCacheDir
           + " doesn't exists!", distributedCacheDir.exists());
       checkFilePermissions(distributedCacheDir.getAbsolutePath(),
@@ -358,7 +402,9 @@
    */
   public void testJobLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
 
     // /////////// The main method being tested
@@ -452,7 +498,9 @@
    */
   public void testTaskLocalization()
       throws IOException {
-
+    if (!canRun()) {
+      return;
+    }
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     localizedJobConf = tracker.localizeJobFiles(task);
 
@@ -568,14 +616,102 @@
   }
 
   /**
+   * Validates the removal of $taskid and $taskid/work under mapred-local-dir
+   * in cases where those directories cannot be deleted without adding
+   * write permission to the newly created directories under $taskid and
+   * $taskid/work.
+   * Also see TestSetupWorkDir.createFileAndSetPermissions for details.
+   */
+  void validateRemoveFiles(boolean needCleanup, boolean jvmReuse,
+                           TaskInProgress tip) throws IOException {
+    // create files and set permissions 555. Verify if task controller sets
+    // the permissions for TT to delete the taskDir or workDir
+    String dir = (!needCleanup || jvmReuse) ?
+        TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+          taskId.toString(), task.isTaskCleanupTask())
+      : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(),
+          taskId.toString(), task.isTaskCleanupTask());
+
+    Path[] paths = tracker.getLocalFiles(localizedJobConf, dir);
+    for (Path p : paths) {
+      if (tracker.getLocalFileSystem().exists(p)) {
+        TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p);
+      }
+    }
+
+    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+    tracker.setCleanupThread(cleanupQueue);
+
+    tip.removeTaskFiles(needCleanup, taskId);
+
+    if (jvmReuse) {
+      // work dir should still exist and cleanup queue should be empty
+      assertTrue("cleanup queue is not empty after removeTaskFiles() in case "
+          + "of jvm reuse.", cleanupQueue.isQueueEmpty());
+      boolean workDirExists = false;
+      for (Path p : paths) {
+        if (tracker.getLocalFileSystem().exists(p)) {
+          workDirExists = true;
+        }
+      }
+      assertTrue("work dir does not exist in case of jvm reuse", workDirExists);
+
+      // now try to delete the work dir and verify that there are no stale paths
+      JvmManager.deleteWorkDir(tracker, task);
+    }
+    tracker.removeJobFiles(task.getUser(), jobId.toString());
+
+    assertTrue("Some task files are not deleted!! Number of stale paths is "
+        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a succeeded task
    * @throws IOException
    */
   public void testTaskCleanup()
       throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(false, false);// no needCleanup; no jvmReuse
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a failed task
+   * @throws IOException
+   */
+  public void testFailedTaskCleanup()
+  throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(true, false);// needCleanup; no jvmReuse
+  }
+
+  /**
+   * Validates if task cleanup is done properly for a succeeded task when
+   * the JVM is reused
+   * @throws IOException
+   */
+  public void testTaskCleanupWithJvmUse()
+      throws IOException {
+    if (!canRun()) {
+      return;
+    }
+    testTaskCleanup(false, true);// no needCleanup; jvmReuse
+  }
 
+  /**
+   * Validates if task cleanup is done properly
+   */
+  private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
+      throws IOException {
     // Localize job and localize task.
     tracker.getLocalizer().initializeUserDirs(task.getUser());
     localizedJobConf = tracker.localizeJobFiles(task);
+    if (jvmReuse) {
+      localizedJobConf.setNumTasksToExecutePerJvm(2);
+    }
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
     JobInitializationContext jobContext = new JobInitializationContext();
@@ -614,18 +750,9 @@
 
     // TODO: Let the task run and create files.
 
-    InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
-    tracker.directoryCleanupThread = cleanupQueue;
-
-    // ////////// The central methods being tested
-    tip.removeTaskFiles(true, taskId);
-    tracker.removeJobFiles(task.getUser(), jobId.toString());
-    // //////////
-
-    // TODO: make sure that all files intended to be deleted are deleted.
-
-    assertTrue("Some task files are not deleted!! Number of stale paths is "
-        + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+    // create files and set permissions 555. Verify if task controller sets
+    // the permissions for TT to delete the task dir or work dir properly
+    validateRemoveFiles(needCleanup, jvmReuse, tip);
 
     // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
     // there.
@@ -633,7 +760,7 @@
       Path userDir =
           new Path(localDir, TaskTracker.getUserDir(task.getUser()));
       assertTrue("User directory " + userDir + " is not present!!",
-          tracker.localFs.exists(userDir));
+          tracker.getLocalFileSystem().exists(userDir));
     }
 
     // Test userlogs cleanup.
@@ -653,7 +780,7 @@
 
     // Logs should be there before cleanup.
     assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
-        tracker.localFs.exists(logDir));
+        tracker.getLocalFileSystem().exists(logDir));
 
     // ////////// Another being tested
     TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
@@ -662,6 +789,6 @@
 
     // Logs should be gone after cleanup.
     assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
-        tracker.localFs.exists(logDir));
+        tracker.getLocalFileSystem().exists(logDir));
   }
 }
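
The hunk above swaps the GNU-specific "stat <path> -c %A:%U:%G" invocation for a small ls/awk script, which yields the same "permissions:owner:group" line on platforms without GNU stat. A minimal standalone sketch (not part of the patch; the sample output string is hypothetical) of how getFilePermissionAttrs parses that output:

    public class PermParseSketch {
      public static void main(String[] args) {
        // Hypothetical output of: bash getperms.sh /some/dir
        String output = "drwxr-xr-x:alice:staff\n";
        // Same parsing as getFilePermissionAttrs(): split on ':' and newline.
        String[] attrs = output.split(":|\n");
        System.out.println("mode  = " + attrs[0]); // drwxr-xr-x
        System.out.println("owner = " + attrs[1]); // alice
        System.out.println("group = " + attrs[2]); // staff
      }
    }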

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
 import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager;
@@ -36,7 +37,6 @@
     TestTrackerDistributedCacheManager {
 
   private File configFile;
-  private MyLinuxTaskController taskController;
   private String taskTrackerSpecialGroup;
 
   private static final Log LOG =
@@ -45,7 +45,7 @@
 
   @Override
   protected void setUp()
-      throws IOException {
+      throws IOException, InterruptedException {
 
     if (!ClusterWithLinuxTaskController.shouldRun()) {
       return;
@@ -57,7 +57,7 @@
                 .getSimpleName()).getAbsolutePath();
 
     super.setUp();
-
+    
     taskController = new MyLinuxTaskController();
     String path =
         System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
@@ -65,7 +65,7 @@
         ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
             .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
     String execPath = path + "/task-controller";
-    taskController.setTaskControllerExe(execPath);
+    ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath);
     taskController.setConf(conf);
     taskController.setup();
 
@@ -74,6 +74,17 @@
   }
 
   @Override
+  protected void refreshConf(Configuration conf) throws IOException {
+    super.refreshConf(conf);
+    String path =
+      System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+      ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+          .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+   
+  }
+
+  @Override
   protected void tearDown()
       throws IOException {
     if (!ClusterWithLinuxTaskController.shouldRun()) {
@@ -99,27 +110,19 @@
   }
 
   @Override
-  protected TaskController getTaskController() {
-    return taskController;
-  }
-
-  @Override
   protected void checkFilePermissions(Path[] localCacheFiles)
       throws IOException {
-    String cachedFirstFile = localCacheFiles[0].toUri().getPath();
-    String cachedSecondFile = localCacheFiles[1].toUri().getPath();
     String userName = getJobOwnerName();
 
-    // First make sure that the cache files have proper permissions.
-    TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
-        "-r-xrwx---", userName, taskTrackerSpecialGroup);
-    TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
-        "-r-xrwx---", userName, taskTrackerSpecialGroup);
-
-    // Now. make sure that all the path components also have proper
-    // permissions.
-    checkPermissionOnPathComponents(cachedFirstFile, userName);
-    checkPermissionOnPathComponents(cachedSecondFile, userName);
+    for (Path p : localCacheFiles) {
+      // First make sure that the cache file has proper permissions.
+      TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(),
+          "-r-xrwx---", userName, taskTrackerSpecialGroup);
+      // Now. make sure that all the path components also have proper
+      // permissions.
+      checkPermissionOnPathComponents(p.toUri().getPath(), userName);
+    }
+
   }
 
   /**
@@ -134,9 +137,9 @@
     String trailingStringForFirstFile =
         cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
             + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]"
-            + Path.SEPARATOR + TaskTracker.getDistributedCacheDir(userName),
+            + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName),
             "");
-    LOG.info("Leading path for cacheFirstFile is : "
+    LOG.info("Trailing path for cacheFirstFile is : "
         + trailingStringForFirstFile);
     // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string.
     String leadingStringForFirstFile =

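For reference, the "-r-xrwx---" string asserted above grants the job owner read/execute, the TaskTracker's special group full access (so the TT can manage the localized files), and others nothing. A small self-contained sketch decoding it with hadoop-common's FsPermission (illustrative only, not part of the patch):

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PermStringSketch {
      public static void main(String[] args) {
        // Decode the unix-style string the test asserts on.
        FsPermission p = FsPermission.valueOf("-r-xrwx---");
        assert p.getUserAction()  == FsAction.READ_EXECUTE; // job owner
        assert p.getGroupAction() == FsAction.ALL;          // TT special group
        assert p.getOtherAction() == FsAction.NONE;         // everyone else
      }
    }
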
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jan 26 14:02:53 2010
@@ -25,9 +25,11 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +50,7 @@
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.StringUtils;
 
 import org.apache.commons.logging.Log;
 
@@ -272,7 +275,9 @@
     while (true) {
       boolean shouldWait = false;
       for (JobStatus jobStatuses : jobClient.getAllJobs()) {
-        if (jobStatuses.getRunState() == JobStatus.RUNNING) {
+        if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
+            && jobStatuses.getRunState() != JobStatus.FAILED
+            && jobStatuses.getRunState() != JobStatus.KILLED) {
           shouldWait = true;
           break;
         }
@@ -620,6 +625,7 @@
     conf.setJobName("test-job-fail");
     conf.setMapperClass(FailMapper.class);
     conf.setReducerClass(IdentityReducer.class);
+    conf.setMaxMapAttempts(1);
     
     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
     while (!job.isComplete()) {
@@ -660,6 +666,37 @@
 
     return job;
   }
+  
+  /**
+   * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread
+   * asynchronously.
+   */
+  public static class InlineCleanupQueue extends CleanupQueue {
+    List<String> stalePaths = new ArrayList<String>();
+
+    public InlineCleanupQueue() {
+      // do nothing
+    }
+
+    @Override
+    public void addToQueue(PathDeletionContext... contexts) {
+      // delete paths in-line
+      for (PathDeletionContext context : contexts) {
+        try {
+          if (!deletePath(context)) {
+            LOG.warn("Stale path " + context.fullPath);
+            stalePaths.add(context.fullPath);
+          }
+        } catch (IOException e) {
+          LOG.warn("Caught exception while deleting path "
+              + context.fullPath);
+          LOG.info(StringUtils.stringifyException(e));
+          stalePaths.add(context.fullPath);
+        }
+      }
+    }
+  }
+  
   static class FakeClock extends Clock {
     long time = 0;
     
@@ -722,7 +759,7 @@
     conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
     JobTracker jt;
     try {
-      jt = new JobTracker(conf);
+      jt = JobTracker.startTracker(conf);
       return jt;
     } catch (Exception e) {
       throw new RuntimeException("Could not start jt", e);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java Tue Jan 26 14:02:53 2010
@@ -150,13 +150,13 @@
     } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) {
       // specified IndirectInputFormat? Build src list
       JobClient jClient = new JobClient(conf);  
-      Path sysdir = jClient.getSystemDir();
+      Path tmpDir = new Path("/tmp");
       Random r = new Random();
-      Path indirInputFile = new Path(sysdir,
+      Path indirInputFile = new Path(tmpDir,
           Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
       conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString());
       SequenceFile.Writer writer = SequenceFile.createWriter(
-          sysdir.getFileSystem(conf), conf, indirInputFile,
+          tmpDir.getFileSystem(conf), conf, indirInputFile,
           LongWritable.class, Text.class,
           SequenceFile.CompressionType.NONE);
       try {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java Tue Jan 26 14:02:53 2010
@@ -19,6 +19,7 @@
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
@@ -34,6 +35,8 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import org.junit.Test;
+
 public class TestMRJobClient extends ClusterMapReduceTestCase {
   
   private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
@@ -61,6 +64,7 @@
     }
   }
 
+  @Test
   public void testJobClient() throws Exception {
     Configuration conf = createJobConf();
     Job job = runJob(conf);
@@ -69,7 +73,8 @@
     testJobList(jobId, conf);
     testChangingJobPriority(jobId, conf);
   }
-  
+
+  @Test
   public void testGetCounter(String jobId,
       Configuration conf) throws Exception {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -81,6 +86,7 @@
     assertEquals("Counter", "3", out.toString().trim());
   }
 
+  @Test
   public void testJobList(String jobId,
       Configuration conf) throws Exception {
     verifyJobPriority(jobId, "HIGH", conf, createJobClient());
@@ -106,7 +112,8 @@
     }
     pis.close();
   }
-  
+
+  @Test
   public void testChangingJobPriority(String jobId, Configuration conf)
       throws Exception {
     int exitCode = runTool(conf, createJobClient(),
@@ -115,7 +122,56 @@
     assertEquals("Exit code", 0, exitCode);
     verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient());
   }
-  
+
+  @Test
+  public void testMissingProfileOutput() throws Exception {
+    Configuration conf = createJobConf();
+    final String input = "hello1\n";
+
+    // Set a job to be profiled with an empty agentlib parameter.
+    // This will fail to create profile.out files for the tasks, but the
+    // job should still succeed, because the client skips the HTTP fetch
+    // of the missing profiler output.
+    Job job = MapReduceTestUtil.createJob(conf,
+        getInputDir(), getOutputDir(), 1, 1, input);
+    job.setJobName("disable-profile-fetch");
+    job.setProfileEnabled(true);
+    job.setProfileParams("-agentlib:,verbose=n,file=%s");
+    job.setMaxMapAttempts(1);
+    job.setMaxReduceAttempts(1);
+    job.setJobSetupCleanupNeeded(false);
+    job.waitForCompletion(true);
+
+    // Run another job with an hprof agentlib param; verify
+    // that the HTTP fetch works here.
+    Job job2 = MapReduceTestUtil.createJob(conf,
+        getInputDir(), getOutputDir(), 1, 1, input);
+    job2.setJobName("enable-profile-fetch");
+    job2.setProfileEnabled(true);
+    job2.setProfileParams(
+        "-agentlib:hprof=cpu=samples,heap=sites,force=n,"
+        + "thread=y,verbose=n,file=%s");
+    job2.setProfileTaskRange(true, "0-1");
+    job2.setProfileTaskRange(false, "");
+    job2.setMaxMapAttempts(1);
+    job2.setMaxReduceAttempts(1);
+    job2.setJobSetupCleanupNeeded(false);
+    job2.waitForCompletion(true);
+
+    // Find the first map task, verify that we got its profile output file.
+    TaskReport [] reports = job2.getTaskReports(TaskType.MAP);
+    assertTrue("No task reports found!", reports.length > 0);
+    TaskReport report = reports[0];
+    TaskID id = report.getTaskId();
+    assertTrue(TaskType.MAP == id.getTaskType());
+    System.out.println("Using task id: " + id);
+    TaskAttemptID attemptId = new TaskAttemptID(id, 0);
+
+    File profileOutFile = new File(attemptId.toString() + ".profile");
+    assertTrue("Couldn't find profiler output", profileOutFile.exists());
+    assertTrue("Couldn't remove profiler output", profileOutFile.delete());
+  }
+
   protected CLI createJobClient() throws IOException {
     return new CLI();
   }
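
A note on the profile settings exercised above: the %s in file=%s is substituted at runtime with the task attempt's profile output path, so the first job's deliberately empty -agentlib:, never produces a profile.out for the client to fetch (the missing-output condition being tested), while the second job's well-formed hprof parameters yield a real <attempt-id>.profile file that the client downloads over HTTP.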

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jan 26 14:02:53 2010
@@ -162,10 +162,16 @@
                                       "REDUCE_OUTPUT_RECORDS").getValue();
     long reduceGrps = ctrs.findCounter(COUNTER_GROUP,
                                        "REDUCE_INPUT_GROUPS").getValue();
+    long mergedMapOutputs = ctrs.findCounter(COUNTER_GROUP,
+                                            "MERGED_MAP_OUTPUTS").getValue();
+    long shuffledMaps = ctrs.findCounter(COUNTER_GROUP,
+                                         "SHUFFLED_MAPS").getValue();
     assertEquals("map out = combine in", mapOut, combineIn);
     assertEquals("combine out = reduce in", combineOut, reduceIn);
     assertTrue("combine in > combine out", combineIn > combineOut);
     assertEquals("reduce groups = reduce out", reduceGrps, reduceOut);
+    assertEquals("Mismatch in mergedMapOutputs", mergedMapOutputs, 2);
+    assertEquals("Mismatch in shuffledMaps", shuffledMaps, 2);
     String group = "Random Group";
     CounterGroup ctrGrp = ctrs.getGroup(group);
     assertEquals(0, ctrGrp.size());
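
The two new counter assertions pin MERGED_MAP_OUTPUTS and SHUFFLED_MAPS at 2, presumably because this local job feeds two map outputs through the shuffle: each fetched map output is counted once as shuffled and once as merged.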

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Tue Jan 26 14:02:53 2010
@@ -32,12 +32,12 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,10 +45,13 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.mortbay.log.Log;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
@@ -59,7 +62,6 @@
           .getAbsolutePath();
 
   protected File ROOT_MAPRED_LOCAL_DIR;
-  private static String TEST_CACHE_BASE_DIR = "cachebasedir";
   protected int numLocalDirs = 6;
 
   private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
@@ -70,10 +72,11 @@
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
-    new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+  protected TaskController taskController;
 
   @Override
-  protected void setUp() throws IOException {
+  protected void setUp() throws IOException,InterruptedException {
 
     // Prepare the tests' root dir
     File TEST_ROOT = new File(TEST_ROOT_DIR);
@@ -85,17 +88,36 @@
     ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
     ROOT_MAPRED_LOCAL_DIR.mkdirs();
 
+    String []localDirs = new String[numLocalDirs];
+    for (int i = 0; i < numLocalDirs; i++) {
+      File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i);
+      localDirs[i] = localDir.getPath();
+      localDir.mkdir();
+    }
+
     conf = new Configuration();
-    conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
-    conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString());
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     fs = FileSystem.get(conf);
+    Class<? extends TaskController> taskControllerClass = conf.getClass(
+        TTConfig.TT_TASK_CONTROLLER, DefaultTaskController.class,
+        TaskController.class);
+    taskController = (TaskController) ReflectionUtils.newInstance(
+        taskControllerClass, conf);
+
+    // setup permissions for mapred local dir
+    taskController.setup();
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
-    createTempFile(firstCacheFile);
-    createTempFile(secondCacheFile);
+    createPrivateTempFile(firstCacheFile);
+    createPrivateTempFile(secondCacheFile);
+  }
+  
+  protected void refreshConf(Configuration conf) throws IOException {
+    taskController.setConf(conf);
+    taskController.setup();
   }
 
   /**
@@ -121,9 +143,12 @@
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(conf);
+    String userName = getJobOwnerName();
+    subConf.set(JobContext.USER_NAME, userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
     Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
@@ -131,22 +156,16 @@
     subConf.writeXml(os);
     os.close();
 
-    String userName = getJobOwnerName();
-
     // ****** Imitate TaskRunner code.
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
     handle.setup(localDirAllocator, workDir, TaskTracker
-        .getDistributedCacheDir(userName));
-
-    InitializationContext context = new InitializationContext();
-    context.user = userName;
-    context.workDir = workDir;
-    getTaskController().initializeDistributedCache(context);
+        .getPrivateDistributedCacheDir(userName), 
+        TaskTracker.getPublicDistributedCacheDir());
     // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -176,28 +195,26 @@
       TrackerDistributedCacheManager {
     public FakeTrackerDistributedCacheManager(Configuration conf)
         throws IOException {
-      super(conf);
+      super(conf, taskController);
     }
 
     @Override
     Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive)
-        throws IOException {
+        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
+        boolean isPublic) throws IOException {
       if (cache.equals(firstCacheFile.toUri())) {
         throw new IOException("fake fail");
       }
       return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          fileStatus, isArchive);
+          fileStatus, isArchive, isPublic);
     }
   }
 
   public void testReferenceCount() throws IOException, LoginException,
-      URISyntaxException {
+      URISyntaxException, InterruptedException {
     if (!canRun()) {
       return;
     }
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     TrackerDistributedCacheManager manager = 
       new FakeTrackerDistributedCacheManager(conf);
     Cluster cluster = new Cluster(conf);
@@ -206,25 +223,29 @@
 
     // Configures a job with a regular file
     Job job1 = Job.getInstance(cluster, conf);
+    job1.setUser(userName);
     job1.addCacheFile(secondCacheFile.toUri());
     Configuration conf1 = job1.getConfiguration();
     TrackerDistributedCacheManager.determineTimestamps(conf1);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
     // Task localizing for first job
     TaskDistributedCacheManager handle = manager
         .newTaskDistributedCacheManager(conf1);
     handle.setup(localDirAllocator, workDir, TaskTracker
-          .getDistributedCacheDir(userName));
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp));
     }
     
     Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
-    createTempFile(thirdCacheFile);
+    createPrivateTempFile(thirdCacheFile);
     
     // Configures another job with three regular files.
     Job job2 = Job.getInstance(cluster, conf);
+    job2.setUser(userName);
     // add a file that would get failed to localize
     job2.addCacheFile(firstCacheFile.toUri());
     // add a file that is already localized by different job
@@ -233,6 +254,7 @@
     job2.addCacheFile(thirdCacheFile.toUri());
     Configuration conf2 = job2.getConfiguration();
     TrackerDistributedCacheManager.determineTimestamps(conf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf2);
 
     // Task localizing for second job
     // localization for the "firstCacheFile" will fail.
@@ -240,7 +262,8 @@
     Throwable th = null;
     try {
       handle.setup(localDirAllocator, workDir, TaskTracker
-          .getDistributedCacheDir(userName));
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
     } catch (IOException e) {
       th = e;
       Log.info("Exception during setup", e);
@@ -261,7 +284,73 @@
     assertTrue(th.getMessage().contains(thirdCacheFile.getName()));
     fs.delete(thirdCacheFile, false);
   }
+  
+  /**
+   * Tests that localization of a distributed cache file happens in the
+   * desired directory
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testPublicPrivateCache() 
+  throws IOException, LoginException, InterruptedException {
+    if (!canRun()) {
+      return;
+    }
+    checkLocalizedPath("true");
+    checkLocalizedPath("false");
+  }
+  
+  private void checkLocalizedPath(String visibility) 
+  throws IOException, LoginException, InterruptedException {
+    TrackerDistributedCacheManager manager = 
+      new TrackerDistributedCacheManager(conf, taskController);
+    Cluster cluster = new Cluster(conf);
+    String userName = getJobOwnerName();
+    File workDir = new File(TEST_ROOT_DIR, "workdir");
+    Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+    if ("true".equals(visibility)) {
+      createPublicTempFile(cacheFile);
+    } else {
+      createPrivateTempFile(cacheFile);
+    }
+    
+    Job job1 = Job.getInstance(cluster, conf);
+    job1.setUser(userName);
+    job1.addCacheFile(cacheFile.toUri());
+    Configuration conf1 = job1.getConfiguration();
+    TrackerDistributedCacheManager.determineTimestamps(conf1);
+    TrackerDistributedCacheManager.determineCacheVisibilities(conf1);
 
+    // Task localizing for job
+    TaskDistributedCacheManager handle = manager
+        .newTaskDistributedCacheManager(conf1);
+    handle.setup(localDirAllocator, workDir, TaskTracker
+          .getPrivateDistributedCacheDir(userName), 
+          TaskTracker.getPublicDistributedCacheDir());
+    TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
+    String distCacheDir;
+    if ("true".equals(visibility)) {
+      distCacheDir = TaskTracker.getPublicDistributedCacheDir(); 
+    } else {
+      distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName);
+    }
+    Path localizedPath =
+      manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
+          fs.getFileStatus(cacheFile), false,
+          c.timestamp, new Path(TEST_ROOT_DIR), false,
+          Boolean.parseBoolean(visibility));
+    assertTrue("Cache file didn't get localized in the expected directory. " +
+        "Expected localization to happen within " + 
+        ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
+        ", but was localized at " + 
+        localizedPath, localizedPath.toString().contains(distCacheDir));
+    if ("true".equals(visibility)) {
+      checkPublicFilePermissions(new Path[]{localizedPath});
+    } else {
+      checkFilePermissions(new Path[]{localizedPath});
+    }
+  }
+  
   /**
    * Check proper permissions on the cache files
    * 
@@ -270,17 +359,29 @@
    */
   protected void checkFilePermissions(Path[] localCacheFiles)
       throws IOException {
-    Path cachedFirstFile = localCacheFiles[0];
-    Path cachedSecondFile = localCacheFiles[1];
-    // Both the files should have executable permissions on them.
-    assertTrue("First cache file is not executable!", new File(cachedFirstFile
-        .toUri().getPath()).canExecute());
-    assertTrue("Second cache file is not executable!", new File(
-        cachedSecondFile.toUri().getPath()).canExecute());
+    // All the files should have executable permissions on them.
+    for (Path p : localCacheFiles) {
+      assertTrue("Cache file is not executable!", new File(p
+          .toUri().getPath()).canExecute());
+    }
   }
 
-  protected TaskController getTaskController() {
-    return new DefaultTaskController();
+  /**
+   * Check permissions on the public cache files
+   * 
+   * @param localCacheFiles
+   * @throws IOException
+   */
+  private void checkPublicFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    // All the files should have read and executable permissions for others
+    for (Path p : localCacheFiles) {
+      FsPermission perm = fs.getFileStatus(p).getPermission();
+      assertTrue("cache file is not readable by others", perm.getOtherAction()
+          .implies(FsAction.READ));
+      assertTrue("cache file is not executable by others", perm
+          .getOtherAction().implies(FsAction.EXECUTE));
+    }
   }
 
   protected String getJobOwnerName() throws LoginException {
@@ -293,27 +394,39 @@
     if (!canRun()) {
       return;
     }
+    // This test needs MRConfig.LOCAL_DIR to be a single directory
+    // instead of four, because it assumes that both
+    // firstcachefile and secondcachefile will be localized in the same
+    // directory, so that the second localization triggers deleteCache.
+    // If MRConfig.LOCAL_DIR were four directories, the second localization
+    // might not trigger deleteCache if it landed in a different directory.
+    Configuration conf2 = new Configuration(conf);
+    conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
+    refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
-        new TrackerDistributedCacheManager(conf);
-    FileSystem localfs = FileSystem.getLocal(conf);
+        new TrackerDistributedCacheManager(conf2, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf2);
     long now = System.currentTimeMillis();
+    String userName = getJobOwnerName();
+    conf2.set(JobContext.USER_NAME, userName);
 
-    manager.getLocalCache(firstCacheFile.toUri(), conf, 
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false);
-    manager.releaseCache(firstCacheFile.toUri(), conf, now);
+    Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    manager.releaseCache(firstCacheFile.toUri(), conf2, now);
     // In the code above we localized a file of size 4K and then released
     // the cache, which allows it to be deleted once the limit is exceeded.
     // The code below localizes another cache file that is designed to
     // sweep away the first one.
-    manager.getLocalCache(secondCacheFile.toUri(), conf, 
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false);
-    FileStatus[] dirStatuses = localfs.listStatus(
-      new Path(ROOT_MAPRED_LOCAL_DIR.toString()));
-    assertTrue("DistributedCache failed deleting old" + 
+    manager.getLocalCache(secondCacheFile.toUri(), conf2, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(secondCacheFile), false, 
+        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+    assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
-        dirStatuses.length == 1);
+        localfs.exists(localCache));
   }
   
   public void testFileSystemOtherThanDefault() throws Exception {
@@ -321,14 +434,17 @@
       return;
     }
     TrackerDistributedCacheManager manager =
-      new TrackerDistributedCacheManager(conf);
+      new TrackerDistributedCacheManager(conf, taskController);
     conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    String userName = getJobOwnerName();
+    conf.set(JobContext.USER_NAME, userName);
     Path fileToCache = new Path("fakefile:///"
         + firstCacheFile.toUri().getPath());
     Path result = manager.getLocalCache(fileToCache.toUri(), conf,
-        TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
         System.currentTimeMillis(),
-        new Path(TEST_ROOT_DIR), false);
+        new Path(TEST_ROOT_DIR), false, false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -342,6 +458,18 @@
     os.close();
     FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
   }
+  
+  static void createPublicTempFile(Path p) 
+  throws IOException, InterruptedException {
+    createTempFile(p);
+    FileUtil.chmod(p.toString(), "0777",true);
+  }
+  
+  static void createPrivateTempFile(Path p) 
+  throws IOException, InterruptedException {
+    createTempFile(p);
+    FileUtil.chmod(p.toString(), "0770",true);
+  }
 
   @Override
   protected void tearDown() throws IOException {
@@ -382,26 +510,29 @@
       return;
     }
     Configuration myConf = new Configuration(conf);
-    myConf.set("fs.default.name", "refresh:///");
+    myConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "refresh:///");
     myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class);
+    String userName = getJobOwnerName();
+
     TrackerDistributedCacheManager manager = 
-      new TrackerDistributedCacheManager(myConf);
+      new TrackerDistributedCacheManager(myConf, taskController);
     // ****** Imitate JobClient code
     // Configures a task/job with both a regular file and a "classpath" file.
     Configuration subConf = new Configuration(myConf);
+    subConf.set(JobContext.USER_NAME, userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     TrackerDistributedCacheManager.determineTimestamps(subConf);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf);
     // ****** End of imitating JobClient code
 
-    String userName = getJobOwnerName();
-
     // ****** Imitate TaskRunner code.
     TaskDistributedCacheManager handle =
       manager.newTaskDistributedCacheManager(subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
     handle.setup(localDirAllocator, workDir, TaskTracker
-        .getDistributedCacheDir(userName));
+        .getPrivateDistributedCacheDir(userName), 
+        TaskTracker.getPublicDistributedCacheDir());
     // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -422,7 +553,7 @@
     Throwable th = null;
     try {
       handle.setup(localDirAllocator, workDir, TaskTracker
-          .getDistributedCacheDir(userName));
+          .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
     } catch (IOException ie) {
       th = ie;
     }
@@ -434,13 +565,15 @@
     
     // submit another job
     Configuration subConf2 = new Configuration(myConf);
+    subConf2.set(JobContext.USER_NAME, userName);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2);
     TrackerDistributedCacheManager.determineTimestamps(subConf2);
+    TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
     
     handle =
       manager.newTaskDistributedCacheManager(subConf2);
     handle.setup(localDirAllocator, workDir, TaskTracker
-        .getDistributedCacheDir(userName));
+        .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
     Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
     assertNotNull(null, localCacheFiles2);
     assertEquals(1, localCacheFiles2.length);
@@ -456,4 +589,46 @@
     handle.release();
   }
 
+  /**
+   * Localize a file. After localization is complete, create a file, "myFile",
+   * under the directory where the file is localized and ensure that it has
+   * permissions different from what is set by default. Then, localize another
+   * file. Verify that "myFile" has the right permissions.
+   * @throws Exception
+   */
+  public void testCustomPermissions() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    String userName = getJobOwnerName();
+    conf.set(JobContext.USER_NAME, userName);
+    TrackerDistributedCacheManager manager = 
+        new TrackerDistributedCacheManager(conf, taskController);
+    FileSystem localfs = FileSystem.getLocal(conf);
+    long now = System.currentTimeMillis();
+
+    Path[] localCache = new Path[2];
+    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        now, new Path(TEST_ROOT_DIR), false, false);
+    FsPermission myPermission = new FsPermission((short)0600);
+    Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
+    if (FileSystem.create(localfs, myFile, myPermission) == null) {
+      throw new IOException("Could not create " + myFile);
+    }
+    try {
+      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
+          TaskTracker.getPrivateDistributedCacheDir(userName),
+          fs.getFileStatus(secondCacheFile), false, 
+          System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+      FileStatus stat = localfs.getFileStatus(myFile);
+      assertTrue(stat.getPermission().equals(myPermission));
+      // validate permissions of localized files.
+      checkFilePermissions(localCache);
+    } finally {
+      localfs.delete(myFile, false);
+    }
+  }
+
 }
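
The public/private distinction exercised above ultimately comes down to unix permissions: createPublicTempFile chmods the source file to 0777 and createPrivateTempFile to 0770, and a cache file is treated as public only when "others" can read it. A small self-contained sketch of the others-bits check that checkPublicFilePermissions asserts (illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PublicCacheCheckSketch {
      // True when "others" may read and execute the path -- the bits a
      // public cache file must carry.
      static boolean othersCanReadAndExecute(FileSystem fs, Path p)
          throws IOException {
        FsPermission perm = fs.getFileStatus(p).getPermission();
        return perm.getOtherAction().implies(FsAction.READ)
            && perm.getOtherAction().implies(FsAction.EXECUTE);
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        System.out.println(othersCanReadAndExecute(fs, new Path("/tmp")));
      }
    }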

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java Tue Jan 26 14:02:53 2010
@@ -25,6 +25,7 @@
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.Vector;
 import java.util.regex.Matcher;
@@ -181,12 +182,12 @@
 
     LOG.info("Process-tree dump follows: \n" + processTreeDump);
     assertTrue("Process-tree dump doesn't start with a proper header",
-        processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME "
-            + "VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+        processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+        "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+        "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
     for (int i = N; i >= 0; i--) {
-      String cmdLineDump =
-          "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\) [0-9]+ [0-9]+ sh " + shellScript
-              + " " + i;
+      String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" +
+          " [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i;
       Pattern pat = Pattern.compile(cmdLineDump);
       Matcher mat = pat.matcher(processTreeDump);
       assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i
@@ -267,6 +268,8 @@
     String session;
     String vmem = "0";
     String rssmemPage = "0";
+    String utime = "0";
+    String stime = "0";
     
     public ProcessStatInfo(String[] statEntries) {
       pid = statEntries[0];
@@ -278,27 +281,32 @@
       if (statEntries.length > 6) {
         rssmemPage = statEntries[6];
       }
+      if (statEntries.length > 7) {
+        utime = statEntries[7];
+        stime = statEntries[8];
+      }
     }
     
     // construct a line that mimics the procfs stat file.
     // all unused numerical entries are set to 0.
     public String getStatLine() {
       return String.format("%s (%s) S %s %s %s 0 0 0" +
-                      " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s %s 0 0" +
+                      " 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" +
                       " 0 0 0 0 0 0 0 0" +
                       " 0 0 0 0 0", 
-                      pid, name, ppid, pgrpId, session, vmem, rssmemPage);
+                      pid, name, ppid, pgrpId, session,
+                      utime, stime, vmem, rssmemPage);
     }
   }
   
   /**
    * A basic test that creates a few process directories and writes
-   * stat files. Verifies that the virtual and rss memory is correctly
+   * stat files. Verifies that the cpu time and memory is correctly
    * computed.
    * @throws IOException if there was a problem setting up the
    *                      fake procfs directories or files.
    */
-  public void testMemoryForProcessTree() throws IOException {
+  public void testCpuAndMemoryForProcessTree() throws IOException {
 
     // test processes
     String[] pids = { "100", "200", "300", "400" };
@@ -313,13 +321,13 @@
       // assuming processes 100, 200, 300 are in tree and 400 is not.
       ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
       procInfos[0] = new ProcessStatInfo(new String[] 
-                        {"100", "proc1", "1", "100", "100", "100000", "100"});
+          {"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
       procInfos[1] = new ProcessStatInfo(new String[] 
-                        {"200", "proc2", "100", "100", "100", "200000", "200"});
+          {"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
       procInfos[2] = new ProcessStatInfo(new String[] 
-                        {"300", "proc3", "200", "100", "100", "300000", "300"});
+          {"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
       procInfos[3] = new ProcessStatInfo(new String[] 
-                        {"400", "proc4", "1", "400", "400", "400000", "400"});
+          {"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"});
       
       writeStatFiles(procfsRootDir, pids, procInfos);
       
@@ -339,6 +347,28 @@
                         600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
       assertEquals("Cumulative rss memory does not match",
                    cumuRssMem, processTree.getCumulativeRssmem());
+
+      // verify cumulative cpu time
+      long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+             7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+      assertEquals("Cumulative cpu time does not match",
+                   cumuCpuTime, processTree.getCumulativeCpuTime());
+
+      // test the cpu time again to see if it accumulates
+      procInfos[0] = new ProcessStatInfo(new String[]
+          {"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"});
+      procInfos[1] = new ProcessStatInfo(new String[]
+          {"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"});
+      writeStatFiles(procfsRootDir, pids, procInfos);
+
+      // build the process tree.
+      processTree.getProcessTree();
+
+      // verify cumulative cpu time again
+      cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+             9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+      assertEquals("Cumulative cpu time does not match",
+                   cumuCpuTime, processTree.getCumulativeCpuTime());
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
@@ -498,17 +528,17 @@
       // Processes 200, 300, 400 and 500 are descendants of 100. 600 is not.
       ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses];
       procInfos[0] = new ProcessStatInfo(new String[] {
-          "100", "proc1", "1", "100", "100", "100000", "100"});
+          "100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
       procInfos[1] = new ProcessStatInfo(new String[] {
-          "200", "proc2", "100", "100", "100", "200000", "200"});
+          "200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
       procInfos[2] = new ProcessStatInfo(new String[] {
-          "300", "proc3", "200", "100", "100", "300000", "300"});
+          "300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
       procInfos[3] = new ProcessStatInfo(new String[] {
-          "400", "proc4", "200", "100", "100", "400000", "400"});
+          "400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"});
       procInfos[4] = new ProcessStatInfo(new String[] {
-          "500", "proc5", "400", "100", "100", "400000", "400"});
+          "500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"});
       procInfos[5] = new ProcessStatInfo(new String[] {
-          "600", "proc6", "1", "1", "1", "400000", "400"});
+          "600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"});
 
       String[] cmdLines = new String[numProcesses];
       cmdLines[0] = "proc1 arg1 arg2";
@@ -532,15 +562,17 @@
 
       LOG.info("Process-tree dump follows: \n" + processTreeDump);
       assertTrue("Process-tree dump doesn't start with a proper header",
-          processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME "
-              + "VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+          processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+          "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+          "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
       for (int i = 0; i < 5; i++) {
         ProcessStatInfo p = procInfos[i];
         assertTrue(
             "Process-tree dump doesn't contain the cmdLineDump of process "
                 + p.pid, processTreeDump.contains("\t|- " + p.pid + " "
                 + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name
-                + ") " + p.vmem + " " + p.rssmemPage + " " + cmdLines[i]));
+                + ") " + p.utime + " " + p.stime + " " + p.vmem + " "
+                + p.rssmemPage + " " + cmdLines[i]));
       }
 
       // 600 should not be in the dump
@@ -549,7 +581,7 @@
           "Process-tree dump shouldn't contain the cmdLineDump of process "
               + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid
               + " " + p.pgrpId + " " + p.session + " (" + p.name + ") "
-              + p.vmem + " " + p.rssmemPage + " " + cmdLines[5]));
+              + p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5]));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
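
For reference, the expected jiffy totals in the assertions above fall straight out of the fake stat entries: pids 100, 200 and 300 are in the tree (400 is not), so the first check expects (1000+200) + (2000+400) + (3000+600) = 7200 jiffies, and after proc1 and proc2 are rewritten the running total becomes (2000+300) + (3000+500) + (3000+600) = 9400 jiffies. A minimal sketch of that arithmetic, using a hypothetical helper over the test's ProcessStatInfo objects:

    // Hypothetical helper, not part of the patch: sums utime+stime for the
    // given pids and converts jiffies to millis the same way the test does.
    static long expectedCpuMillis(ProcessStatInfo[] infos, String... treePids) {
      long jiffies = 0;
      for (ProcessStatInfo p : infos) {
        for (String pid : treePids) {
          if (p.pid.equals(pid)) {
            jiffies += Long.parseLong(p.utime) + Long.parseLong(p.stime);
          }
        }
      }
      long jiffyMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
      return jiffyMillis > 0 ? jiffies * jiffyMillis : 0L;  // 0 when unknown
    }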

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Tue Jan 26 14:02:53 2010
@@ -58,7 +58,10 @@
       JobConf mrConf = new JobConf(conf);
       mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, 
                              null, null, mrConf);
-
+      // make cleanup inline so that the existence of these directories
+      // can be validated
+      mr.setInlineCleanupThreads();
+      
       // Run examples
       TestMiniMRWithDFS.runPI(mr, mr.createJobConf(mrConf));
       TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf));

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:713112
 /hadoop/core/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:817878-835934,884917-885774
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:817878-835934,884917-903221

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java Tue Jan 26 14:02:53 2010
@@ -995,6 +995,41 @@
     }
   }
 
+  /**
+   * Verify that the -delete option works for other {@link FileSystem}
+   * implementations. See MAPREDUCE-1285. */
+  public void testDeleteLocal() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
+      cluster = new MiniDFSCluster(conf, 1, true, null);
+      final FileSystem hdfs = cluster.getFileSystem();
+      final String namenode = FileSystem.getDefaultUri(conf).toString();
+      if (namenode.startsWith("hdfs://")) {
+        MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
+        String destdir = TEST_ROOT_DIR + "/destdat";
+        MyFile[] localFiles = createFiles(localfs, destdir);
+        ToolRunner.run(new DistCp(conf), new String[] {
+                                         "-delete",
+                                         "-update",
+                                         "-log",
+                                         "/logs",
+                                         namenode+"/srcdat",
+                                         "file:///"+TEST_ROOT_DIR+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+                   checkFiles(localfs, destdir, files));
+        assertTrue("Log directory does not exist.",
+                    hdfs.exists(new Path("/logs")));
+        deldir(localfs, destdir);
+        deldir(hdfs, "/logs");
+        deldir(hdfs, "/srcdat");
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
   /** test globbing  */
   public void testGlobbing() throws Exception {
     String namenode = null;
@@ -1057,4 +1092,4 @@
     }
     return results.toString();
   }
-}
\ No newline at end of file
+}
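
The new test drives DistCp through ToolRunner; the equivalent command line, with a hypothetical namenode and local destination, would be:

    hadoop distcp -delete -update -log /logs \
        hdfs://namenode/srcdat file:///tmp/destdat

The -delete pass is what MAPREDUCE-1285 addresses for non-HDFS destinations: combined with -update it removes destination files that no longer exist under srcdat, which the test verifies via checkFiles against the pre-populated local directory.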

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java Tue Jan 26 14:02:53 2010
@@ -16,109 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.util;
-
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.HashMap;
+package org.apache.hadoop.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
-
-import junit.framework.TestCase;
-
-public class TestReflectionUtils extends TestCase {
-
-  private static Class toConstruct[] = { String.class, TestReflectionUtils.class, HashMap.class };
-  private Throwable failure = null;
-
-  public void setUp() {
-    ReflectionUtils.clearCache();
-  }
-    
-  public void testCache() throws Exception {
-    assertEquals(0, cacheSize());
-    doTestCache();
-    assertEquals(toConstruct.length, cacheSize());
-    ReflectionUtils.clearCache();
-    assertEquals(0, cacheSize());
-  }
-    
-    
-  @SuppressWarnings("unchecked")
-  private void doTestCache() {
-    for (int i=0; i<toConstruct.length; i++) {
-      Class cl = toConstruct[i];
-      Object x = ReflectionUtils.newInstance(cl, null);
-      Object y = ReflectionUtils.newInstance(cl, null);
-      assertEquals(cl, x.getClass());
-      assertEquals(cl, y.getClass());
-    }
-  }
-    
-  public void testThreadSafe() throws Exception {
-    Thread[] th = new Thread[32];
-    for (int i=0; i<th.length; i++) {
-      th[i] = new Thread() {
-          public void run() {
-            try {
-              doTestCache();
-            } catch (Throwable t) {
-              failure = t;
-            }
-          }
-        };
-      th[i].start();
-    }
-    for (int i=0; i<th.length; i++) {
-      th[i].join();
-    }
-    if (failure != null) {
-      failure.printStackTrace();
-      fail(failure.getMessage());
-    }
-  }
-    
-  private int cacheSize() throws Exception {
-    return ReflectionUtils.getCacheSize();
-  }
-    
-  public void testCantCreate() {
-    try {
-      ReflectionUtils.newInstance(NoDefaultCtor.class, null);
-      fail("invalid call should fail");
-    } catch (RuntimeException rte) {
-      assertEquals(NoSuchMethodException.class, rte.getCause().getClass());
-    }
-  }
-    
-  @SuppressWarnings("unchecked")
-  public void testCacheDoesntLeak() throws Exception {
-    int iterations=9999; // very fast, but a bit less reliable - bigger numbers force GC
-    for (int i=0; i<iterations; i++) {
-      URLClassLoader loader = new URLClassLoader(new URL[0], getClass().getClassLoader());
-      Class cl = Class.forName("org.apache.hadoop.util.TestReflectionUtils$LoadedInChild", false, loader);
-      Object o = ReflectionUtils.newInstance(cl, null);
-      assertEquals(cl, o.getClass());
-    }
-    System.gc();
-    assertTrue(cacheSize()+" too big", cacheSize()<iterations);
-  }
-    
-  private static class LoadedInChild {
-  }
-    
-  public static class NoDefaultCtor {
-    public NoDefaultCtor(int x) {}
+
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the JobConf-related parts of common's ReflectionUtils
+ * class.
+ */
+public class TestReflectionUtils {
+  @Before
+  public void setUp() {
+    ReflectionUtils.clearCache();
   }
-  
+
   /**
    * This is to test backward compatibility of ReflectionUtils for 
    * JobConfigurable objects. 
    * This should be made deprecated along with the mapred package HADOOP-1230. 
    * Should be removed when mapred package is removed.
    */
+  @Test
   public void testSetConf() {
     JobConfigurableOb ob = new JobConfigurableOb();
     ReflectionUtils.setConf(ob, new Configuration());
@@ -132,5 +56,5 @@
     public void configure(JobConf job) {
       configured = true;
     }
-  }
-}
+  }
+}
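
The surviving test covers the mapred-specific branch of ReflectionUtils.setConf: a JobConfigurable's configure(JobConf) hook fires when the supplied configuration is a JobConf and is skipped otherwise. A sketch of that contract (the assertion polarity is assumed from the method's documented behaviour, not shown in this hunk):

    JobConfigurableOb ob = new JobConfigurableOb();
    ReflectionUtils.setConf(ob, new JobConf());  // configure() is invoked
    // With a plain Configuration, the JobConfigurable hook is not triggered.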

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCh.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCh.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCh.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Stack;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +47,10 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -420,10 +426,21 @@
     return Math.max(numMaps, 1);
   }
 
-  private boolean setup(List<FileOperation> ops, Path log) throws IOException {
+  private boolean setup(List<FileOperation> ops, Path log) 
+  throws IOException {
     final String randomId = getRandomId();
     JobClient jClient = new JobClient(jobconf);
-    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    Path stagingArea;
+    try {
+      stagingArea = JobSubmissionFiles.getStagingDir(
+                       jClient.getClusterHandle(), jobconf);
+    } catch (InterruptedException ie){
+      throw new IOException(ie);
+    }
+    Path jobdir = new Path(stagingArea + NAME + "_" + randomId);
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
     LOG.info(JOB_DIR_LABEL + "=" + jobdir);
 
     if (log == null) {
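
The change above replaces the shared JobTracker system directory with a per-user staging area as the parent for the tool's job directory; the same three-step pattern (resolve the staging dir, derive a job dir, create it with JOB_DIR_PERMISSION) recurs in DistCp and HadoopArchives below. Note one difference: here and in DistCp the job dir is built by string concatenation on stagingArea, while HadoopArchives uses the two-argument Path constructor, which inserts the path separator between parent and child. A sketch of the pattern using the constructor form, identifiers as in the patch:

    Path stagingArea;
    try {
      stagingArea = JobSubmissionFiles.getStagingDir(
          jClient.getClusterHandle(), jobconf);
    } catch (InterruptedException ie) {
      throw new IOException(ie);  // surface interruption as an I/O failure
    }
    // new Path(parent, child) inserts the separator between the two parts.
    Path jobdir = new Path(stagingArea, NAME + "_" + randomId);
    FsPermission mapredSysPerms =
        new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);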

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java Tue Jan 26 14:02:53 2010
@@ -34,6 +34,8 @@
 import java.util.Stack;
 import java.util.StringTokenizer;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -66,7 +68,10 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -1196,9 +1201,22 @@
 
     final String randomId = getRandomId();
     JobClient jClient = new JobClient(jobConf);
-    Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    Path stagingArea;
+    try {
+      stagingArea = 
+        JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), conf);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    
+    Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId);
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms);
     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
 
+    long maxBytesPerMap = conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP);
+
     FileSystem dstfs = args.dst.getFileSystem(conf);
     boolean dstExists = dstfs.exists(args.dst);
     boolean dstIsDir = false;
@@ -1365,7 +1383,7 @@
 
                 ++cnsyncf;
                 cbsyncs += child.getLen();
-                if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+                if (cnsyncf > SYNC_FILE_MAX || cbsyncs > maxBytesPerMap) {
                   src_writer.sync();
                   dst_writer.sync();
                   cnsyncf = 0;
@@ -1540,7 +1558,7 @@
     //write dst lsr results
     final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
     final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
-        dstlsr, Text.class, FileStatus.class,
+        dstlsr, Text.class, dstroot.getClass(),
         SequenceFile.CompressionType.NONE);
     try {
       //do lsr to get all file statuses in dstroot
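
The BYTES_PER_MAP change makes the per-map byte budget a runtime setting: the sync threshold when writing the copy list now reads conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP) rather than the compile-time constant, so callers can trade fewer, larger maps for more, smaller ones. A sketch of how a caller might override it (assuming BYTES_PER_MAP_LABEL is visible to the caller; the underlying key string is not shown in this hunk):

    Configuration conf = new Configuration();
    // Hypothetical override: budget roughly 512 MB of data per map task.
    conf.setLong(DistCp.BYTES_PER_MAP_LABEL, 512L * 1024 * 1024);
    ToolRunner.run(new DistCp(conf),
        new String[] { "-update", "hdfs://nn/src", "hdfs://nn/dst" });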

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
 # ResourceBundle properties file for distcp counters
 
 CounterGroupName=       distcp

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java Tue Jan 26 14:02:53 2010
@@ -29,6 +29,8 @@
 import java.util.Set;
 import java.util.TreeMap;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HarFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +59,12 @@
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -359,12 +367,22 @@
     }
     conf.set(DST_DIR_LABEL, outputPath.toString());
     final String randomId = DistCp.getRandomId();
-    Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
+    Path stagingArea;
+    try {
+      stagingArea = JobSubmissionFiles.getStagingDir(new Cluster(conf), 
+          conf);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    Path jobDirectory = new Path(stagingArea,
                           NAME + "_" + randomId);
+    FsPermission mapredSysPerms = 
+      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jobDirectory.getFileSystem(conf), jobDirectory,
+                      mapredSysPerms);
     conf.set(JOB_DIR_LABEL, jobDirectory.toString());
     //get a tmp directory for input splits
     FileSystem jobfs = jobDirectory.getFileSystem(conf);
-    jobfs.mkdirs(jobDirectory);
     Path srcFiles = new Path(jobDirectory, "_har_src_files");
     conf.set(SRC_LIST_LABEL, srcFiles.toString());
     SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Tue Jan 26 14:02:53 2010
@@ -1285,7 +1285,7 @@
           attempt.setLocation(host.makeLoggedLocation());
         }
 
-        ArrayList<LoggedLocation> locs = task.getPreferredLocations();
+        List<LoggedLocation> locs = task.getPreferredLocations();
 
         if (host != null && locs != null) {
           for (LoggedLocation loc : locs) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Tue Jan 26 14:02:53 2010
@@ -36,7 +36,9 @@
 
 /**
  * A simple wrapper for parsing JSON-encoded data using ObjectMapper.
- * @param <T> The (base) type of the object(s) to be parsed by this parser.
+ * 
+ * @param <T>
+ *          The (base) type of the object(s) to be parsed by this parser.
  */
 class JsonObjectMapperParser<T> implements Closeable {
   private final ObjectMapper mapper;
@@ -47,7 +49,7 @@
   /**
    * Constructor.
    * 
-   * @param path 
+   * @param path
    *          Path to the JSON data file, possibly compressed.
    * @param conf
    * @throws IOException

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Tue Jan 26 14:02:53 2010
@@ -102,6 +102,24 @@
     setJobID(jobID);
   }
 
+  void adjustTimes(long adjustment) {
+    submitTime += adjustment;
+    launchTime += adjustment;
+    finishTime += adjustment;
+
+    for (LoggedTask task : mapTasks) {
+      task.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask task : reduceTasks) {
+      task.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask task : otherTasks) {
+      task.adjustTimes(adjustment);
+    }
+  }
+
   @SuppressWarnings("unused")
   // for input parameter ignored.
   @JsonAnySetter
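
adjustTimes cascades one offset through the job's submit/launch/finish times and into every map, reduce and other task (see the matching LoggedTask and LoggedTaskAttempt hunks below), which makes rebasing a parsed trace onto a new epoch a one-liner. A sketch, assuming package-private access and a getSubmitTime() accessor on LoggedJob:

    // Shift the whole trace so the job appears to be submitted at targetStart.
    long adjustment = targetStart - job.getSubmitTime();  // accessor assumed
    job.adjustTimes(adjustment);  // propagates to tasks and their attempts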

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Tue Jan 26 14:02:53 2010
@@ -18,7 +18,10 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -41,12 +44,15 @@
  * 
  */
 public class LoggedLocation implements DeepCompare {
+   static final Map<List<String>, List<String>> layersCache = 
+    new HashMap<List<String>, List<String>>();
+
   /**
    * The full path from the root of the network to the host.
    * 
    * NOTE that this assumes that the network topology is a tree.
    */
-  List<String> layers = new ArrayList<String>();
+  List<String> layers = Collections.emptyList();
 
   static private Set<String> alreadySeenAnySetterAttributes =
       new TreeSet<String>();
@@ -56,7 +62,26 @@
   }
 
   void setLayers(List<String> layers) {
-    this.layers = layers;
+    if (layers == null || layers.isEmpty()) {
+      this.layers = Collections.emptyList();
+    } else {
+      synchronized (layersCache) {
+        List<String> found = layersCache.get(layers);
+        if (found == null) {
+          // make a copy with interned strings.
+          List<String> clone = new ArrayList<String>(layers.size());
+          for (String s : layers) {
+            clone.add(s.intern());
+          }
+          // make the list read-only since it will be shared.
+          List<String> readonlyLayers = Collections.unmodifiableList(clone);
+          layersCache.put(readonlyLayers, readonlyLayers);
+          this.layers = readonlyLayers;
+        } else {
+          this.layers = found;
+        }
+      }
+    }
   }
 
   @SuppressWarnings("unused")
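
The setLayers rewrite is a memory optimisation: network-topology paths recur across many locations, so each distinct layer list is stored once in layersCache, its element strings interned, and the read-only copy is shared by every LoggedLocation naming the same path (LoggedTaskAttempt.setHostName applies the same idea with String.intern() further below). Because key and value are the same list, a cache hit allocates nothing. The observable effect, sketched from within the rumen package and assuming the usual getLayers() accessor:

    LoggedLocation a = new LoggedLocation();
    LoggedLocation b = new LoggedLocation();
    a.setLayers(Arrays.asList("/rack1", "host1"));  // values illustrative
    b.setLayers(Arrays.asList("/rack1", "host1"));
    // Equal lists collapse to one canonical, unmodifiable instance.
    assert a.getLayers() == b.getLayers();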

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Tue Jan 26 14:02:53 2010
@@ -18,6 +18,7 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -44,9 +45,7 @@
   Pre21JobHistoryConstants.Values taskType;
   Pre21JobHistoryConstants.Values taskStatus;
   List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
-
-  ArrayList<LoggedLocation> preferredLocations =
-      new ArrayList<LoggedLocation>();
+  List<LoggedLocation> preferredLocations = Collections.emptyList();
 
   int numberMaps = -1;
   int numberReduces = -1;
@@ -69,6 +68,15 @@
     super();
   }
 
+  void adjustTimes(long adjustment) {
+    startTime += adjustment;
+    finishTime += adjustment;
+
+    for (LoggedTaskAttempt attempt : attempts) {
+      attempt.adjustTimes(adjustment);
+    }
+  }
+
   public long getInputBytes() {
     return inputBytes;
   }
@@ -130,15 +138,23 @@
   }
 
   void setAttempts(List<LoggedTaskAttempt> attempts) {
-    this.attempts = attempts;
+    if (attempts == null) {
+      this.attempts = new ArrayList<LoggedTaskAttempt>();
+    } else {
+      this.attempts = attempts;
+    }
   }
 
-  public ArrayList<LoggedLocation> getPreferredLocations() {
+  public List<LoggedLocation> getPreferredLocations() {
     return preferredLocations;
   }
 
-  void setPreferredLocations(ArrayList<LoggedLocation> preferredLocations) {
-    this.preferredLocations = preferredLocations;
+  void setPreferredLocations(List<LoggedLocation> preferredLocations) {
+    if (preferredLocations == null || preferredLocations.isEmpty()) {
+      this.preferredLocations = Collections.emptyList();
+    } else {
+      this.preferredLocations = preferredLocations;
+    }
   }
 
   public int getNumberMaps() {
@@ -204,8 +220,8 @@
     }
   }
 
-  private void compareLoggedLocations(ArrayList<LoggedLocation> c1,
-      ArrayList<LoggedLocation> c2, TreePath loc, String eltname)
+  private void compareLoggedLocations(List<LoggedLocation> c1,
+      List<LoggedLocation> c2, TreePath loc, String eltname)
       throws DeepInequalityException {
     if (c1 == null && c2 == null) {
       return;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Tue Jan 26 14:02:53 2010
@@ -82,6 +82,11 @@
     }
   }
 
+  void adjustTimes(long adjustment) {
+    startTime += adjustment;
+    finishTime += adjustment;
+  }
+
   public long getShuffleFinished() {
     return shuffleFinished;
   }
@@ -135,7 +140,7 @@
   }
 
   void setHostName(String hostName) {
-    this.hostName = hostName;
+    this.hostName = hostName.intern();
   }
 
   public long getHdfsBytesRead() {


