hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r888440 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Date Tue, 08 Dec 2009 15:20:57 GMT
Author: sharad
Date: Tue Dec  8 15:20:57 2009
New Revision: 888440

URL: http://svn.apache.org/viewvc?rev=888440&view=rev
Log:
MAPREDUCE-754. Merge revision 888431 from trunk.

Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=888440&r1=888439&r2=888440&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue Dec  8 15:20:57 2009
@@ -856,3 +856,6 @@
     MAPREDUCE-1075. Fix JobTracker to not throw an NPE for a non-existent
     queue. (V.V.Chaitanya Krishna via yhemanth)
 
+    MAPREDUCE-754. Fix NPE in expiry thread when a TT is lost. (Amar Kamat 
+    via sharad)
+

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=888440&r1=888439&r2=888440&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
Tue Dec  8 15:20:57 2009
@@ -411,14 +411,7 @@
               if ((now - newProfile.getLastSeen()) >
                   tasktrackerExpiryInterval) {
                 // Remove completely after marking the tasks as 'KILLED'
-                lostTaskTracker(current);
-                // tracker is lost, and if it is blacklisted, remove 
-                // it from the count of blacklisted trackers in the cluster
-                if (isBlacklisted(trackerName)) {
-                  faultyTrackers.numBlacklistedTrackers -= 1;
-                }
-                updateTaskTrackerStatus(trackerName, null);
-                statistics.taskTrackerRemoved(trackerName);
+                removeTracker(current);
                 // remove the mapping from the hosts list
                 String hostname = newProfile.getHost();
                 hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -789,17 +782,21 @@
     private void removeHostCapacity(String hostName) {
       synchronized (taskTrackers) {
         // remove the capacity of trackers on this host
+        int numTrackersOnHost = 0;
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
           int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity -= mapSlots;
           int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity -= reduceSlots;
+          ++numTrackersOnHost;
           getInstrumentation().addBlackListedMapSlots(
               mapSlots);
           getInstrumentation().addBlackListedReduceSlots(
               reduceSlots);
         }
-        numBlacklistedTrackers += uniqueHostsMap.remove(hostName);
+        // remove the host
+        uniqueHostsMap.remove(hostName);
+        numBlacklistedTrackers += numTrackersOnHost;
       }
     }
     
@@ -2413,12 +2410,14 @@
         taskTrackers.remove(trackerName);
         Integer numTaskTrackersInHost = 
           uniqueHostsMap.get(oldStatus.getHost());
-        numTaskTrackersInHost --;
-        if (numTaskTrackersInHost > 0)  {
-          uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
-        }
-        else {
-          uniqueHostsMap.remove(oldStatus.getHost());
+        if (numTaskTrackersInHost != null) {
+          numTaskTrackersInHost --;
+          if (numTaskTrackersInHost > 0)  {
+            uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
+          }
+          else {
+            uniqueHostsMap.remove(oldStatus.getHost());
+          }
         }
       }
     }
@@ -3748,7 +3747,7 @@
   }
 
   // main decommission
-  private synchronized void decommissionNodes(Set<String> hosts) 
+  synchronized void decommissionNodes(Set<String> hosts) 
   throws IOException {  
     LOG.info("Decommissioning " + hosts.size() + " nodes");
     // create a list of tracker hostnames
@@ -3759,10 +3758,9 @@
           Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
           if (trackers != null) {
             for (TaskTracker tracker : trackers) {
-              LOG.info("Decommission: Losing tracker " + tracker + 
-                       " on host " + host);
-              lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
+              LOG.info("Decommission: Losing tracker "
+                       + tracker.getTrackerName() + " on host " + host);
+              removeTracker(tracker);
             }
           }
           LOG.info("Host " + host + " is ready for decommissioning");
@@ -3771,6 +3769,19 @@
     }
   }
 
