hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r888431 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Tue, 08 Dec 2009 15:14:40 GMT
Author: sharad
Date: Tue Dec  8 15:14:40 2009
New Revision: 888431

URL: http://svn.apache.org/viewvc?rev=888431&view=rev
Log:
MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. Contributed by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=888431&r1=888430&r2=888431&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Dec  8 15:14:40 2009
@@ -966,3 +966,6 @@
     MAPREDUCE-1075. Fix JobTracker to not throw an NPE for a non-existent
     queue. (V.V.Chaitanya Krishna via yhemanth)
 
+    MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. (Amar Kamat 
+    via sharad)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=888431&r1=888430&r2=888431&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Dec  8 15:14:40 2009
@@ -804,17 +804,21 @@
     private void removeHostCapacity(String hostName) {
       synchronized (taskTrackers) {
         // remove the capacity of trackers on this host
+        int numTrackersOnHost = 0;
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
           int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity -= mapSlots;
           int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity -= reduceSlots;
+          ++numTrackersOnHost;
           getInstrumentation().addBlackListedMapSlots(
               mapSlots);
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        incrBlackListedTrackers(uniqueHostsMap.remove(hostName));
+        // remove the host
+        uniqueHostsMap.remove(hostName);
+        incrBlackListedTrackers(numTrackersOnHost);
       }
     }
     
@@ -2468,12 +2472,14 @@
         taskTrackers.remove(trackerName);
         Integer numTaskTrackersInHost = 
           uniqueHostsMap.get(oldStatus.getHost());
-        numTaskTrackersInHost --;
-        if (numTaskTrackersInHost > 0)  {
-          uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
-        }
-        else {
-          uniqueHostsMap.remove(oldStatus.getHost());
+        if (numTaskTrackersInHost != null) {
+          numTaskTrackersInHost --;
+          if (numTaskTrackersInHost > 0)  {
+            uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
+          }
+          else {
+            uniqueHostsMap.remove(oldStatus.getHost());
+          }
         }
       }
     }
@@ -3841,8 +3847,8 @@
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
           if (trackers != null) {
             for (TaskTracker tracker : trackers) {
-              LOG.info("Decommission: Losing tracker " + tracker + 
-                       " on host " + host);
+              LOG.info("Decommission: Losing tracker " 
+                       + tracker.getTrackerName() + " on host " + host);
               removeTracker(tracker);
             }
             trackersDecommissioned += trackers.size();

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=888431&r1=888430&r2=888431&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Dec  8 15:14:40 2009
@@ -63,8 +63,10 @@
     }
     @Override
     public ClusterStatus getClusterStatus(boolean detailed) {
-      return new ClusterStatus(trackers.length,
-          0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+      return new ClusterStatus(
+          taskTrackers().size() - getBlacklistedTrackerCount(),
+          getBlacklistedTrackerCount(), 0, 0, 0, totalSlots/2, totalSlots/2, 
+           JobTracker.State.RUNNING, 0);
     }
 
     public void setNumSlots(int totalSlots) {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=888431&r1=888430&r2=888431&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java Tue Dec  8 15:14:40 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
@@ -47,6 +48,7 @@
     conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
     conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
     conf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 1000);
+    conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1");
     jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers);
     jobTracker.startExpireTrackersThread();
   }
@@ -91,4 +93,139 @@
     job.finishTask(tid[1]);
     
   }
