hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r792890 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
Date Fri, 10 Jul 2009 11:46:54 GMT
Author: sharad
Date: Fri Jul 10 11:46:54 2009
New Revision: 792890

URL: http://svn.apache.org/viewvc?rev=792890&view=rev
Log:
MAPREDUCE-153. Fix timeout in TestJobInProgressListener. Contributed by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792890&r1=792889&r2=792890&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 10 11:46:54 2009
@@ -182,4 +182,7 @@
     MAPREDUCE-677. Fix timeout in TestNodeRefresh. (Amar Kamat via 
     sharad)
 
+    MAPREDUCE-153. Fix timeout in TestJobInProgressListener. (Amar 
+    Kamat via sharad)
+
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=792890&r1=792889&r2=792890&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jul 10 11:46:54
2009
@@ -2009,6 +2009,10 @@
     return FileSystem.getLocal(conf);
   }
 
+  TaskScheduler getScheduler() {
+    return taskScheduler;
+  }
+
   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration
conf) {
     return conf.getClass("mapred.jobtracker.instrumentation",
         JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=792890&r1=792889&r2=792890&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java
Fri Jul 10 11:46:54 2009
@@ -23,14 +23,20 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import junit.extensions.TestSetup;
+import junit.framework.Test;
 import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
 /**
  * Test whether the JobInProgressListeners are informed as expected.
@@ -38,18 +44,42 @@
 public class TestJobInProgressListener extends TestCase {
   private static final Log LOG = 
     LogFactory.getLog(TestJobInProgressListener.class);
-  private final Path testDir = new Path("test-jip-listener-update");
-  
   private static String TEST_ROOT_DIR = new File(System.getProperty(
           "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
-
-  private JobConf configureJob(JobConf conf, int m, int r, 
-                               Path inDir, Path outputDir,
-                               String mapSignalFile, String redSignalFile) 
-  throws IOException {
-    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,  m, r, 
-        "job-listener-test", mapSignalFile, redSignalFile);
-    return conf; 
+  private final Path testDir = 
+    new Path(TEST_ROOT_DIR, "test-jip-listener-update");
+  private static MiniMRCluster mr;
+  private static JobTracker jobtracker;
+  private static JobConf conf;
+  private static MyScheduler myScheduler;
+
+  public static Test suite() {
+    TestSetup setup = 
+      new TestSetup(new TestSuite(TestJobInProgressListener.class)) {
+      @Override
+      protected void setUp() throws Exception {
+        conf = new JobConf();   
+        conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
+                      TaskScheduler.class);
+        mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+        jobtracker = mr.getJobTrackerRunner().getJobTracker();
+        myScheduler = (MyScheduler)jobtracker.getScheduler();
+        conf = mr.createJobConf();
+      }
+      
+      @Override
+      protected void tearDown() throws Exception {
+        conf = null;
+        try {
+          mr.shutdown();
+        } catch (Exception e) {
+          LOG.info("Error in shutting down the MR cluster", e);
+        }
+        jobtracker = null;
+        myScheduler.terminate();
+      }
+    };
+    return setup;
   }
   
   /**
@@ -64,56 +94,26 @@
    */
   public void testJobQueueChanges() throws IOException {
     LOG.info("Testing job queue changes");
-    JobConf conf = new JobConf();
-    MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null, null);
-    dfs.waitActive();
-    FileSystem fileSys = dfs.getFileSystem();
-    
-    dfs.startDataNodes(conf, 1, true, null, null, null, null);
-    dfs.waitActive();
-    
-    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
-                      + (dfs.getFileSystem()).getUri().getPort();
-    MiniMRCluster mr = new MiniMRCluster(1, namenode, 1);
-    JobClient jobClient = new JobClient(mr.createJobConf());
-    
-    // clean up
-    fileSys.delete(testDir, true);
     
-    if (!fileSys.mkdirs(testDir)) {
-      throw new IOException("Mkdirs failed to create " + testDir.toString());
-    }
-
-    // Write the input file
-    Path inDir = new Path(testDir, "input");
-    Path shareDir = new Path(testDir, "share");
-    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
-    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
-    UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), 
-                            (short)1);
+    // stop the job initializer
+    myScheduler.stopInitializer();
     
     JobQueueJobInProgressListener myListener = 
       new JobQueueJobInProgressListener();
     
     // add the listener