+  private void removeTracker(TaskTracker tracker) {
+    String trackerName = tracker.getTrackerName();
+    // Remove completely after marking the tasks as 'KILLED'
+    lostTaskTracker(tracker);
+    // tracker is lost, and if it is blacklisted, remove 
+    // it from the count of blacklisted trackers in the cluster
+    if (isBlacklisted(trackerName)) {
+      faultyTrackers.numBlacklistedTrackers -= 1;
+    }
+    updateTaskTrackerStatus(trackerName, null);
+    statistics.taskTrackerRemoved(trackerName);
+  }
+  
   /**
    * Returns a set of excluded nodes.
    */

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=888440&r1=888439&r2=888440&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
(original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
Tue Dec  8 15:20:57 2009
@@ -63,8 +63,10 @@
     }
     @Override
     public ClusterStatus getClusterStatus(boolean detailed) {
-      return new ClusterStatus(trackers.length,
-          0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+      return new ClusterStatus(
+          taskTrackers().size() - getBlacklistedTrackerCount(),
+          getBlacklistedTrackerCount(), 0, 0, 0, totalSlots/2, totalSlots/2, 
+           JobTracker.State.RUNNING, 0);
     }
 
     public void setNumSlots(int totalSlots) {

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=888440&r1=888439&r2=888440&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
(original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
Tue Dec  8 15:20:57 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
@@ -47,6 +48,7 @@
     conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
     conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
     conf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 1000);
+    conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1");
     jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers);
     jobTracker.startExpireTrackersThread();
   }
@@ -91,4 +93,139 @@
     job.finishTask(tid[1]);
     
   }
+  
+  /**
+   * Test whether the tracker gets blacklisted after it is lost.
+   */
+  public void testLostTrackerBeforeBlacklisting() throws Exception {
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+    TaskAttemptID[] tid = new TaskAttemptID[3];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    job.setClusterSize(4);
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
+
+    job.finishTask(tid[0]);
+
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.getClusterStatus(false).getTaskTrackers());
+    
+    // lose the tracker
+    clock.advance(1100);
+    jobTracker.checkExpiredTrackers();
+    assertFalse("Tracker 0 not lost", 
+        jobTracker.getClusterStatus(false).getActiveTrackerNames()
+                  .contains(trackers[0]));
+    
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 0, jobTracker.getClusterStatus(false).getTaskTrackers());
+    
+    // Tracker 1 establishes contact with JT 
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    
+    // Tracker1 should get assigned the lost map task
+    tid[1] =  job.findMapTask(trackers[1]);
+
+    assertNotNull("Map Task from Lost Tracker did not get reassigned", tid[1]);
+    
+    assertEquals("Task ID of reassigned map task does not match",
+        tid[0].getTaskID().toString(), tid[1].getTaskID().toString());
+    
+    // finish the map task
+    job.finishTask(tid[1]);
+
+    // finish the reduce task
+    tid[2] =  job.findReduceTask(trackers[1]);
+    job.finishTask(tid[2]);
+    
+    // check if job is successful
+    assertEquals("Job not successful", 
+                 JobStatus.SUCCEEDED, job.getStatus().getRunState());
+    
+    // check if the tracker is lost
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.getClusterStatus(false).getTaskTrackers());
+    // validate blacklisted count .. since we lost one blacklisted tracker
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+  }
+
+  /**
+   * Test whether the tracker gets lost after it is blacklisted.
+   */
+  public void testLostTrackerAfterBlacklisting() throws Exception {
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[0]);
+    clock.advance(600);
+    TaskAttemptID[] tid = new TaskAttemptID[2];
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    conf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    job.setClusterSize(4);
+    
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.taskTrackers().size());
+    
+    // Tracker 0 gets the map task
+    tid[0] = job.findMapTask(trackers[0]);
+    // Fail the task
+    job.failTask(tid[0]);
+    
+    // Tracker 1 establishes contact with JT
+    FakeObjectUtilities.establishFirstContact(jobTracker, trackers[1]);
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 2, jobTracker.taskTrackers().size());
+    
+    // Tracker 1 gets the map task
+    tid[1] = job.findMapTask(trackers[1]);
+    // Finish the task and also the job
+    job.finishTask(tid[1]);
+
+    // check if job is successful
+    assertEquals("Job not successful", 
+                 JobStatus.SUCCEEDED, job.getStatus().getRunState());
+    
+    // check if tracker 0 got blacklisted
+    assertTrue("Tracker 0 not blacklisted", 
+               jobTracker.getBlacklistedTrackers()[0].getTaskTrackerName()
+                 .equals(trackers[0]));
+    // check if the tracker count is correct
+    assertEquals("Active tracker count mismatch", 
+                 2, jobTracker.taskTrackers().size());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                1, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+    
+    // Advance clock. Tracker 0 should be lost
+    clock.advance(500);
+    jobTracker.checkExpiredTrackers();
+    
+    // check if the task tracker is lost
+    assertFalse("Tracker 0 not lost", 
+            jobTracker.getClusterStatus(false).getActiveTrackerNames()
+                      .contains(trackers[0]));
+    
+    // check if the lost tracker has been removed from the jobtracker
+    assertEquals("Active tracker count mismatch", 
+                 1, jobTracker.taskTrackers().size());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jobTracker.getClusterStatus(false).getBlacklistedTrackers());
+    
+  }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=888440&r1=888439&r2=888440&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
(original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
Tue Dec  8 15:20:57 2009
@@ -32,7 +32,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -377,4 +381,88 @@
     
     stopCluster();
   }
