hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r792613 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Thu, 09 Jul 2009 17:19:08 GMT
Author: yhemanth
Date: Thu Jul  9 17:19:07 2009
New Revision: 792613

URL: http://svn.apache.org/viewvc?rev=792613&view=rev
Log:
MAPREDUCE-733. Fix a RuntimeException while unreserving trackers that are blacklisted for
a job. Contributed by Arun Murthy and Sreekanth Ramakrishnan.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792613&r1=792612&r2=792613&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  9 17:19:07 2009
@@ -169,3 +169,7 @@
     unused reservations for a job when it completes.
     (Arun Murthy and Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-733. Fix a RuntimeException while unreserving trackers
+    that are blacklisted for a job.
+    (Arun Murthy and Sreekanth Ramakrishnan via yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=792613&r1=792612&r2=792613&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Jul  9 17:19:07 2009
@@ -1562,7 +1562,8 @@
    * 
    * @param taskTracker task-tracker on which a task failed
    */
-  void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) {
+  synchronized void addTrackerTaskFailure(String trackerName, 
+                                          TaskTracker taskTracker) {
     if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
       String trackerHostName = convertTrackerNameToHostName(trackerName);
 
@@ -1578,8 +1579,12 @@
         
         // Cancel reservations if appropriate
         if (taskTracker != null) {
-          taskTracker.unreserveSlots(TaskType.MAP, this);
-          taskTracker.unreserveSlots(TaskType.REDUCE, this);
+          if (trackersReservedForMaps.containsKey(taskTracker)) {
+            taskTracker.unreserveSlots(TaskType.MAP, this);
+          }
+          if (trackersReservedForReduces.containsKey(taskTracker)) {
+            taskTracker.unreserveSlots(TaskType.REDUCE, this);
+          }
         }
         LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
       }
@@ -3226,4 +3231,11 @@
       return Values.REDUCE.name();
     }
   }
+  
+  /**
+   * Test method to set the cluster size.
+   */
+  void setClusterSize(int clusterSize) {
+    this.clusterSize = clusterSize;
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=792613&r1=792612&r2=792613&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Jul  9 17:19:07 2009
@@ -87,6 +87,7 @@
                                         jobtracker, getJobConf(), this, 1);
         nonRunningReduces.add(reduces[i]);
       }
+      tasksInited.set(true);
     }
     
     private TaskAttemptID findTask(String trackerName, String trackerHost,
@@ -165,6 +166,22 @@
           tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
       updateTaskStatus(tip, status);
     }
+    
+    public void failTask(TaskAttemptID taskId) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
+          1.0f, 1, TaskStatus.State.FAILED, "", "", tip
+              .machineWhereTaskRan(taskId), tip.isMapTask() ? Phase.MAP
+              : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    
+    public void cleanUpMetrics() {
+    }
+    
+    public void setClusterSize(int clusterSize) {
+      super.setClusterSize(clusterSize);
+    }
   }
   
   static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=792613&r1=792612&r2=792613&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Thu Jul  9 17:19:07 2009
@@ -106,12 +106,8 @@
     }
 
     public void failTask(TaskAttemptID taskId) {
+      super.failTask(taskId);
       TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
-      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
-          1.0f, 1, TaskStatus.State.FAILED, "", "", tip
-              .machineWhereTaskRan(taskId), tip.isMapTask() ? Phase.MAP
-              : Phase.REDUCE, new Counters());
-      updateTaskStatus(tip, status);
       addFailuresToTrackers(tip.machineWhereTaskRan(taskId));
     }
 
@@ -178,7 +174,7 @@
   }
 
   public void AtestTrackerBlacklistingForJobFailures() throws Exception {
-    runBlackListingJob();
+    runBlackListingJob(jobTracker, trackers);
     assertEquals("Tracker 1 not blacklisted", jobTracker
         .getBlacklistedTrackerCount(), 1);
     checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
@@ -207,7 +203,7 @@
   }
 
   public void testBlackListingWithFailuresAndHealthStatus() throws Exception {
-    runBlackListingJob();
+    runBlackListingJob(jobTracker, trackers);
     assertEquals("Tracker 1 not blacklisted", jobTracker
         .getBlacklistedTrackerCount(), 1);
     checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
@@ -278,28 +274,49 @@
     }
   }
 