-    mr.getJobTrackerRunner().getJobTracker()
-      .addJobInProgressListener(myListener);
+    jobtracker.addJobInProgressListener(myListener);
     
-    // big blocking job
-    Path outputDir = new Path(testDir, "output");
-    Path newOutputDir = outputDir.suffix("0");
-    JobConf job1 = configureJob(mr.createJobConf(), 10, 0, inDir, newOutputDir,
-                                mapSignalFile, redSignalFile);
-    
-    // short blocked job
-    newOutputDir = outputDir.suffix("1");
-    JobConf job2 = configureJob(mr.createJobConf(), 1, 0, inDir, newOutputDir,
-                                mapSignalFile, redSignalFile);
-    
-    RunningJob rJob1 = jobClient.submitJob(job1);
+    Path inDir = new Path(testDir, "input");
+    Path outputDir1 = new Path(testDir, "output1");
+    Path outputDir2 = new Path(testDir, "output2");
+        
+    RunningJob rJob1 = 
+      UtilsForTests.runJob(conf, inDir, outputDir1, 1, 0);
     LOG.info("Running job " + rJob1.getID().toString());
     
-    RunningJob rJob2 = jobClient.submitJob(job2);
+    RunningJob rJob2 = 
+      UtilsForTests.runJob(conf, inDir, outputDir2, 1, 0);
     LOG.info("Running job " + rJob2.getID().toString());
     
     // I. Check job-priority change
@@ -151,10 +151,8 @@
     rJob2.setJobPriority("NORMAL");
     
     // create the change event
-    JobInProgress jip2 = mr.getJobTrackerRunner().getJobTracker()
-                          .getJob(rJob2.getID());
-    JobInProgress jip1 = mr.getJobTrackerRunner().getJobTracker()
-                           .getJob(rJob1.getID());
+    JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
+    JobInProgress jip1 = jobtracker.getJob(rJob1.getID());
     
     JobStatus prevStatus = (JobStatus)jip2.getStatus().clone();
     
@@ -187,35 +185,58 @@
     
     assertEquals("Start time change has garbled the queue", 
                  2, queue.length);
+  }
+
+  /**
+   * Check the queue status upon
+   *   - failed job
+   *   - killed job
+   *   - successful job
+   */
+  public void testJobCompletion() throws Exception {
+    MyListener mainListener = new MyListener();
+    jobtracker.addJobInProgressListener(mainListener);
     
-    // signal the maps to complete
-    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+    // stop the job initializer
+    myScheduler.stopInitializer();
     
-    // check if job completion leaves the queue sane
-    while (rJob2.getJobState() != JobStatus.SUCCEEDED) {
-      UtilsForTests.waitFor(10);
-    }
+    // check queued jobs
+    testQueuedJobKill(conf, mainListener);
     
-    while (rJob1.getJobState() != JobStatus.SUCCEEDED) {
-      UtilsForTests.waitFor(10);
-    }
+    myScheduler.startInitializer();
+    
+    // check the queue state for job states
+    testFailedJob(conf, mainListener);
     
-    assertTrue("Job completion garbles the queue", 
-               myListener.getJobQueue().size() == 0);
+    testKilledJob(conf, mainListener);
+    
+    testSuccessfulJob(conf, mainListener);
   }
   
   // A listener that inits the tasks one at a time and also listens to the 
   // events
   public static class MyListener extends JobInProgressListener {
     private List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
-    private List<JobInProgress> jobs = new ArrayList<JobInProgress>(); 
+    private List<JobInProgress> rjobs = new ArrayList<JobInProgress>();
+    // list of job added to the wait queue
+    private List<JobID> wjobsAdded = new ArrayList<JobID>();
+    // list of job added to the running queue
+    private List<JobID> rjobsAdded = new ArrayList<JobID>();
     
     public boolean contains (JobID id) {
       return contains(id, true) || contains(id, false);
     }
     
     public boolean contains (JobID id, boolean waiting) {
-      List<JobInProgress> queue = waiting ? wjobs : jobs;
+      if (!wjobsAdded.contains(id)) {
+        throw new RuntimeException("Job " + id + " not seen in waiting queue");
+      }
+      if (!waiting) {
+        if (!rjobsAdded.contains(id)) {
+          throw new RuntimeException("Job " + id + " not seen in run queue");
+        }
+      }
+      List<JobInProgress> queue = waiting ? wjobs : rjobs;
       for (JobInProgress job : queue) {
         if (job.getJobID().equals(id)) {
           return true;
@@ -227,10 +248,13 @@
     public void jobAdded(JobInProgress job) {
       LOG.info("Job " + job.getJobID().toString() + " added");
       wjobs.add(job);
+      wjobsAdded.add(job.getJobID());
     }
     
     public void jobRemoved(JobInProgress job) {
       LOG.info("Job " + job.getJobID().toString() + " removed");
+      wjobs.remove(job);
+      rjobs.remove(job);
     }
     
     public void jobUpdated(JobChangeEvent event) {
@@ -248,33 +272,31 @@
             if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
               wjobs.remove(jip);
             } else {
-              jobs.remove(jip);
+              rjobs.remove(jip);
             }
           } else {
             // PREP->RUNNING
             LOG.info("Job " +  jobId + " deleted from the waiting queue");
             wjobs.remove(jip);
-            jobs.add(jip);
+            rjobs.add(jip);
+            rjobsAdded.add(jip.getJobID());
           }
         }
       }
     }
   }
   