+  
+  // Mapper that fails only for attempt id 0 of task id 0
+  static class FailOnceMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    private boolean shouldFail = false;
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
+      if (shouldFail) {
+        throw new RuntimeException("failing map");
+      }
+    }
+    
+    @Override
+    public void configure(JobConf conf) {
+      TaskAttemptID id = TaskAttemptID.forName(conf.get("mapred.task.id"));
+      shouldFail = id.getId() == 0 && id.getTaskID().getId() == 0; 
+    }
+  }
+  
+  /**
+   * Check refreshNodes for decommissioning blacklisted nodes. 
+   */
+  public void testBlacklistedNodeDecommissioning() throws Exception {
+    LOG.info("Testing blacklisted node decommissioning");
+
+    Configuration conf = new Configuration();
+    conf.set(JTConfig.JT_MAX_TRACKER_BLACKLISTS, "1");
+
+    startCluster(2, 1, 0, conf);
+    
+    assertEquals("Trackers not up", 2,
+           mr.getJobTrackerRunner().getJobTracker().getActiveTrackers().length);
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 2, jt.getClusterStatus(false).getTaskTrackers());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                0, jt.getClusterStatus(false).getBlacklistedTrackers());
+
+    // run a failing job to blacklist the tracker
+    JobConf jConf = mr.createJobConf();
+    jConf.set(JobContext.MAX_TASK_FAILURES_PER_TRACKER, "1");
+    jConf.setJobName("test-job-fail-once");
+    jConf.setMapperClass(FailOnceMapper.class);
+    jConf.setReducerClass(IdentityReducer.class);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    
+    RunningJob job = 
+      UtilsForTests.runJob(jConf, new Path("in"), new Path("out"));
+    job.waitForCompletion();
+    
+    // check if the tracker got blacklisted
+    // validate the total tracker count
+    assertEquals("Active tracker count mismatch", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    // validate blacklisted count
+    assertEquals("Blacklisted tracker count mismatch", 
+                1, jt.getClusterStatus(false).getBlacklistedTrackers());
+    
+    // find the tracker to decommission
+    String hostToDecommission = 
+      JobInProgress.convertTrackerNameToHostName(
+          jt.getBlacklistedTrackers()[0].getTaskTrackerName());
+    LOG.info("Decommissioning host " + hostToDecommission);
+    
+    Set<String> decom = new HashSet<String>(1);
+    decom.add(hostToDecommission);
+    jt.decommissionNodes(decom);
+ 
+    // check the cluster status and tracker size
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.getClusterStatus(false).getTaskTrackers());
+    assertEquals("Blacklisted tracker count incorrect in cluster status after "
+                 + "decommissioning", 
+                 0, jt.getClusterStatus(false).getBlacklistedTrackers());
+    assertEquals("Tracker is not lost upon host decommissioning", 
+                 1, jt.taskTrackers().size());
+    
+    stopCluster();
+  }
 }



Mime
View raw message