-  private void runBlackListingJob() throws IOException, Exception {
+  /**
+   * Runs a job which blacklists the first of the trackers
+   * passed to the method.
+   * 
+   * @param jobTracker JobTracker instance
+   * @param trackers array of trackers; the method blacklists the
+   * first element of the array
+   * @return A job in progress object.
+   * @throws Exception
+   */
+  static FakeJobInProgress runBlackListingJob(JobTracker jobTracker,
+      String[] trackers) throws Exception {
     TaskAttemptID[] taskAttemptID = new TaskAttemptID[3];
     JobConf conf = new JobConf();
     conf.setSpeculativeExecution(false);
     conf.setNumMapTasks(0);
     conf.setNumReduceTasks(5);
     conf.set("mapred.max.reduce.failures.percent", ".70");
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
     conf.setMaxTaskFailuresPerTracker(1);
     FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.setClusterSize(trackers.length);
     job.initTasks();
 
     taskAttemptID[0] = job.findReduceTask(trackers[0]);
     taskAttemptID[1] = job.findReduceTask(trackers[1]);
     taskAttemptID[2] = job.findReduceTask(trackers[2]);
     job.finishTask(taskAttemptID[1]);
+    job.finishTask(taskAttemptID[2]);
     job.failTask(taskAttemptID[0]);
+
     taskAttemptID[0] = job.findReduceTask(trackers[0]);
     job.failTask(taskAttemptID[0]);
+
     taskAttemptID[0] = job.findReduceTask(trackers[1]);
-    job.finishTask(taskAttemptID[2]);
     job.finishTask(taskAttemptID[0]);
+    taskAttemptID[0] = job.findReduceTask(trackers[1]);
+    taskAttemptID[1] = job.findReduceTask(trackers[2]);
+    job.finishTask(taskAttemptID[0]);
+    job.finishTask(taskAttemptID[1]);
+
     jobTracker.finalizeJob(job);
+    return job;
   }
 
   private void checkReasonForBlackListing(String host,

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=792613&r1=792612&r2=792613&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Thu Jul  9 17:19:07 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
@@ -48,25 +49,6 @@
     }
   }
 