-  public void testJobFailure() throws Exception {
-    LOG.info("Testing job-success");
-    
-    MyListener myListener = new MyListener();
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
-    
-    JobConf job = mr.createJobConf();
+  private void testFailedJob(JobConf job, MyListener myListener) 
+  throws IOException {
+    LOG.info("Testing job-fail");
     
-    mr.getJobTrackerRunner().getJobTracker()
-      .addJobInProgressListener(myListener);
-
     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
 
+    job.setNumMapTasks(1);
+    job.setNumReduceTasks(0);
+    job.setMaxMapAttempts(1);
+    
     // submit a job that fails 
     RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir);
     JobID id = rJob.getID();
@@ -282,23 +304,21 @@
     // check if the job failure was notified
     assertFalse("Missing event notification on failing a running job", 
                 myListener.contains(id));
-    
+
+    // check if failed
+    assertEquals("Job failed!", JobStatus.FAILED, rJob.getJobState());
   }
-  
-  public void testJobKill() throws Exception {
+
+  private void testKilledJob(JobConf job, MyListener myListener) 
+  throws IOException {
     LOG.info("Testing job-kill");
     
-    MyListener myListener = new MyListener();
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
-    
-    JobConf job = mr.createJobConf();
-    
-    mr.getJobTrackerRunner().getJobTracker()
-      .addJobInProgressListener(myListener);
-    
     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
 
+    job.setNumMapTasks(1);
+    job.setNumReduceTasks(0);
+    
     // submit and kill the job   
     RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir);
     JobID id = rJob.getID();
@@ -306,44 +326,75 @@
     // check if the job failure was notified
     assertFalse("Missing event notification on killing a running job", 
                 myListener.contains(id));
-    
+
+    // check if killed
+    assertEquals("Job failed!", JobStatus.KILLED, rJob.getJobState());
   }