+  
+  /**
+   * Test whether the tracker gets blacklisted after its lost.
+   */
+  public void testLostTrackerBeforeBlacklisting() throws Exception {
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+    TaskAttemptID[] tid = new TaskAttemptID[3];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    job.setClusterSize(4);
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
+
+    job.finishTask(tid[0]);
+
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.getClusterStatus(false).getTaskTrackers());
+    
+    // lose the tracker
+    clock.advance(1100);
+    jobTracker.checkExpiredTrackers();
+    assertFalse("Tracker 0 not lost", 
+        jobTracker.getClusterStatus(false).getActiveTrackerNames()
+                  .contains(trackers[0]));
+    
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 0, jobTracker.getClusterStatus(false).getTaskTrackers());
+    
+    // Tracker 1 establishes contact with JT 
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    
+    // Tracker1 should get assigned the lost map task
+    tid[1] =  job.findMapTask(trackers[1]);
+
+    assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
+    
+    assertEquals("Task ID of reassigned map task does not match",
+        tid[0].getTaskID().toString(), tid[1].getTaskID().toString());
+    
+    // finish the map task
+    job.finishTask(tid[1]);
+
+    // finish the reduce task
+    tid[2] =  job.findReduceTask(trackers[1]);
+    job.finishTask(tid[2]);
+    
+    // check if job is successful
+    assertEquals("Job not successful", 
+                 JobStatus.SUCCEEDED, job.getStatus().getRunState());
+    
+    // check if the tracker is lost
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.getClusterStatus(false).getTaskTrackers());
+    // validate blacklisted count .. since we lost one blacklisted tracker
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+  }
+
+  /**
+   * Test whether the tracker gets lost after its blacklisted.
+   */
+  public void testLostTrackerAfterBlacklisting() throws Exception {
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+    clock.advance(600);
+    TaskAttemptID[] tid = new TaskAttemptID[2];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    job.setClusterSize(4);
+    
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.taskTrackers().size());
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
+    // Fail the task
+    job.failTask(tid[0]);
+    
+    // Tracker 1 establishes contact with JT
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 2, jobTracker.taskTrackers().size());
+    
+    // Tracker 1 gets the map task
+    tid[1] = job.findMapTask(trackers[1]);
+    // Finish the task and also the job
+    job.finishTask(tid[1]);
+
+    // check if job is successful
+    assertEquals("Job not successful", 
+                 JobStatus.SUCCEEDED, job.getStatus().getRunState());
+    
+    // check if the trackers 1 got blacklisted
+    assertTrue("Tracker 0 not blacklisted", 
+               jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName()
+                 .equals(trackers[0]));
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 2, jobTracker.taskTrackers().size());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                1, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+    
+    // Advance clock. Tracker 0 should be lost
+    clock.advance(500);
+    jobTracker.checkExpiredTrackers();
+    
+    // check if the task tracker is lost
+    assertFalse("Tracker 0 not lost", 
+            jobTracker.getClusterStatus(false).getActiveTrackerNames()
+                      .contains(trackers[0]));
+    
+    // check if the lost tracker has removed from the jobtracker
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.taskTrackers().size());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+    
+  }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=888431&r1=888430&r2=888431&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Dec  8 15:14:40 2009
@@ -32,7 +32,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -377,4 +381,88 @@
     
     stopCluster();
   }
+  
+  // Mapper that fails once for the first time
+  static class FailOnceMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    private boolean shouldFail = false;
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      if (shouldFail) {
+        throw new RuntimeException("failing map");
+      }
+    }
+    
+    @Override
+    public void configure(JobConf conf) {
+      TaskAttemptID id = TaskAttemptID.forName(conf.get("mapred.task.id"));
+      shouldFail = id.getId() == 0 && id.getTaskID().getId() == 0; 
+    }
+  }
+  
+  /**
+   * Check refreshNodes for decommissioning blacklisted nodes. 
+   */
+  public void testBlacklistedNodeDecommissioning() throws Exception {
+    LOG.info("Testing blacklisted node decommissioning");
+
+    Configuration conf = new Configuration();
+    conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1");
+
+    startCluster(2, 1, 0, conf);
+    
+    assertEquals("Trackers not up", 2,
+           mr.getJobTrackerRunner().getJobTracker().getActiveTrackers().length);
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 2, jt.getClusterStatus(false).getTaskTrackers());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jt.getClusterStatus(false).getBlacklistedTrackers());
+
+    // run a failing job to blacklist the tracker
+    JobConf jConf = mr.createJobConf();
+    jConf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    jConf.setJobName("test-job-fail-once");
+    jConf.setMapperClass(FailOnceMapper.class);
+    jConf.setReducerClass(IdentityReducer.class);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    
+    RunningJob job = 
+      UtilsForTests.runJob(jConf, new Path("in"), new Path("out"));
+    job.waitForCompletion();
+    
+    // check if the tracker is lost
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                1, jt.getClusterStatus(false).getBlacklistedTrackers());
+    
+    // find the tracker to decommission
+    String hostToDecommission = 
+      JobInProgress.convertTrackerNameToHostName(
+          jt.getBlacklistedTrackers()[0].getTaskTrackerName());
+    LOG.info("Decommissioning host " + hostToDecommission);
+    
+    Set<String> decom = new HashSet<String>(1);
+    decom.add(hostToDecommission);
+    jt.decommissionNodes(decom);
+ 
+    // check the cluster status and tracker size
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Blacklisted tracker count incorrect in cluster status after "
+                 + "decommissioning", 
+                 0, jt.getClusterStatus(false).getBlacklistedTrackers());
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.taskTrackers().size());
+    
+    stopCluster();
+  }
 }



Mime
View raw message