-  private static class FakeJobInProgress extends
-      org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress {
-    TaskInProgress cleanup[] = new TaskInProgress[0];
-    TaskInProgress setup[] = new TaskInProgress[0];
-
-    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
-      super(jobConf, tracker);
-    }
-
-    @Override
-    public synchronized void initTasks() throws IOException {
-      super.initTasks();
-      tasksInited.set(true);
-    }
-
-    @Override
-    public void cleanUpMetrics() {
-    }
-  }
 
   public static Test suite() {
     TestSetup setup = new TestSetup(new TestSuite(TestTrackerReservation.class)) {
@@ -111,9 +93,9 @@
         "mapred.committer.job.setup.cleanup.needed", false);
     
     //Set task tracker objects for reservation.
-    TaskTracker tt1 = new TaskTracker(trackers[0]);
-    TaskTracker tt2 = new TaskTracker(trackers[1]);
-    TaskTracker tt3 = new TaskTracker(trackers[2]);
+    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+    TaskTracker tt3 = jobTracker.getTaskTracker(trackers[2]);
     TaskTrackerStatus status1 = new TaskTrackerStatus(
         trackers[0],JobInProgress.convertTrackerNameToHostName(
             trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
@@ -128,6 +110,7 @@
     tt3.setStatus(status3);
     
     FakeJobInProgress fjob = new FakeJobInProgress(conf, jobTracker);
+    fjob.setClusterSize(3);
     fjob.initTasks();
     
     tt1.reserveSlots(TaskType.MAP, fjob, 2);
@@ -154,4 +137,113 @@
     assertEquals("Reservation for the job not released : Reduces", 
         0, fjob.getNumReservedTaskTrackersForReduces());
   }
+  
+  /**
+   * Test case to check task tracker reservation for a job which 
+   * has a job blacklisted tracker.
+   * <ol>
+   * <li>Run a job which fails on one of the trackers.</li>
+   * <li>Check if the job succeeds and has no reservation.</li>
+   * </ol>
+   * 
+   * @throws Exception
+   */
+  
+  public void testTrackerReservationWithJobBlackListedTracker() throws Exception {
+    FakeJobInProgress job = TestTaskTrackerBlacklisting.runBlackListingJob(
+        jobTracker, trackers);
+    assertEquals("Job has no blacklisted trackers", 1, job
+        .getBlackListedTrackers().size());
+    assertTrue("Tracker 1 not blacklisted for the job", job
+        .getBlackListedTrackers().contains(
+            JobInProgress.convertTrackerNameToHostName(trackers[0])));
+    assertEquals("Job didnt complete successfully complete", job.getStatus()
+        .getRunState(), JobStatus.SUCCEEDED);
+    assertEquals("Reservation for the job not released: Maps", 
+        0, job.getNumReservedTaskTrackersForMaps());
+    assertEquals("Reservation for the job not released : Reduces", 
+        0, job.getNumReservedTaskTrackersForReduces());
+  }
+  
+  /**
+   * Test case to check if the job reservation is handled properly if the 
+   * job has a reservation on a black listed tracker.
+   * 
+   * @throws Exception
+   */
+  public void testReservationOnBlacklistedTracker() throws Exception {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[3];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(false);
+    conf.setNumMapTasks(2);
+    conf.setNumReduceTasks(2);
+    conf.set("mapred.max.reduce.failures.percent", ".70");
+    conf.set("mapred.max.map.failures.percent", ".70");
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    conf.setMaxTaskFailuresPerTracker(1);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.setClusterSize(trackers.length);
+    job.initTasks();
+    
+    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+    TaskTracker tt3 = jobTracker.getTaskTracker(trackers[2]);
+    TaskTrackerStatus status1 = new TaskTrackerStatus(
+        trackers[0],JobInProgress.convertTrackerNameToHostName(
+            trackers[0]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status2 = new TaskTrackerStatus(
+        trackers[1],JobInProgress.convertTrackerNameToHostName(
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    TaskTrackerStatus status3 = new TaskTrackerStatus(
+        trackers[1],JobInProgress.convertTrackerNameToHostName(
+            trackers[1]),0,new ArrayList<TaskStatus>(), 0, 2, 2);
+    tt1.setStatus(status1);
+    tt2.setStatus(status2);
+    tt3.setStatus(status3);
+    
+    tt1.reserveSlots(TaskType.MAP, job, 2);
+    tt1.reserveSlots(TaskType.REDUCE, job, 2);
+    tt3.reserveSlots(TaskType.MAP, job, 2);
+    tt3.reserveSlots(TaskType.REDUCE, job, 2);
+    
+    assertEquals("Trackers not reserved for the job : maps", 
+        2, job.getNumReservedTaskTrackersForMaps());
+    assertEquals("Trackers not reserved for the job : reduces", 
+        2, job.getNumReservedTaskTrackersForReduces());
+  
+    /*
+     * FakeJobInProgress.findMapTask does not handle
+     * task failures, so work around it by failing a
+     * reduce task, which blacklists the tracker, and
+     * then finish the map task later.
+     */
+    TaskAttemptID mTid = job.findMapTask(trackers[0]);
+    TaskAttemptID rTid = job.findReduceTask(trackers[0]);
+    //Task should blacklist the tasktracker.
+    job.failTask(rTid);
+    
+    assertEquals("Tracker 0 not blacklisted for the job", 1, 
+        job.getBlackListedTrackers().size());
+    assertEquals("Extra Trackers reserved for the job : maps", 
+        1, job.getNumReservedTaskTrackersForMaps());
+    assertEquals("Extra Trackers reserved for the job : reduces", 
+        1, job.getNumReservedTaskTrackersForReduces());
+    //Finish the map task on the tracker 1. Finishing it here to work
+    //around bug in the FakeJobInProgress object
+    job.finishTask(mTid);
+    mTid = job.findMapTask(trackers[1]);
+    rTid = job.findReduceTask(trackers[1]);
+    job.finishTask(mTid);
+    job.finishTask(rTid);
+    rTid = job.findReduceTask(trackers[1]);
+    job.finishTask(rTid);
+    assertEquals("Job didnt complete successfully complete", job.getStatus()
+        .getRunState(), JobStatus.SUCCEEDED);
+    assertEquals("Trackers not unreserved for the job : maps", 
+        0, job.getNumReservedTaskTrackersForMaps());
+    assertEquals("Trackers not unreserved for the job : reduces", 
+        0, job.getNumReservedTaskTrackersForReduces());
+    
+  }
 }
+  
\ No newline at end of file



Mime
View raw message