hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r1422968 - in /hadoop/common/branches/branch-1.1/src: mapred/org/apache/hadoop/mapred/TaskTracker.java test/org/apache/hadoop/mapred/TestRecoveryManager.java
Date Mon, 17 Dec 2012 15:03:56 GMT
Author: tomwhite
Date: Mon Dec 17 15:03:55 2012
New Revision: 1422968

URL: http://svn.apache.org/viewvc?rev=1422968&view=rev
Log:
Merge -r 1422960:1422961 from branch-1 to branch-1.1. Fixes: MAPREDUCE-4859. TestRecoveryManager
fails on branch-1.

Modified:
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1422968&r1=1422967&r2=1422968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Mon Dec 17 15:03:55 2012
@@ -605,7 +605,15 @@ public class TaskTracker implements MRCo
         LOG.warn("Unknown job " + jobId + " being deleted.");
       } else {
         synchronized (rjob) {
-          rjob.tasks.remove(tip);
+          // Only remove the TIP if it is identical to the one that is finished
+          // Job recovery means that it is possible to have two task attempts
+          // with the same ID, which is used for TIP equals/hashcode.
+          for (TaskInProgress t : rjob.tasks) {
+            if (tip == t) {
+              rjob.tasks.remove(tip);
+              break;
+            }
+          }
         }
       }
     }

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1422968&r1=1422967&r2=1422968&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Dec 17 15:03:55 2012
@@ -84,8 +84,7 @@ public class TestRecoveryManager {
    *  - restarts the jobtracker
    *  - checks if the jobtraker starts normally
    */
-  @Test
-  @Ignore
+  @Test(timeout=120000)
   public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
     LOG.info("Testing jobtracker restart with faulty job");
     String signalFile = new Path(TEST_DIR, "signal").toString();
@@ -111,7 +110,7 @@ public class TestRecoveryManager {
         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, 
         "test-recovery-manager", signalFile, signalFile);
     
-    // submit the faulty job
+    // submit another job
     RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
     LOG.info("Submitted job " + rJob2.getID());
     
@@ -129,7 +128,7 @@ public class TestRecoveryManager {
     Path jobFile = 
       new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
     LOG.info("Deleting job token file : " + jobFile.toString());
-    fs.delete(jobFile, false); // delete the job.xml file
+    Assert.assertTrue(fs.delete(jobFile, false)); // delete the job.xml file
     
     // create the job.xml file with 1 bytes
     FSDataOutputStream out = fs.create(jobFile);
@@ -142,12 +141,22 @@ public class TestRecoveryManager {
     // start the jobtracker
     LOG.info("Starting jobtracker");
     mr.startJobTracker();
-    ClusterStatus status = 
-      mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    ClusterStatus status = jobtracker.getClusterStatus(false);
     
     // check if the jobtracker came up or not
     Assert.assertEquals("JobTracker crashed!", 
                  JobTracker.State.RUNNING, status.getJobTrackerState());
+
+    // wait for job 2 to complete
+    JobInProgress jip = jobtracker.getJob(rJob2.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
+      // Signaling Map task to complete
+      fs.create(new Path(TEST_DIR, "signal"));
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
   }
   
   /**
@@ -156,8 +165,7 @@ public class TestRecoveryManager {
    *  - kills the jobtracker
    *  - checks if the jobtraker starts normally and job is recovered while 
    */
-  @Test
-  @Ignore
+  @Test(timeout=120000)
   public void testJobResubmission() throws Exception {
     LOG.info("Testing Job Resubmission");
     String signalFile = new Path(TEST_DIR, "signal").toString();
@@ -196,6 +204,8 @@ public class TestRecoveryManager {
     // assert that job is recovered by the jobtracker
     Assert.assertEquals("Resubmission failed ", 1, 
         jobtracker.getAllJobs().length);
+
+    // wait for job 1 to complete
     JobInProgress jip = jobtracker.getJob(rJob1.getID());
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
@@ -218,8 +228,7 @@ public class TestRecoveryManager {
    *  - checks if the jobtraker starts normally and job#2 is recovered while 
    *    job#1 is failed.
    */
-  @Test
-  @Ignore
+  @Test(timeout=120000)
   public void testJobTrackerRestartWithBadJobs() throws Exception {
     LOG.info("Testing recovery-manager");
     String signalFile = new Path(TEST_DIR, "signal").toString();
@@ -234,7 +243,7 @@ public class TestRecoveryManager {
     job1.setJobPriority(JobPriority.HIGH);
     
     UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the faulty job
@@ -252,7 +261,7 @@ public class TestRecoveryManager {
 
     String signalFile1 = new Path(TEST_DIR, "signal1").toString();
     UtilsForTests.configureWaitingJobConf(job2, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
         "test-recovery-manager", signalFile1, signalFile1);
     
     // submit the job
@@ -273,7 +282,7 @@ public class TestRecoveryManager {
       UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
     
     UtilsForTests.configureWaitingJobConf(job3, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0, 
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the job
@@ -326,6 +335,15 @@ public class TestRecoveryManager {
     
     status = jobtracker.getJobStatus(rJob3.getID());
     Assert.assertNull("Job should be missing because of ACL changed", status);
+
+    // wait for job 2 to complete
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
+      // Signaling Map task to complete
+      fs.create(new Path(TEST_DIR, "signal1"));
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
   }
   
   /**
@@ -341,7 +359,7 @@ public class TestRecoveryManager {
    *   - garble the jobtracker.info file and restart he jobtracker, the 
    *     jobtracker should crash.
    */
-  @Test
+  @Test(timeout=120000)
   public void testRestartCount() throws Exception {
     LOG.info("Testing Job Restart Count");
     String signalFile = new Path(TEST_DIR, "signal").toString();
@@ -356,7 +374,7 @@ public class TestRecoveryManager {
     job1.setJobPriority(JobPriority.HIGH);
 
     UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
-        new Path(TEST_DIR, "output3"), 30, 0, "test-restart", signalFile,
+        new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
         signalFile);
 
     // submit the faulty job
@@ -396,7 +414,7 @@ public class TestRecoveryManager {
     JobConf job2 = mr.createJobConf();
 
     UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
-        new Path(TEST_DIR, "output7"), 50, 0, "test-restart-manager",
+        new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
         signalFile, signalFile);
 
     // submit a new job
@@ -430,7 +448,7 @@ public class TestRecoveryManager {
    * Test if the jobtracker waits for the info file to be created before 
    * starting.
    */
-  @Test
+  @Test(timeout=120000)
   public void testJobTrackerInfoCreation() throws Exception {
     LOG.info("Testing jobtracker.info file");
     MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);



Mime
View raw message