-  
-  public void testJobSuccess() throws Exception {
+
+  private void testSuccessfulJob(JobConf job, MyListener myListener) 
+  throws Exception {
     LOG.info("Testing job-success");
-    MyListener myListener = new MyListener();
-    
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
-    
-    JobConf job = mr.createJobConf();
-    
-    mr.getJobTrackerRunner().getJobTracker()
-      .addJobInProgressListener(myListener);
     
     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
 
+    job.setNumMapTasks(1);
+    job.setNumReduceTasks(0);
+    
     // submit the job   
-    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
+    RunningJob rJob = UtilsForTests.runJobSucceed(job, inDir, outDir);
     
-    // wait for the job to be running
-    while (rJob.getJobState() != JobStatus.RUNNING) {
-      UtilsForTests.waitFor(10);
-    }
+    // wait for the job to be successful
+    rJob.waitForCompletion();
     
-    LOG.info("Job " +  rJob.getID().toString() + " started running");
+    // check if the job success was notified
+    assertFalse("Missing event notification for a successful job", 
+                myListener.contains(rJob.getID()));
+
+    // check if successful
+    assertEquals("Job failed!", JobStatus.SUCCEEDED, rJob.getJobState());
     
-    // check if the listener was updated about this change
-    assertFalse("Missing event notification for a running job", 
-                myListener.contains(rJob.getID(), true));
+    // test if 0-task jobs with setup-cleanup works fine
+    LOG.info("Testing job with no task job with setup and cleanup");
     
-    while (rJob.getJobState() != JobStatus.SUCCEEDED) {
-      UtilsForTests.waitFor(10);
-    }
+    job.setNumMapTasks(0);
+    job.setNumReduceTasks(0);
+    
+    outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks");
+    
+    // submit the job   
+    rJob = UtilsForTests.runJobSucceed(job, inDir, outDir);
+    
+    // wait for the job to be successful
+    rJob.waitForCompletion();
     
     // check if the job success was notified
-    assertFalse("Missing event notification for a successful job", 
-                myListener.contains(rJob.getID(), false));
+    assertFalse("Missing event notification for a successful job with no tasks", 
+                myListener.contains(rJob.getID(), true));
+    
+    // check if successful
+    assertEquals("Job failed!", JobStatus.SUCCEEDED, rJob.getJobState());
+   
+    // test if jobs with no tasks (0 maps, 0 red) update the listener properly
+    LOG.info("Testing job with no-set-cleanup no task");
+    
+    outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks-no-set");
+    
+    Job j = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 0, 0);
+    j.setJobSetupCleanupNeeded(false);
+    j.setOutputFormatClass(TestNoJobSetupCleanup.MyOutputFormat.class);
+    j.submit();
+    j.waitForCompletion(true);
+    
+    JobID id = (org.apache.hadoop.mapred.JobID)j.getID();
+    
+    // check if the job is in the waiting queue
+    assertFalse("Missing event notification on no-set-cleanup no task job", 
+                myListener.contains(id, true));
+    
+    // check if the job is successful
+    assertEquals("Job status doesnt reflect success", 
+                 JobStatus.SUCCEEDED, rJob.getJobState());
   }
   
   /**
@@ -355,33 +406,40 @@
     @Override
     public synchronized void start() throws IOException {
       super.start();
+    }
+
+    void stopInitializer() throws IOException {
       // Remove the eager task initializer
       taskTrackerManager.removeJobInProgressListener(
           eagerTaskInitializationListener);
       // terminate it
       eagerTaskInitializationListener.terminate();
     }
+    
+    void startInitializer() throws IOException {
+      eagerTaskInitializationListener = 
+        new EagerTaskInitializationListener(getConf());
+      eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
+      // start it
+      eagerTaskInitializationListener.start();
+      // add the eager task initializer
+      taskTrackerManager.addJobInProgressListener(
+          eagerTaskInitializationListener);
+    }
   }
   
-  public void testQueuedJobKill() throws Exception {
+  private void testQueuedJobKill(JobConf conf, MyListener myListener) 
+  throws IOException {
     LOG.info("Testing queued-job-kill");
     
-    MyListener myListener = new MyListener();
-    
-    JobConf job = new JobConf();
-    job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
-                 TaskScheduler.class);
-    MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
-    
-    job = mr.createJobConf();
-    
-    mr.getJobTrackerRunner().getJobTracker()
-      .addJobInProgressListener(myListener);
-    
-    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
-    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
+    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerqueuedjob/input");
+    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistener1ueuedjob/output");
 
-    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    RunningJob rJob = UtilsForTests.runJob(conf, inDir, outDir);
     JobID id = rJob.getID();
     LOG.info("Job : " + id.toString() + " submitted");
     

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java?rev=792890&r1=792889&r2=792890&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
Fri Jul 10 11:46:54 2009
@@ -85,7 +85,7 @@
     }
   }
   
-  static class MyOutputFormat extends TextOutputFormat {
+  public static class MyOutputFormat extends TextOutputFormat {
     public void checkOutputSpecs(JobContext job) 
         throws FileAlreadyExistsException, IOException{
       super.checkOutputSpecs(job);



Mime
View raw message