hadoop-mapreduce-commits mailing list archives

From ste...@apache.org
Subject svn commit: r903227 [14/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri...
Date Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Tue Jan 26 14:02:53 2010
@@ -20,28 +20,46 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster.TaskTrackerRunner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
 /**
  * Test that job retirement works correctly.
  */
 public class TestJobRetire extends TestCase {
+  static final Log LOG = LogFactory.getLog(TestJobRetire.class);
   static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "job-expiry-testing");
 
+  private MiniMRCluster startCluster(JobConf conf, int numTrackers) 
+  throws IOException {
+    conf.setBoolean(JTConfig.JT_RETIREJOBS, true);
+    conf.setLong(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1);
+    return new MiniMRCluster(0, 0, numTrackers, "file:///", 1, null, null, null,
+                             conf, 0);
+  }
+  
   public void testJobRetire() throws Exception {
     MiniMRCluster mr = null;
     try {
       JobConf conf = new JobConf();
-
-      conf.setBoolean(JTConfig.JT_RETIREJOBS, true);
-      conf.setLong(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0);
+      mr = startCluster(conf, 1);
+      
       JobConf jobConf = mr.createJobConf();
       JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
       
@@ -59,6 +77,7 @@
           1, jobtracker.getAllJobs().length);
     } finally {
       if (mr != null) { mr.shutdown();}
+      FileUtil.fullyDelete(new File(testDir.toString()));
     }
   }
 
@@ -70,13 +89,9 @@
     assertTrue(rj.isSuccessful());
     JobID id = rj.getID();
 
-    JobInProgress job = jobtracker.getJob(id);
     //wait for job to get retired
-    for (int i = 0; i < 10 && job != null; i++) {
-      UtilsForTests.waitFor(1000);
-      job = jobtracker.getJob(id);
-    }
-    assertNull("Job did not retire", job);
+    waitTillRetire(id, jobtracker);
+    
     assertTrue("History url not set", rj.getHistoryUrl() != null && 
     rj.getHistoryUrl().length() > 0);
     assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));
@@ -86,7 +101,262 @@
     File file = new File(name);
  
     assertFalse("JobConf file not deleted", file.exists());
+    
+    //test redirection
+    URL jobUrl = new URL(rj.getTrackingURL());
+    HttpURLConnection conn = (HttpURLConnection) jobUrl.openConnection();
+    conn.setInstanceFollowRedirects(false);
+    conn.connect();
+    assertEquals(HttpURLConnection.HTTP_MOVED_TEMP, conn.getResponseCode());
+    conn.disconnect();
+    
+    URL redirectedUrl = new URL(conn.getHeaderField("Location"));
+    conn = (HttpURLConnection) redirectedUrl.openConnection();
+    conn.connect();
+    assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+    conn.disconnect();
     return id;
   }
 
+  // wait till the job retires
+  private void waitTillRetire(JobID id, JobTracker jobtracker) {
+    JobInProgress job = jobtracker.getJob(id);
+    //wait for job to get retired
+    for (int i = 0; i < 10 && job != null; i++) {
+      UtilsForTests.waitFor(1000);
+      job = jobtracker.getJob(id);
+    }
+    assertNull("Job did not retire", job);
+  }
+  
+  /**
+   * Custom TaskTracker which waits forever after a successful contact to 
+   * the JobTracker.
+   */
+  class WaitingTaskTracker extends TaskTracker {
+    
+    WaitingTaskTracker(JobConf conf) throws IOException {
+      super(conf);
+    }
+    
+    @Override
+    HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+      HeartbeatResponse response = super.transmitHeartBeat(now);
+      LOG.info("WaitingTaskTracker waiting");
+      // wait forever
+      UtilsForTests.waitFor(Long.MAX_VALUE);
+      throw new IOException("WaitingTaskTracker interrupted. Bailing out");
+    }
+  }
+  
+  /**
+   * Test job retire with tasks that report their *first* status only after the
+   * job retires.
+   * Steps :
+   *  - Start a mini-mr cluster with 1 task-tracker having only map slots.
+   *    Note that this task-tracker will take care of setup/cleanup and map 
+   *    tasks.
+   *  - Submit a job with 1 map task and 1 reduce task
+   *  - Wait for the job to finish the map task
+   *  - Start a 2nd tracker that waits for a long time after contacting the JT.
+   *  - Wait for the 2nd tracker to get stuck
+   *  - Kill the job
+   *  - Wait for the job to retire
+   *  - Check if the tip mappings are cleaned up. 
+   */
+  public void testJobRetireWithUnreportedTasks() throws Exception {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+      conf.setInt(TTConfig.TT_REDUCE_SLOTS, 0);     
+      mr = startCluster(conf, 1);
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+      
+      // submit a job
+      Path inDir = new Path(testDir, "in-1");
+      Path outDir = new Path(testDir, "out-1");
+      JobConf jConf = mr.createJobConf();
+      FileInputFormat.setInputPaths(jConf, new Path[] {inDir});
+      FileOutputFormat.setOutputPath(jConf, outDir);
+      SleepJob sleepJob = new SleepJob();
+      sleepJob.setConf(jConf);
+      Job job = sleepJob.createJob(1, 1, 0, 1, 0, 1);
+
+      job.submit();
+      JobID id = JobID.downgrade(job.getStatus().getJobID());
+      JobInProgress jip = jobtracker.getJob(id);
+      
+      // wait 100 secs for the map to complete
+      for (int i = 0; i < 100 && (jip.finishedMaps() < 1); i++) {
+        UtilsForTests.waitFor(1000);
+      }
+      assertEquals(jip.finishedMaps(), 1);
+      
+      // start a tracker that will wait
+      LOG.info("Adding a waiting tracker");
+      TaskTrackerRunner testTrackerRunner = 
+        mr.new TaskTrackerRunner(1, 1, null, mr.createJobConf()) {
+        @Override
+        TaskTracker createTaskTracker(JobConf conf) throws IOException {
+          return new WaitingTaskTracker(conf);
+        }
+      };
+      mr.addTaskTracker(testTrackerRunner);
+      LOG.info("Waiting tracker added");
+      
+      WaitingTaskTracker testTT = 
+        (WaitingTaskTracker)testTrackerRunner.getTaskTracker();
+      
+      // wait 100 secs for the newly started task-tracker to join
+      for (int i = 0; i < 1000 && (jobtracker.taskTrackers().size() < 2); i++) {
+        UtilsForTests.waitFor(100);
+      }
+      assertEquals(jobtracker.taskTrackers().size(), 2);
+      LOG.info("Cluster is now ready");
+      
+      // stop the test-tt as it's no longer required
+      mr.stopTaskTracker(mr.getTaskTrackerID(testTT.getName()));
+      
+      // check if a reduce task got scheduled or not
+      assertEquals("Waiting tracker joined but no reduce task got scheduled", 
+                   1, jip.runningReduces());
+      
+      // kill the job
+      job.killJob();
+      
+      // check if the reduce task attempt status is missing
+      TaskInProgress tip = jip.getTasks(TaskType.REDUCE)[0]; 
+      assertNull(tip.getTaskStatus(tip.getAllTaskAttemptIDs()[0]));
+      
+      // wait for the job to retire
+      waitTillRetire(id, jobtracker);
+      
+      // check the taskidToTIPMap
+      for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
+        LOG.info("TaskidToTIP mapping left over : " + tid);
+      }
+      assertEquals("'taskid' to TIP mapping still exists", 
+                   0, jobtracker.taskidToTIPMap.size());
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      FileUtil.fullyDelete(new File(testDir.toString()));
+    }
+  }
+  
+  /**
+   * (Mock)Test JobTracker.removeJobTasks() which is called only when the job 
+   * retires.
+   */
+  public void testJobRemoval() throws Exception {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      mr = startCluster(conf, 0);
+      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+      
+      // test map task removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.MAP);
+      // test reduce task removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.REDUCE);
+      // test job setup removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.JOB_SETUP);
+      // test job cleanup removal
+      testRemoveJobTasks(jobtracker, conf, TaskType.JOB_CLEANUP);
+    } finally {
+      if (mr != null) { mr.shutdown();}
+      // cleanup
+      FileUtil.fullyDelete(new File(testDir.toString()));
+    }
+  }
+ 
+  // create a new job and add it to the jobtracker
+  private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) {
+    // submit a job in a fake manner
+    // get the new job-id
+    JobID id = 
+      new JobID(jobtracker.getTrackerIdentifier(), jobtracker.jobs.size() + 1);
+    // create a JobInProgress for this fake job
+    JobInProgress jip = new JobInProgress(id, conf, jobtracker);
+    
+    // insert this fake completed job in the jobtracker
+    jobtracker.jobs.put(id, jip);
+    
+    return jip;
+  }
+  
+  // create a new TaskInProgress and make it running by adding it to jobtracker
+  private TaskInProgress createAndAddTIP(JobTracker jobtracker, 
+                                         JobInProgress jip, TaskType type) {
+    JobConf conf = jip.getJobConf();
+    JobID id = jip.getJobID();
+    // now create a fake tip for this fake job
+    TaskInProgress tip = null;
+    if (type == TaskType.MAP) {
+      tip = new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, 
+                               jobtracker, conf, jip, 0, 1);
+      jip.maps = new TaskInProgress[] {tip};
+    } else if (type == TaskType.REDUCE) {
+      tip = new TaskInProgress(id, "dummy", jip.desiredMaps(), 0, 
+                               jobtracker, conf, jip, 1);
+      jip.reduces = new TaskInProgress[] {tip};
+    } else if (type == TaskType.JOB_SETUP) {
+      tip = 
+        new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, 
+                           jobtracker, conf, jip, 0, 1);
+      jip.setup = new TaskInProgress[] {tip};
+    } else if (type == TaskType.JOB_CLEANUP) {
+      tip = 
+        new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT, 
+                           jobtracker, conf, jip, 0, 1);
+      jip.cleanup = new TaskInProgress[] {tip};
+    }
+    return tip;
+  }
+  
+  // create a new Task for the given tip and make it running
+  private TaskAttemptID createAndAddAttempt(TaskInProgress tip, int attemptId) {
+    // create a fake attempt for this fake task
+    TaskAttemptID taskid = new TaskAttemptID(tip.getTIPId(), attemptId);
+    
+    // insert this fake task into the jobtracker by making it running
+    tip.addRunningTask(taskid, "test-tt");
+    
+    return taskid;
+  }
+  
+  // Mock a job run such that the jobtracker is in a state similar to that 
+  // resulting from an actual job run.
+  // Steps :
+  //   - generate a new job-id
+  //   - create and add a JobInProgress object using the fake job-id
+  //   - create and add a fake tip of the passed type under the fake job
+  //     Note that the type can be MAP, REDUCE, JOB_SETUP, or JOB_CLEANUP.
+  //   - create and add a fake attempt under the fake tip
+  //   - remove the job from the jobtracker
+  //   - check if the fake attempt is removed from the jobtracker
+  private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf, 
+                                  TaskType type) {
+    // create and submit a job
+    JobInProgress jip = createAndAddJob(jobtracker, conf);
+    // create and add a tip
+    TaskInProgress tip = createAndAddTIP(jobtracker, jip, type);
+    // create and add an attempt
+    TaskAttemptID taskid = createAndAddAttempt(tip, 0);
+    
+    // this fake attempt should not have any status
+    assertNull(tip.getTaskStatus(taskid));
+    
+    // remove the job tasks for this fake job from the jobtracker
+    jobtracker.removeJobTasks(jip);
+    
+    // check the taskidToTIPMap
+    for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
+      LOG.info("TaskidToTIP : " + tid);
+    }
+    
+    // check if the fake attempt is removed from the jobtracker
+    assertEquals("'taskid' to TIP mapping still exists", 
+                 0, jobtracker.taskidToTIPMap.size());
+  }
 }
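
A note on the redirection check added in closeJob() above: HttpURLConnection follows 302 responses transparently by default, so the test turns instance-level redirect handling off to make the 302 from the retired job's tracking URL observable, then follows the Location header by hand. A minimal standalone sketch of the same pattern (the URL below is a placeholder, not one from this commit):

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RedirectCheck {
      public static void main(String[] args) throws Exception {
        // Placeholder URL; in the test above this is rj.getTrackingURL().
        URL jobUrl = new URL("http://localhost:50030/jobdetails.jsp");
        HttpURLConnection conn = (HttpURLConnection) jobUrl.openConnection();
        conn.setInstanceFollowRedirects(false); // keep the 302 visible
        conn.connect();
        int first = conn.getResponseCode();     // expect HTTP_MOVED_TEMP (302)
        String target = conn.getHeaderField("Location");
        conn.disconnect();

        // Follow the redirect manually; the history page should answer 200.
        conn = (HttpURLConnection) new URL(target).openConnection();
        conn.connect();
        System.out.println(first + " -> " + conn.getResponseCode());
        conn.disconnect();
      }
    }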

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Tue Jan 26 14:02:53 2010
@@ -56,7 +56,8 @@
                                            Path outDir,
                                            String input,
                                            int numMaps,
-                                           int numReduces) throws IOException {
+                                           int numReduces,
+                                           String sysDir) throws IOException {
     FileSystem inFs = inDir.getFileSystem(conf);
     FileSystem outFs = outDir.getFileSystem(conf);
     outFs.delete(outDir, true);
@@ -90,14 +91,13 @@
     assertFalse(FileSystem.get(conf).exists(
       new Path(conf.get(JTConfig.JT_SYSTEM_DIR)))); 
     // Check if the Job Tracker system dir is propagated to the client
-    String sysDir = jobClient.getSystemDir().toString();
-    System.out.println("Job sys dir -->" + sysDir);
     assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
     assertTrue(sysDir.contains("custom"));
     return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
   }
 
- static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+ static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir) 
+ throws IOException {
     LOG.info("runWordCount");
     // Run a word count example
     // Keeping tasks that match this pattern
@@ -107,7 +107,7 @@
     result = launchWordCount(jobConf, inDir, outDir,
                              "The quick brown fox\nhas many silly\n" + 
                              "red fox sox\n",
-                             3, 1);
+                             3, 1, sysDir);
     assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                  "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
     // Checking if the Job ran successfully in spite of different system dir config
@@ -128,7 +128,7 @@
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
 
-      runWordCount(mr, mr.createJobConf());
+      runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Tue Jan 26 14:02:53 2010
@@ -186,8 +186,13 @@
     job1.finishTask(taskAttemptID[2]);
     jobTracker.finalizeJob(job1);
 
+    assertEquals("Mismatch in number of failed map tasks",
+        1, mi.numMapTasksFailed);
+    assertEquals("Mismatch in number of failed reduce tasks",
+        1, mi.numReduceTasksFailed);
+    
     assertEquals("Mismatch in number of blacklisted trackers",
-        mi.numTrackersBlackListed, 1);
+        1, mi.numTrackersBlackListed);
 
     assertEquals("Mismatch in blacklisted map slots", 
         mi.numBlackListedMapSlots, 
@@ -321,6 +326,41 @@
         trackers.length - 1, mi.numTrackers);
   }
   
+  public void testKillTasks() throws IOException {
+    int numMaps, numReds;
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(false);
+    conf.setMaxTaskFailuresPerTracker(1);
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[2];
+
+    numMaps = 1;
+    numReds = 1;
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+    conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
+
+    assertEquals("Mismatch in number of killed map tasks",
+        0, mi.numMapTasksKilled);
+    assertEquals("Mismatch in number of killed reduce tasks",
+        0, mi.numReduceTasksKilled);
+    
+    FakeJobInProgress job1 = new FakeJobInProgress(conf, jobTracker);
+    job1.setClusterSize(trackers.length);
+    job1.initTasks();
+    jobTracker.addJob(job1.getJobID(), job1);
+    taskAttemptID[0] = job1.findMapTask(trackers[0]);
+    job1.killTask(taskAttemptID[0]);
+    taskAttemptID[1] = job1.findReduceTask(trackers[0]);
+    job1.killTask(taskAttemptID[1]);
+    jobTracker.finalizeJob(job1);
+
+    assertEquals("Mismatch in number of killed map tasks",
+        1, mi.numMapTasksKilled);
+    assertEquals("Mismatch in number of killed reduce tasks",
+        1, mi.numReduceTasksKilled);
+  }
+  
   static class FakeTaskScheduler extends JobQueueTaskScheduler {
     public FakeTaskScheduler() {
       super();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java Tue Jan 26 14:02:53 2010
@@ -52,7 +52,7 @@
   @SuppressWarnings("deprecation")
   public void testKillCompletedJob() throws IOException, InterruptedException {
     job = new MyFakeJobInProgress(new JobConf(), jobTracker);
-    jobTracker.addJob(job.getJobID(), job);
+    jobTracker.addJob(job.getJobID(), (JobInProgress)job);
     job.status.setRunState(JobStatus.SUCCEEDED);
 
     jobTracker.killJob(job.getJobID());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Tue Jan 26 14:02:53 2010
@@ -45,10 +45,15 @@
   private static String taskTrackerSpecialGroup;
 
   @Override
+  protected boolean canRun() {
+    return ClusterWithLinuxTaskController.shouldRun();
+  }
+
+  @Override
   protected void setUp()
       throws Exception {
 
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
+    if (!canRun()) {
       return;
     }
 
@@ -66,7 +71,8 @@
     taskController.setConf(trackerFConf);
     taskController.setup();
 
-    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+    tracker.setTaskController(taskController);
+    tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
         taskController));
 
     // Rewrite conf so as to reflect task's correct user name.
@@ -81,7 +87,7 @@
   @Override
   protected void tearDown()
       throws Exception {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
+    if (!canRun()) {
       return;
     }
     super.tearDown();
@@ -96,21 +102,6 @@
     // Do nothing.
   }
 
-  /**
-   * Test the localization of a user on the TT when {@link LinuxTaskController}
-   * is in use.
-   */
-  @Override
-  public void testUserLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testJobLocalization();
-  }
-
   @Override
   protected void checkUserLocalization()
       throws IOException {
@@ -140,7 +131,7 @@
       // Verify the distributed cache dir.
       File distributedCacheDir =
           new File(localDir, TaskTracker
-              .getDistributedCacheDir(task.getUser()));
+              .getPrivateDistributedCacheDir(task.getUser()));
       assertTrue("distributed cache dir " + distributedCacheDir
           + " doesn't exists!", distributedCacheDir.exists());
       checkFilePermissions(distributedCacheDir.getAbsolutePath(),
@@ -148,21 +139,6 @@
     }
   }
 
-  /**
-   * Test job localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of the job related files.
-   */
-  @Override
-  public void testJobLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testJobLocalization();
-  }
-
   @Override
   protected void checkJobLocalization()
       throws IOException {
@@ -208,21 +184,6 @@
     }
   }
 
-  /**
-   * Test task localization with {@link LinuxTaskController}. Also check the
-   * permissions and file ownership of task related files.
-   */
-  @Override
-  public void testTaskLocalization()
-      throws IOException {
-
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-
-    super.testTaskLocalization();
-  }
-
   @Override
   protected void checkTaskLocalization()
       throws IOException {
@@ -248,16 +209,4 @@
           .getUser(), taskTrackerSpecialGroup);
     }
   }
-
-  /**
-   * Test cleanup of task files with {@link LinuxTaskController}.
-   */
-  @Override
-  public void testTaskCleanup()
-      throws IOException {
-    if (!ClusterWithLinuxTaskController.shouldRun()) {
-      return;
-    }
-    super.testTaskCleanup();
-  }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java Tue Jan 26 14:02:53 2010
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
@@ -47,6 +48,7 @@
     conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
     conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
     conf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 1000);
+    conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1");
     jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers);
     jobTracker.startExpireTrackersThread();
   }
@@ -91,4 +93,139 @@
     job.finishTask(tid[1]);
     
   }
+  
+  /**
+   * Test whether the tracker gets blacklisted after it is lost.
+   */
+  public void testLostTrackerBeforeBlacklisting() throws Exception {
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+    TaskAttemptID[] tid = new TaskAttemptID[3];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    job.setClusterSize(4);
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
+
+    job.finishTask(tid[0]);
+
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.getClusterStatus(false).getTaskTrackers());
+    
+    // lose the tracker
+    clock.advance(1100);
+    jobTracker.checkExpiredTrackers();
+    assertFalse("Tracker 0 not lost", 
+        jobTracker.getClusterStatus(false).getActiveTrackerNames()
+                  .contains(trackers[0]));
+    
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 0, jobTracker.getClusterStatus(false).getTaskTrackers());
+    
+    // Tracker 1 establishes contact with JT 
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    
+    // Tracker1 should get assigned the lost map task
+    tid[1] =  job.findMapTask(trackers[1]);
+
+    assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
+    
+    assertEquals("Task ID of reassigned map task does not match",
+        tid[0].getTaskID().toString(), tid[1].getTaskID().toString());
+    
+    // finish the map task
+    job.finishTask(tid[1]);
+
+    // finish the reduce task
+    tid[2] =  job.findReduceTask(trackers[1]);
+    job.finishTask(tid[2]);
+    
+    // check if job is successful
+    assertEquals("Job not successful", 
+                 JobStatus.SUCCEEDED, job.getStatus().getRunState());
+    
+    // check if the tracker is lost
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.getClusterStatus(false).getTaskTrackers());
+    // validate the blacklisted count, since we lost one blacklisted tracker
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+  }
+
+  /**
+   * Test whether the tracker gets lost after it is blacklisted.
+   */
+  public void testLostTrackerAfterBlacklisting() throws Exception {
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+    clock.advance(600);
+    TaskAttemptID[] tid = new TaskAttemptID[2];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    job.setClusterSize(4);
+    
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.taskTrackers().size());
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
+    // Fail the task
+    job.failTask(tid[0]);
+    
+    // Tracker 1 establishes contact with JT
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 2, jobTracker.taskTrackers().size());
+    
+    // Tracker 1 gets the map task
+    tid[1] = job.findMapTask(trackers[1]);
+    // Finish the task and also the job
+    job.finishTask(tid[1]);
+
+    // check if job is successful
+    assertEquals("Job not successful", 
+                 JobStatus.SUCCEEDED, job.getStatus().getRunState());
+    
+    // check if tracker 0 got blacklisted
+    assertTrue("Tracker 0 not blacklisted", 
+               jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName()
+                 .equals(trackers[0]));
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 2, jobTracker.taskTrackers().size());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                1, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+    
+    // Advance clock. Tracker 0 should be lost
+    clock.advance(500);
+    jobTracker.checkExpiredTrackers();
+    
+    // check if the task tracker is lost
+    assertFalse("Tracker 0 not lost", 
+            jobTracker.getClusterStatus(false).getActiveTrackerNames()
+                      .contains(trackers[0]));
+    
+    // check if the lost tracker has been removed from the jobtracker
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.taskTrackers().size());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+    
+  }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Tue Jan 26 14:02:53 2010
@@ -20,6 +20,9 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -27,10 +30,19 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  *  Validates map phase progress.
@@ -92,9 +104,9 @@
    */
   class TestMapTask extends MapTask {
     public TestMapTask(String jobFile, TaskAttemptID taskId, 
-        int partition, String splitClass, BytesWritable split,
+        int partition, TaskSplitIndex splitIndex,
         int numSlotsRequired) {
-      super(jobFile, taskId, partition, splitClass, split, numSlotsRequired);
+      super(jobFile, taskId, partition, splitIndex, numSlotsRequired);
     }
     
     /**
@@ -141,16 +153,20 @@
     jobId = taskId.getJobID();
     
     JobContext jContext = new JobContextImpl(job, jobId);
-    Job.RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+    InputFormat<?, ?> input =
+      ReflectionUtils.newInstance(jContext.getInputFormatClass(), job);
 
-    job.setUseNewMapper(true); // use new api
-    for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1
+    List<InputSplit> splits = input.getSplits(jContext);
+    JobSplitWriter.createSplitFiles(new Path(TEST_ROOT_DIR), job, splits);
+    TaskSplitMetaInfo[] splitMetaInfo = 
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, job, new Path(TEST_ROOT_DIR));
+    job.setUseNewMapper(true); // use new api
+    for (int i = 0; i < splitMetaInfo.length; i++) { // splitMetaInfo.length is 1
       map = new TestMapTask(
           job.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system") +
           jobId + "job.xml",  
           taskId, i,
-          rawSplits[i].getClassName(),
-          rawSplits[i].getBytes(), 1);
+          splitMetaInfo[i].getSplitIndex(), 1);
 
       JobConf localConf = new JobConf(job);
       map.localizeConfiguration(localConf);
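
The TestMapProgress hunk above tracks this commit's wider change: a map task no longer receives serialized split bytes (the old Job.RawSplit plumbing) but a TaskSplitIndex resolved from split files written at submission time. Condensed from the diff itself as a reading aid (identifiers as they appear above; job setup elided):

    // Write the splits to disk, then read their meta info back.
    List<InputSplit> splits = input.getSplits(jContext);
    JobSplitWriter.createSplitFiles(new Path(TEST_ROOT_DIR), job, splits);
    TaskSplitMetaInfo[] meta =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, job, new Path(TEST_ROOT_DIR));

    // Each MapTask is now constructed with a split index, not raw split bytes.
    TaskSplitIndex index = meta[0].getSplitIndex();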

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Tue Jan 26 14:02:53 2010
@@ -34,7 +34,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -308,56 +307,6 @@
       
   }
 
-  private static class BadPartitioner
-      implements Partitioner<LongWritable,Text> {
-    boolean low;
-    public void configure(JobConf conf) {
-      low = conf.getBoolean("test.testmapred.badpartition", true);
-    }
-    public int getPartition(LongWritable k, Text v, int numPartitions) {
-      return low ? -1 : numPartitions;
-    }
-  }
-
-  @Test
-  public void testPartitioner() throws Exception {
-    JobConf conf = new JobConf(TestMapRed.class);
-    conf.setPartitionerClass(BadPartitioner.class);
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path testdir = new Path(
-        System.getProperty("test.build.data","/tmp")).makeQualified(fs);
-    Path inFile = new Path(testdir, "blah/blah");
-    DataOutputStream f = fs.create(inFile);
-    f.writeBytes("blah blah blah\n");
-    f.close();
-    FileInputFormat.setInputPaths(conf, inFile);
-    FileOutputFormat.setOutputPath(conf, new Path(testdir, "out"));
-    conf.setMapperClass(IdentityMapper.class);
-    conf.setReducerClass(IdentityReducer.class);
-    conf.setOutputKeyClass(LongWritable.class);
-    conf.setOutputValueClass(Text.class);
-
-    // partition too low
-    conf.setBoolean("test.testmapred.badpartition", true);
-    boolean pass = true;
-    try {
-      JobClient.runJob(conf);
-    } catch (IOException e) {
-      pass = false;
-    }
-    assertFalse("should fail for partition < 0", pass);
-
-    // partition too high
-    conf.setBoolean("test.testmapred.badpartition", false);
-    pass = true;
-    try {
-      JobClient.runJob(conf);
-    } catch (IOException e) {
-      pass = false;
-    }
-    assertFalse("should fail for partition >= numPartitions", pass);
-  }
-
   public static class NullMapper
       implements Mapper<NullWritable,Text,NullWritable,Text> {
     public void map(NullWritable key, Text val,

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Jan 26 14:02:53 2010
@@ -38,15 +38,13 @@
 public class TestMiniMRClasspath extends TestCase {
   
   
-  static String launchWordCount(URI fileSys,
+  static void configureWordCount(FileSystem fs,
                                 String jobTracker,
                                 JobConf conf,
                                 String input,
                                 int numMaps,
-                                int numReduces) throws IOException {
-    final Path inDir = new Path("/testing/wc/input");
-    final Path outDir = new Path("/testing/wc/output");
-    FileSystem fs = FileSystem.get(fileSys, conf);
+                                int numReduces,
+                                Path inDir, Path outDir) throws IOException {
     fs.delete(outDir, true);
     if (!fs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -56,7 +54,7 @@
       file.writeBytes(input);
       file.close();
     }
-    FileSystem.setDefaultUri(conf, fileSys);
+    FileSystem.setDefaultUri(conf, fs.getUri());
     conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
@@ -75,6 +73,16 @@
     conf.setNumReduceTasks(numReduces);
     //pass a job.jar already included in the hadoop build
     conf.setJar("build/test/mapred/testjar/testjob.jar");
+  }
+  
+  static String launchWordCount(URI fileSys, String jobTracker, JobConf conf,
+                                String input, int numMaps, int numReduces) 
+  throws IOException {
+    final Path inDir = new Path("/testing/wc/input");
+    final Path outDir = new Path("/testing/wc/output");
+    FileSystem fs = FileSystem.get(fileSys, conf);
+    configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir, 
+                       outDir);
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Tue Jan 26 14:02:53 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import junit.extensions.TestSetup;
@@ -28,7 +29,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -178,4 +182,56 @@
   public void testNoJvmReuse() throws Exception {
     runJvmReuseTest(mrCluster.createJobConf(), false);
   }
+
+  private static class BadPartitioner
+      implements Partitioner<LongWritable,Text> {
+    boolean low;
+    public void configure(JobConf conf) {
+      low = conf.getBoolean("test.testmapred.badpartition", true);
+    }
+    public int getPartition(LongWritable k, Text v, int numPartitions) {
+      return low ? -1 : numPartitions;
+    }
+  }
+
+  public void testPartitioner() throws Exception {
+    JobConf conf = mrCluster.createJobConf();
+    conf.setPartitionerClass(BadPartitioner.class);
+    conf.setNumReduceTasks(3);
+    FileSystem fs = FileSystem.get(conf);
+    Path testdir =
+      new Path("blah").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path inFile = new Path(testdir, "blah");
+    DataOutputStream f = fs.create(inFile);
+    f.writeBytes("blah blah blah\n");
+    f.close();
+    FileInputFormat.setInputPaths(conf, inFile);
+    FileOutputFormat.setOutputPath(conf, new Path(testdir, "out"));
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMaxMapAttempts(1);
+
+    // partition too low
+    conf.setBoolean("test.testmapred.badpartition", true);
+    boolean pass = true;
+    try {
+      JobClient.runJob(conf);
+    } catch (IOException e) {
+      pass = false;
+    }
+    assertFalse("should fail for partition < 0", pass);
+
+    // partition too high
+    conf.setBoolean("test.testmapred.badpartition", false);
+    pass = true;
+    try {
+      JobClient.runJob(conf);
+    } catch (IOException e) {
+      pass = false;
+    }
+    assertFalse("should fail for partition >= numPartitions", pass);
+  }
+
 }
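
For contrast with BadPartitioner above: the contract of the old org.apache.hadoop.mapred Partitioner is that getPartition returns a value in [0, numPartitions), and testPartitioner asserts that the job fails when either bound is violated. A minimal well-behaved sketch (a hypothetical class, not part of this commit):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    public class HashingPartitioner implements Partitioner<LongWritable, Text> {
      public void configure(JobConf conf) { }
      public int getPartition(LongWritable key, Text value, int numPartitions) {
        // Mask the sign bit so the result always lands in [0, numPartitions).
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }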

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Jan 26 14:02:53 2010
@@ -61,6 +61,10 @@
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
+      // make cleanup inline so that the existence of these directories
+      // can be validated
+      mr.setInlineCleanupThreads();
+
       TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
 
       // run the wordcount example with caching

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Jan 26 14:02:53 2010
@@ -36,12 +36,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -289,8 +291,11 @@
     long hdfsWrite = 
       counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
           Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    long rawSplitBytesRead = 
+      counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
     assertEquals(result.output.length(), hdfsWrite);
-    assertEquals(input.length(), hdfsRead);
+    // the input split itself is also read from HDFS, so add its raw byte count
+    assertEquals(input.length() + rawSplitBytesRead, hdfsRead);
 
     // Run a job with input and output going to localfs even though the 
     // default fs is hdfs.
@@ -324,6 +329,9 @@
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      // make cleanup inline so that the existence of these directories
+      // can be validated
+      mr.setInlineCleanupThreads();
 
       runPI(mr, mr.createJobConf());
       runWordCount(mr, mr.createJobConf());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Tue Jan 26 14:02:53 2010
@@ -23,9 +23,16 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.security.*;
 
 /**
@@ -43,7 +50,11 @@
   }
   
   static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
-    JobConf jobconf = mr.createJobConf();
+    return createJobConf(mr.createJobConf(), ugi);
+  }
+
+  static JobConf createJobConf(JobConf conf, UnixUserGroupInformation ugi) {
+    JobConf jobconf = new JobConf(conf);
     UnixUserGroupInformation.saveToConf(jobconf,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
     return jobconf;
@@ -55,6 +66,50 @@
     fs.setPermission(p, new FsPermission((short)0777));
   }
 
+  // runs a sample job as a user (ugi)
+  RunningJob runJobAsUser(JobConf job, UserGroupInformation ugi) 
+  throws Exception {
+    ClientProtocol jobSubmitClient = 
+      TestSubmitJob.getJobSubmitClient(job, ugi);
+    org.apache.hadoop.mapreduce.JobID id = jobSubmitClient.getNewJobID();
+    
+    InputSplit[] splits = computeJobSplit(JobID.downgrade(id), job);
+    Path jobSubmitDir = new Path(id.toString());
+    FileSystem fs = jobSubmitDir.getFileSystem(job);
+    jobSubmitDir = jobSubmitDir.makeQualified(fs);
+    uploadJobFiles(JobID.downgrade(id), splits, jobSubmitDir, job);
+    
+    jobSubmitClient.submitJob(id, jobSubmitDir.toString(), null);
+    
+    JobClient jc = new JobClient(job);
+    return jc.getJob(JobID.downgrade(id));
+  }
+  
+  // a helper api for split computation
+  private InputSplit[] computeJobSplit(JobID id, JobConf conf) 
+  throws IOException {
+    InputSplit[] splits = 
+      conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
+    conf.setNumMapTasks(splits.length);
+    return splits;
+  }
+
+
+  // a helper api for split submission
+  private void uploadJobFiles(JobID id, InputSplit[] splits,
+                             Path jobSubmitDir, JobConf conf) 
+  throws IOException {
+    Path confLocation = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
+    FileSystem fs = confLocation.getFileSystem(conf);
+    FsPermission perm = new FsPermission((short)0700);
+    
+    // localize conf
+    DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
+    conf.writeXml(confOut);
+    confOut.close();
+  }
+  
   public void testDistinctUsers() throws Exception {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -71,15 +126,32 @@
           UnixUserGroupInformation.login().getUserName(), false); 
       mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
            1, null, null, MR_UGI);
+      String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
 
-      JobConf pi = createJobConf(mr, PI_UGI);
-      TestMiniMRWithDFS.runPI(mr, pi);
-
-      JobConf wc = createJobConf(mr, WC_UGI);
-      TestMiniMRWithDFS.runWordCount(mr, wc);
+      JobConf job1 = mr.createJobConf();
+      String input = "The quick brown fox\nhas many silly\n" 
+                     + "red fox sox\n";
+      Path inDir = new Path("/testing/distinct/input");
+      Path outDir = new Path("/testing/distinct/output");
+      TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1, 
+                                             input, 2, 1, inDir, outDir);
+      job1 = createJobConf(job1, PI_UGI);
+      runJobAsUser(job1, PI_UGI);
+
+      JobConf job2 = mr.createJobConf();
+      Path inDir2 = new Path("/testing/distinct/input2");
+      Path outDir2 = new Path("/testing/distinct/output2");
+      TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2, 
+                                             input, 2, 1, inDir2, outDir2);
+      job2 = createJobConf(job2, WC_UGI);
+      runJobAsUser(job2, WC_UGI);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown();}
     }
   }
+  
+  public void testRestartWithDistinctUsers() {
+    
+  }
 }
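
The runJobAsUser helper above bypasses JobClient's normal submission path so each job is submitted through a ClientProtocol proxy bound to a specific UGI. Condensed from the diff as a reading aid (identifiers as above; directory setup and error handling elided):

    // 1. Ask the JobTracker for a fresh job id over the user's proxy.
    org.apache.hadoop.mapreduce.JobID id = jobSubmitClient.getNewJobID();

    // 2. Compute the splits and write them, plus job.xml, into the submit dir.
    InputSplit[] splits =
        conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
    DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
    conf.writeXml(confOut);
    confOut.close();

    // 3. Hand the populated directory to the JobTracker as the chosen user.
    jobSubmitClient.submitJob(id, jobSubmitDir.toString(), null);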

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Jan 26 14:02:53 2010
@@ -22,7 +22,9 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import junit.framework.TestCase;
@@ -31,6 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -61,6 +64,14 @@
                             int numExcluded, Configuration conf) 
   throws IOException {
     try {
+      // create fake mapping for the groups
+      Map<String, String[]> u2g_map = new HashMap<String, String[]> (1);
+      u2g_map.put("user1", new String[] {"user1" });
+      u2g_map.put("user2", new String[] {"user2" });
+      u2g_map.put("user3", new String[] {"abc" });
+      u2g_map.put("user4", new String[] {"supergroup" });
+      DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+      
       conf.setBoolean("dfs.replication.considerLoad", false);
       
       // prepare hosts info
@@ -168,7 +179,7 @@
     // refresh with super user
     success = false;
     UserGroupInformation ugi_super =
-      TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", true);
+      TestMiniMRWithDFSWithDistinctUsers.createUGI("user4", true);
     client = getClient(conf, ugi_super);
     try {
       client.refreshNodes();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Tue Jan 26 14:02:53 2010
@@ -250,6 +250,28 @@
   }
 
   @Test
+  public void testMissingConfigFile() throws Exception {
+    checkForConfigFile(); // deletes file
+
+    try {
+      new QueueManager(CONFIG);
+      fail("Should throw an exception for missing file when " +
+           "explicitly passed.");
+    } catch (RuntimeException re) {
+    }
+
+    // If we just want to pick up the queues from the class loader
+    // it should fall through to the default. The class loader is set to
+    // load CONFIG for the "mapred-queues.xml" resource, but it's missing
+    // so should fall through to mapred-queues-default.xml
+    QueueManager qm = new QueueManager();
+    List<JobQueueInfo> rootQueues =
+      qm.getRoot().getJobQueueInfo().getChildren();
+    assertEquals(1, rootQueues.size());
+    assertEquals("default", rootQueues.get(0).getQueueName());
+  }
+
+  @Test
   public void testEmptyProperties() throws Exception {
     checkForConfigFile();
     Document doc = createDocument();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Tue Jan 26 14:02:53 2010
@@ -31,8 +31,10 @@
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.StaticMapping;
 
@@ -93,38 +95,36 @@
 
     @Override
     public void initTasks() throws IOException {
-      Job.RawSplit[] splits = createSplits();
-      numMapTasks = splits.length;
-      createMapTasks(null, splits);
-      nonRunningMapCache = createCache(splits, maxLevel);
+      TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+      numMapTasks = taskSplitMetaInfo.length;
+      createMapTasks(null, taskSplitMetaInfo);
+      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
       tasksInited.set(true);
       this.status.setRunState(JobStatus.RUNNING);
 
     }
   
-
-    protected Job.RawSplit[] createSplits() throws IOException {
-      Job.RawSplit[] splits = new Job.RawSplit[numMaps];
+    @Override
+    protected TaskSplitMetaInfo [] createSplits(
+        org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMaps];
       // Hand code for now. 
       // M0,2,3 reside in Host1
       // M1 resides in Host3
       // M4 resides in Host4
       String[] splitHosts0 = new String[] { allHosts[0] };
 
-      for (int i = 0; i < numMaps; i++) {
-        splits[i] = new Job.RawSplit();
-        splits[i].setDataLength(0);
-      }
-
-      splits[0].setLocations(splitHosts0);
-      splits[2].setLocations(splitHosts0);
-      splits[3].setLocations(splitHosts0);
-      
       String[] splitHosts1 = new String[] { allHosts[2] };
-      splits[1].setLocations(splitHosts1);
-
       String[] splitHosts2 = new String[] { allHosts[3] };
-      splits[4].setLocations(splitHosts2);
+      for (int i = 0; i < numMaps; i++) {
+        if (i == 0 || i == 2 || i == 3) {
+          splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
+        } else if (i == 1) {
+          splits[i] = new TaskSplitMetaInfo(splitHosts1, 0, 0);
+        } else if (i == 4) {
+          splits[i] = new TaskSplitMetaInfo(splitHosts2, 0, 0);
+        }
+      }
 
       return splits;
     }

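For readers tracking the RawSplit-to-TaskSplitMetaInfo migration above, a hedged sketch of the new construction style. The parameter order (locations, startOffset, inputDataLength) is inferred from the usages in this patch, and getLocations() is assumed to mirror the old RawSplit accessor.

import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;

public class SplitPlacementSketch {
  public static void main(String[] args) {
    String[] preferredHosts = { "host1.rack1" }; // hypothetical host name
    // assumed order: (locations, startOffset, inputDataLength)
    TaskSplitMetaInfo split = new TaskSplitMetaInfo(preferredHosts, 0L, 0L);
    for (String host : split.getLocations()) { // assumed accessor
      System.out.println("prefer scheduling near: " + host);
    }
  }
}
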
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Jan 26 14:02:53 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
 
 import junit.framework.TestCase;
@@ -48,10 +49,11 @@
   /**
    * Tests the {@link JobTracker} against the exceptions thrown in 
    * {@link JobTracker.RecoveryManager}. It does the following :
-   *  - submits 2 jobs
+   *  - submits 3 jobs
    *  - kills the jobtracker
    *  - Garble job.xml for one job causing it to fail in constructor 
    *    and job.split for another causing it to fail in init.
+   *  - deletes the job temp/submit dir
    *  - restarts the jobtracker
   *  - checks if the jobtracker starts normally
    */
@@ -79,7 +81,7 @@
     
     // wait for 50%
     UtilsForTests.waitForJobHalfDone(rJob1);
-    
+        
     JobConf job2 = mr.createJobConf();
     
     UtilsForTests.configureWaitingJobConf(job2, 
@@ -101,26 +103,15 @@
     // delete the job.xml of job #1 causing the job to fail in submit Job
     //while recovery itself
     Path jobFile = 
-      new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
-    LOG.info("Deleting job.xml file : " + jobFile.toString());
+      new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
+    LOG.info("Deleting job token file : " + jobFile.toString());
     fs.delete(jobFile, false); // delete the job.xml file
     
-    // create the job.xml file with 0 bytes
+    // create the job token file with 1 byte
     FSDataOutputStream out = fs.create(jobFile);
     out.write(1);
     out.close();
-
-    // delete the job.split of job #2 causing the job to fail in initTasks
-    Path jobSplitFile = 
-      new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
-    LOG.info("Deleting job.split file : " + jobSplitFile.toString());
-    fs.delete(jobSplitFile, false); // delete the job.split file
     
-    // create the job.split file with 0 bytes
-    out = fs.create(jobSplitFile);
-    out.write(1);
-    out.close();
-
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, true);
     // start the jobtracker
@@ -290,7 +281,7 @@
     conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
     conf.set(JTConfig.JT_HTTP_ADDRESS, "127.0.0.1:0");
 
-    JobTracker jobtracker = new JobTracker(conf);
+    JobTracker jobtracker = JobTracker.startTracker(conf);
 
     // now check if the update restart count works fine or not
     boolean failed = false;

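The last hunk swaps direct construction for the startTracker factory. A minimal sketch of the new entry point, assuming only what the hunk shows (startTracker(JobConf)); the port settings are copied from the surrounding test.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

public class StartTrackerSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // ephemeral ports, as in the test above
    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
    conf.set(JTConfig.JT_HTTP_ADDRESS, "127.0.0.1:0");
    // factory method rather than 'new JobTracker(conf)'
    JobTracker jobtracker = JobTracker.startTracker(conf);
    System.out.println("tracker constructed: " + (jobtracker != null));
  }
}
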
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Tue Jan 26 14:02:53 2010
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.mapred;
 
-import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import junit.framework.TestCase;
 
 public class TestResourceEstimation extends TestCase {
   
@@ -47,8 +46,8 @@
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      Job.RawSplit split = new Job.RawSplit();
-      split.setDataLength(0);
+      JobSplit.TaskSplitMetaInfo split = 
+        new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
     }
@@ -83,8 +82,9 @@
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      Job.RawSplit split = new Job.RawSplit();
-      split.setDataLength(singleMapInputSize);
+      JobSplit.TaskSplitMetaInfo split = 
+        new JobSplit.TaskSplitMetaInfo(new String[0], 0, 
+                                           singleMapInputSize);
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
     }
@@ -95,8 +95,8 @@
     //add one more map task with input size as 0
     TaskStatus ts = new MapTaskStatus();
     ts.setOutputSize(singleMapOutputSize);
-    Job.RawSplit split = new Job.RawSplit();
-    split.setDataLength(0);
+    JobSplit.TaskSplitMetaInfo split = 
+      new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
     TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);
     

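The estimator logic this test feeds is, roughly, the ratio of observed map output to observed map input applied to a new task's input size. A self-contained arithmetic sketch under that assumption; the sizes are placeholders, not the test's constants, and the real ResourceEstimator may apply additional padding.

public class EstimatorMathSketch {
  public static void main(String[] args) {
    long completedInput = 0, completedOutput = 0;
    // simulate a series of completed map tasks, as the test does
    for (int i = 0; i < 10; i++) {
      completedInput += 4096;   // placeholder input size per map
      completedOutput += 1024;  // placeholder output size per map
    }
    // assumed model: scale a new task's input by the observed output/input ratio
    long newTaskInput = 4096;
    long estimate =
        (long) (newTaskInput * ((double) completedOutput / completedInput));
    System.out.println("estimated output bytes: " + estimate); // 1024 here
  }
}
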
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Tue Jan 26 14:02:53 2010
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
@@ -155,7 +156,7 @@
     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
     JobInProgress jip = jt.getJob(job.getID());
     // get the running setup task id
-    TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
+    TaskAttemptID setupID = getRunningTaskID(jip.getTasks(TaskType.JOB_SETUP));
     if (commandLineKill) {
       killTaskFromCommandLine(job, setupID, jt);
     } else {
@@ -172,7 +173,8 @@
       } catch (InterruptedException ie) {}
     }
     // get the running cleanup task id
-    TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
+    TaskAttemptID cleanupID = 
+      getRunningTaskID(jip.getTasks(TaskType.JOB_CLEANUP));
     if (commandLineKill) {
       killTaskFromCommandLine(job, cleanupID, jt);
     } else {

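The hunk above replaces the per-type accessors (getSetupTasks(), getCleanupTasks()) with a single getTasks(TaskType) lookup. Since JobInProgress is package-private, the sketch below uses a stand-in interface purely to illustrate the call shape; only the TaskType constants come from the patch.

import org.apache.hadoop.mapreduce.TaskType;

public class TaskTypeLookupSketch {
  // stand-in for the real JobInProgress accessor, which returns TaskInProgress[]
  interface JobView {
    Object[] getTasks(TaskType type);
  }

  static void printCounts(JobView job) {
    System.out.println("setup tasks:   " + job.getTasks(TaskType.JOB_SETUP).length);
    System.out.println("cleanup tasks: " + job.getTasks(TaskType.JOB_CLEANUP).length);
  }
}
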
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Tue Jan 26 14:02:53 2010
@@ -25,6 +25,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -60,7 +61,7 @@
     @Override
     public synchronized void initTasks() throws IOException {
       super.initTasks();
-      Job.RawSplit emptySplit = new Job.RawSplit();
+      JobSplit.TaskSplitMetaInfo emptySplit = new JobSplit.TaskSplitMetaInfo();
       setup = new TaskInProgress[2];
       setup[0] = new TaskInProgress(getJobID(), "test",  emptySplit,
           jobtracker, getJobConf(), this, numMapTasks + 1, 1);
@@ -109,12 +110,13 @@
     @Override
     public synchronized void initTasks() throws IOException {
       super.initTasks();
-      Job.RawSplit emptySplit = new Job.RawSplit();
+
       final int numSlotsPerTask = 2;
       maps = new TaskInProgress[1];
       reduces = new TaskInProgress[1];
       
-      maps[0] = new FakeTaskInProgress(getJobID(), "test",  emptySplit,
+      maps[0] = new FakeTaskInProgress(getJobID(), "test",  
+          JobSplit.EMPTY_TASK_SPLIT,
           jobtracker, getJobConf(), this, 0, numSlotsPerTask);
       TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
       

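Both hunks in this file move from constructing empty RawSplits to the new JobSplit API; the second prefers the shared JobSplit.EMPTY_TASK_SPLIT constant over a fresh empty instance. A small sketch of the two equivalent forms, assuming nothing beyond what the hunks show.

import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;

public class EmptySplitSketch {
  public static void main(String[] args) {
    // shared constant, as used for the map task above
    TaskSplitMetaInfo shared = JobSplit.EMPTY_TASK_SPLIT;
    // per-call empty instance, as used for the setup tasks above
    TaskSplitMetaInfo perCall = new JobSplit.TaskSplitMetaInfo();
    System.out.println("same meaning, one allocation vs. two: "
        + (shared != perCall));
  }
}
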
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Tue Jan 26 14:02:53 2010
@@ -92,6 +92,8 @@
 
         TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
         TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+        TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, 
+            conf.get(JTConfig.JT_STAGING_AREA_ROOT));
 
         UnixUserGroupInformation MR_UGI = 
           TestMiniMRWithDFSWithDistinctUsers.createUGI(
@@ -105,6 +107,10 @@
         mrCluster =   new MiniMRCluster(0, 0,
             numTT, dfs.getFileSystem().getUri().toString(), 
             1, null, null, MR_UGI, new JobConf());
+        // make cleanup inline so that the existence of these directories
+        // can be validated
+        mrCluster.setInlineCleanupThreads();
+
         mrCluster.getJobTrackerRunner().getJobTracker()
         .addJobInProgressListener(myListener);
       }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestShuffleJobToken.java Tue Jan 26 14:02:53 2010
@@ -27,9 +27,14 @@
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 
+import javax.crypto.SecretKey;
+
 import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +45,7 @@
   private static URL baseUrl;
   private static File dir;
   private static final String JOB_ID = "job_20091117075357176_0001";
+  private static final String BAD_JOB_ID = "job_20091117075357176_0002";
   
   // create fake url
   private URL getMapOutputURL(String host)  throws MalformedURLException {
@@ -86,25 +92,26 @@
     URL url = getMapOutputURL(baseUrl.toString());
     String enc_str = SecureShuffleUtils.buildMsgFrom(url);
     URLConnection connectionGood = url.openConnection();
-
-    // create key 
-    byte [] key= SecureShuffleUtils.getNewEncodedKey();
     
-    // create fake TaskTracker - needed for keys storage
-    JobTokens jt = new JobTokens();
-    jt.setShuffleJobToken(key);
     TaskTracker tt  = new TaskTracker();
+    JobTokenSecretManager jtSecretManager = new JobTokenSecretManager();
+    // create a fake job token - the TaskTracker above stores the keys
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(JOB_ID));
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>(identifier,
+        jtSecretManager);
+    SecretKey tokenSecret = JobTokenSecretManager.createSecretKey(jt.getPassword());
     addJobToken(tt, JOB_ID, jt); // fake id
     server.setAttribute("task.tracker", tt);
 
     // encode the url
-    SecureShuffleUtils mac = new SecureShuffleUtils(key);
-    String urlHashGood = mac.generateHash(enc_str.getBytes()); // valid hash
+    String urlHashGood = SecureShuffleUtils.generateHash(enc_str.getBytes(), tokenSecret); // valid hash
     
     // use another key
-    byte [] badKey= SecureShuffleUtils.getNewEncodedKey();
-    mac = new SecureShuffleUtils(badKey);
-    String urlHashBad = mac.generateHash(enc_str.getBytes()); // invalid hash 
+    JobTokenIdentifier badIdentifier = new JobTokenIdentifier(new Text(BAD_JOB_ID));
+    Token<JobTokenIdentifier> badToken = new Token<JobTokenIdentifier>(badIdentifier,
+        jtSecretManager);
+    SecretKey badSecret = JobTokenSecretManager.createSecretKey(badToken.getPassword());
+    String urlHashBad = SecureShuffleUtils.generateHash(enc_str.getBytes(), badSecret); // invalid hash 
     
     // put url hash into http header
     connectionGood.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, urlHashGood);
@@ -135,13 +142,13 @@
     } 
   }
   /*Note that this method is there for a unit testcase (TestShuffleJobToken)*/
-  void addJobToken(TaskTracker tt, String jobIdStr, JobTokens jt) {
+  void addJobToken(TaskTracker tt, String jobIdStr, Token<JobTokenIdentifier> token) {
     JobID jobId = JobID.forName(jobIdStr);
     TaskTracker.RunningJob rJob = new TaskTracker.RunningJob(jobId);
-    rJob.jobTokens = jt;
     synchronized (tt.runningJobs) {
       tt.runningJobs.put(jobId, rJob);
     }
+    tt.getJobTokenSecretManager().addTokenForJob(jobIdStr, token);
   }
 
 }

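The token flow above in one self-contained sketch: build a job token, derive the per-job shuffle secret from its password, and hash the (stand-in) map-output URL with it. Every call is taken from the hunk; only the message string is a placeholder.

import javax.crypto.SecretKey;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.token.Token;

public class ShuffleHashSketch {
  public static void main(String[] args) throws Exception {
    JobTokenSecretManager secretManager = new JobTokenSecretManager();
    JobTokenIdentifier identifier =
        new JobTokenIdentifier(new Text("job_20091117075357176_0001"));
    Token<JobTokenIdentifier> token =
        new Token<JobTokenIdentifier>(identifier, secretManager);
    // derive the per-job shuffle secret from the token password
    SecretKey secret =
        JobTokenSecretManager.createSecretKey(token.getPassword());
    // placeholder for the canonicalized map-output URL string
    String msg = "/mapOutput?job=...&map=...&reduce=0";
    String urlHash = SecureShuffleUtils.generateHash(msg.getBytes(), secret);
    System.out.println("url hash: " + urlHash);
  }
}

A token built for a different job id (BAD_JOB_ID above) yields a different secret, which is why the server-side check rejects the second connection in the test.
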
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Tue Jan 26 14:02:53 2010
@@ -23,6 +23,8 @@
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
@@ -40,6 +42,8 @@
     }
   };
   static SpecFakeClock clock;
+  static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+
   
   static String trackers[] = new String[] {"tracker_tracker1:1000", 
       "tracker_tracker2:1000", "tracker_tracker3:1000",
@@ -65,6 +69,68 @@
     return setup;
   }
 
+  public void testRunningTaskCountWithSpeculation() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[8];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(3);
+    conf.setNumReduceTasks(3);
+    conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+
+    //Check for runningMap counts first
+    //schedule maps
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[1]);
+    taskAttemptID[2] = job.findMapTask(trackers[2]);
+
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[0]);
+    clock.advance(1000);
+    job.finishTask(taskAttemptID[1]);
+    clock.advanceBySpeculativeLag();
+
+    //we should get a speculative task now
+    taskAttemptID[3] = job.findMapTask(trackers[3]);
+    int oldRunningMap = job.runningMaps();
+    LOG.info("No of running maps before fail was " + oldRunningMap);
+    job.failTask(taskAttemptID[2]);
+    assertEquals(
+      "Running maps count should be updated from " + oldRunningMap + " to " +
+        (oldRunningMap - 1), oldRunningMap - 1, job.runningMaps());
+    LOG.info(" Job running maps after fail " + job.runningMaps());
+
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[3]);
+
+    //check for runningReduce count.
+    taskAttemptID[4] = job.findReduceTask(trackers[0]);
+    taskAttemptID[5] = job.findReduceTask(trackers[1]);
+    taskAttemptID[6] = job.findReduceTask(trackers[2]);
+
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[4]);
+    clock.advance(1000);
+    job.finishTask(taskAttemptID[5]);
+
+    clock.advanceBySpeculativeLag();
+    taskAttemptID[7] = job.findReduceTask(trackers[4]);
+
+    int oldRunningReduces = job.runningReduces();
+    job.failTask(taskAttemptID[6]);
+    LOG.info(
+      "No of running reduces before fail " + oldRunningReduces);
+    LOG.info(
+      "No of running reduces after fail " + job.runningReduces());
+    assertEquals(
+      "Running reduces count should be updated from " + oldRunningReduces +
+        " to " + (oldRunningReduces - 1), oldRunningReduces - 1,
+      job.runningReduces());
+    
+    job.finishTask(taskAttemptID[7]);
+  }
+
   public void testIsSlowTracker() throws IOException {
     TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
     JobConf conf = new JobConf();
@@ -171,7 +237,7 @@
     taskAttemptID[5] = job.findMapTask(trackers[4]);
     assertEquals(taskAttemptID[5].getTaskID().getId(),4);
   }
-  
+
   /*
    * Tests the fact that we only launch a limited number of speculative tasks,
    * even though we have a lot of tasks in RUNNING state
@@ -219,7 +285,7 @@
       taskAttemptID[i] = job.findMapTask(trackers[1]);
       clock.advance(2000);
       if (taskAttemptID[i] != null) {
-        //add some good progress constantly for the different 
+        //add some good progress constantly for the different
         //task-attempts so that
         //the tasktracker doesn't get into the slow trackers category
         job.progressMade(taskAttemptID[i], 0.99f);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Tue Jan 26 14:02:53 2010
@@ -17,25 +17,81 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
-
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.util.ToolRunner;
 
 import junit.framework.TestCase;
 
+/**
+ * Test job submission. This test checks:
+ *   - basic   : job submission via jobclient
+ *   - cleanup : job client crashes while submitting
+ *   - invalid job config
+ *     - invalid memory config
+ *
+ */
 public class TestSubmitJob extends TestCase {
-  private MiniMRCluster miniMRCluster;
-
-  @Override
-  protected void tearDown()
-      throws Exception {
-    if (miniMRCluster != null) {
-      miniMRCluster.shutdown();
-    }
+  static final Log LOG = LogFactory.getLog(TestSubmitJob.class);
+  
+  private MiniMRCluster mrCluster;
+
+  private MiniDFSCluster dfsCluster;
+  private JobTracker jt;
+  private FileSystem fs;
+  private static Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "job-submission-testing");
+  private static int numSlaves = 1;
+
+  private void startCluster() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    JobConf jConf = new JobConf(conf);
+    jConf.setLong("mapred.job.submission.expiry.interval", 6 * 1000);
+    mrCluster = new MiniMRCluster(0, 0, numSlaves, 
+        dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, 
+        jConf);
+    jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    fs = FileSystem.get(mrCluster.createJobConf());
+  }
+  
+  private void stopCluster() throws Exception {
+    mrCluster.shutdown();
+    mrCluster = null;
+    dfsCluster.shutdown();
+    dfsCluster = null;
+    jt = null;
+    fs = null;
   }
 
   /**
@@ -56,9 +112,9 @@
     jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB,
         4 * 1024L);
 
-    miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+    mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
 
-    JobConf clusterConf = miniMRCluster.createJobConf();
+    JobConf clusterConf = mrCluster.createJobConf();
 
     // No map-memory configuration
     JobConf jobConf = new JobConf(clusterConf);
@@ -85,6 +141,9 @@
     jobConf.setMemoryForReduceTask(5 * 1024L);
     runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
         "Exceeds the cluster's max-memory-limit.");
+    
+    mrCluster.shutdown();
+    mrCluster = null;
   }
 
   private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
@@ -110,4 +169,127 @@
         + " - doesn't contain expected message - " + overallExpectedMsg, msg
         .contains(overallExpectedMsg));
   }
-}
+  
+  static ClientProtocol getJobSubmitClient(JobConf conf, 
+                                           UserGroupInformation ugi) 
+  throws IOException {
+    return (ClientProtocol) RPC.getProxy(ClientProtocol.class, 
+        ClientProtocol.versionID, JobTracker.getAddress(conf), ugi, 
+        conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
+  }
+
+  static org.apache.hadoop.hdfs.protocol.ClientProtocol getDFSClient(
+      Configuration conf, UserGroupInformation ugi) 
+  throws IOException {
+    return (org.apache.hadoop.hdfs.protocol.ClientProtocol) 
+      RPC.getProxy(org.apache.hadoop.hdfs.protocol.ClientProtocol.class, 
+        org.apache.hadoop.hdfs.protocol.ClientProtocol.versionID, 
+        NameNode.getAddress(conf), ugi, 
+        conf, 
+        NetUtils.getSocketFactory(conf, 
+            org.apache.hadoop.hdfs.protocol.ClientProtocol.class));
+  }
+  
+  /**
+   * Submit a job and check if the files are accessible to other users.
+   */
+  public void testSecureJobExecution() throws Exception {
+    LOG.info("Testing secure job submission/execution");
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      UnixUserGroupInformation.saveToConf(conf,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME,
+          TestMiniMRWithDFSWithDistinctUsers.DFS_UGI);
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fs = dfs.getFileSystem();
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, 
+          conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+      UnixUserGroupInformation MR_UGI = 
+        TestMiniMRWithDFSWithDistinctUsers.createUGI(
+           UnixUserGroupInformation.login().getUserName(), false); 
+      mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(),
+                             1, null, null, MR_UGI);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+
+      // cleanup
+      dfs.getFileSystem().delete(TEST_DIR, true);
+
+      final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
+      final Path reduceSignalFile = new Path(TEST_DIR, "reduce-signal");
+      
+      // create a ugi for user 1
+      UnixUserGroupInformation user1 = 
+        TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
+      Path inDir = new Path("/user/input");
+      Path outDir = new Path("/user/output");
+      JobConf job =
+          TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user1);
+
+      UtilsForTests.configureWaitingJobConf(job, inDir, outDir, 2, 0, 
+        "test-submit-job", mapSignalFile.toString(), 
+        reduceSignalFile.toString());
+      job.set(UtilsForTests.getTaskSignalParameter(true),
+          mapSignalFile.toString());
+      job.set(UtilsForTests.getTaskSignalParameter(false),
+          reduceSignalFile.toString());
+      LOG.info("Submit job as the actual user (" + user1.getUserName() + ")");
+      JobClient jClient = new JobClient(job);
+      RunningJob rJob = jClient.submitJob(job);
+      JobID id = rJob.getID();
+      LOG.info("Running job " + id);
+
+      // create user2
+      UnixUserGroupInformation user2 = 
+        TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
+      JobConf conf_other =
+          TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user2);
+      org.apache.hadoop.hdfs.protocol.ClientProtocol client = 
+        getDFSClient(conf_other, user2);
+
+      // try accessing mapred.system.dir/jobid/*
+      boolean failed = false;
+      try {
+        Path path = new Path(new URI(jt.getSystemDir()).getPath());
+        LOG.info("Try listing the mapred-system-dir as the user (" 
+                 + user2.getUserName() + ")");
+        client.getListing(path.toString());
+      } catch (IOException ioe) {
+        failed = true;
+      }
+      assertTrue("JobTracker system dir is accessible to others", failed);
+      // try accessing ~/.staging/jobid/*
+      failed = false;
+      JobInProgress jip = jt.getJob(id);
+      Path jobSubmitDirpath = 
+        new Path(jip.getJobConf().get("mapreduce.job.dir"));
+      try {
+        LOG.info("Try accessing the job folder for job " + id + " as the user (" 
+                 + user2.getUserName() + ")");
+        client.getListing(jobSubmitDirpath.toString());
+      } catch (IOException ioe) {
+        failed = true;
+      }
+      assertTrue("User's staging folder is accessible to others", failed);
+      UtilsForTests.signalTasks(dfs, fs, true, mapSignalFile.toString(),
+          reduceSignalFile.toString());
+      // wait for job to be done
+      UtilsForTests.waitTillDone(jClient);
+
+      // check if the staging area is cleaned up
+      LOG.info("Check if the job submit dir is cleaned up");
+      assertFalse(fs.exists(jobSubmitDirpath));
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file

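The per-user proxy helpers added above are the crux of testSecureJobExecution: each UGI gets its own ClientProtocol, so user2's listing attempts run with user2's permissions. Restated as a standalone sketch, using only the calls from the hunk.

import java.io.IOException;

import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;

public class SubmitClientSketch {
  // mirrors getJobSubmitClient() in the hunk above
  static ClientProtocol connect(JobConf conf, UserGroupInformation ugi)
      throws IOException {
    return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
        ClientProtocol.versionID, JobTracker.getAddress(conf), ugi,
        conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
  }
}
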
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Tue Jan 26 14:02:53 2010
@@ -179,7 +179,7 @@
     responseId++;
   }
 
-  public void AtestTrackerBlacklistingForJobFailures() throws Exception {
+  public void testTrackerBlacklistingForJobFailures() throws Exception {
     runBlackListingJob(jobTracker, trackers);
     assertEquals("Tracker 1 not blacklisted", jobTracker
         .getBlacklistedTrackerCount(), 1);


