hadoop-mapreduce-commits mailing list archives

From yhema...@apache.org
Subject svn commit: r789733 - in /hadoop/mapreduce/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/
Date Tue, 30 Jun 2009 13:59:45 GMT
Author: yhemanth
Date: Tue Jun 30 13:59:45 2009
New Revision: 789733

URL: http://svn.apache.org/viewvc?rev=789733&view=rev
Log:
MAPREDUCE-211. Provides ability to run a health check script on the tasktracker nodes and blacklist nodes if they are unhealthy. Contributed by Sreekanth Ramakrishnan.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/mapreduce/trunk/src/webapps/job/machines.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jun 30 13:59:45 2009
@@ -21,6 +21,10 @@
     MAPREDUCE-567. Add a new example MR that always fails. (Philip Zeyliger
     via tomwhite)
 
+    MAPREDUCE-211. Provides ability to run a health check script on the
+    tasktracker nodes and blacklist nodes if they are unhealthy.
+    (Sreekanth Ramakrishnan via yhemanth)
+
   IMPROVEMENTS
 
     HADOOP-5967. Sqoop should only use a single map task. (Aaron Kimball via

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Tue Jun 30 13:59:45 2009
@@ -729,6 +729,63 @@
             </section>
             
           </section>
+          <section>
+            <title>Monitoring Health of TaskTracker Nodes</title>
+            <p>Hadoop Map/Reduce provides a mechanism by which administrators 
+            can configure the TaskTracker to run an administrator supplied
+            script periodically to determine if a node is healthy or not.
+            Administrators can determine if the node is in a healthy state
+            by performing any checks of their choice in the script. If the
+            script detects the node to be in an unhealthy state, it must print
+            a line to standard output beginning with the string <em>ERROR</em>.
+            The TaskTracker spawns the script periodically and checks its 
+            output. If the script's output contains the string <em>ERROR</em>,
+            as described above, the node's status is reported as 'unhealthy'
+            and the node is blacklisted on the JobTracker. No further tasks 
+            will be assigned to this node. However, the
+            TaskTracker continues to run the script, so that if the node
+            becomes healthy again, it will be removed from the blacklisted
+            nodes on the JobTracker automatically. The node's health
+            along with the output of the script, if it is unhealthy, is
+            available to the administrator in the JobTracker's web interface.
+            The time since the node was healthy is also displayed on the 
+            web interface.
+            </p>
+            
+            <section>
+            <title>Configuring the Node Health Check Script</title>
+            <p>The following parameters can be used to control the node health 
+            monitoring script in <em>mapred-site.xml</em>.</p>
+            <table>
+            <tr><th>Name</th><th>Description</th></tr>
+            <tr><td><code>mapred.healthChecker.script.path</code></td>
+            <td>Absolute path to the script which is periodically run by the 
+            TaskTracker to determine if the node is 
+            healthy or not. The file should be executable by the TaskTracker.
+            If the value of this key is empty or the file does 
+            not exist or is not executable, node health monitoring
+            is not started.</td>
+            </tr>
+            <tr>
+            <td><code>mapred.healthChecker.interval</code></td>
+            <td>Frequency at which the node health script is run, 
+            in milliseconds</td>
+            </tr>
+            <tr>
+            <td><code>mapred.healthChecker.script.timeout</code></td>
+            <td>Time after which the node health script will be killed by
+            the TaskTracker if unresponsive.
+            The node is marked unhealthy if the node health script times out.</td>
+            </tr>
+            <tr>
+            <td><code>mapred.healthChecker.script.args</code></td>
+            <td>Extra arguments that can be passed to the node health script 
+            when launched.
+            This should be a comma-separated list of arguments.</td>
+            </tr>
+            </table>
+            </section>
+          </section>
           
         </section>
         

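Putting the configuration table above together, a minimal mapred-site.xml fragment enabling the health checker might look like the following sketch (the script path and arguments are illustrative placeholders, not values from this commit):

```xml
<!-- Hypothetical mapred-site.xml fragment; /path/to/health_check.sh
     and the argument values are placeholders. -->
<property>
  <name>mapred.healthChecker.script.path</name>
  <value>/path/to/health_check.sh</value>
</property>
<property>
  <name>mapred.healthChecker.interval</name>
  <value>60000</value>
</property>
<property>
  <name>mapred.healthChecker.script.timeout</name>
  <value>600000</value>
</property>
<property>
  <name>mapred.healthChecker.script.args</name>
  <value>-v,--check-disks</value>
</property>
```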
Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Tue Jun 30 13:59:45 2009
@@ -1011,4 +1011,40 @@
     of -1 signifies no limit.</description>
 </property>
 
+<!--  Node health script variables -->
+
+<property>
+  <name>mapred.healthChecker.script.path</name>
+  <value></value>
+  <description>Absolute path to the script which is
+  periodically run by the node health monitoring service to determine if
+  the node is healthy or not. If the value of this key is empty or the
+  file does not exist in the location configured here, the node health
+  monitoring service is not started.</description>
+</property>
+
+<property>
+  <name>mapred.healthChecker.interval</name>
+  <value>60000</value>
+  <description>Frequency at which the node health script is run,
+  in milliseconds</description>
+</property>
+
+<property>
+  <name>mapred.healthChecker.script.timeout</name>
+  <value>600000</value>
+  <description>Time after which the node health script will be killed if
+  it is unresponsive; the script is then considered to have failed.</description>
+</property>
+
+<property>
+  <name>mapred.healthChecker.script.args</name>
+  <value></value>
+  <description>Comma-separated list of arguments to be passed to
+  the node health script when it is launched.
+  </description>
+</property>
+
+<!--  end of node health script variables -->
+
 </configuration>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Jun 30 13:59:45 2009
@@ -63,8 +63,9 @@
    * Version 25: JobIDs are passed in response to JobTracker restart 
    * Version 26: Modified TaskID to be aware of the new TaskTypes
    * Version 27: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+   * Version 28: Adding node health status to TaskStatus for MAPREDUCE-211
    */
-  public static final long versionID = 27L;
+  public static final long versionID = 28L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jun 30 13:59:45 2009
@@ -64,6 +64,7 @@
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -479,18 +480,28 @@
     }
   }
   
+  enum ReasonForBlackListing {
+    EXCEEDING_FAILURES,
+    NODE_UNHEALTHY
+  }
+  
   // The FaultInfo which indicates the number of faults of a tracker
   // and when the last fault occurred
   // and whether the tracker is blacklisted across all jobs or not
   private static class FaultInfo {
+    static final String FAULT_FORMAT_STRING =  "%d failures on the tracker";
     int numFaults = 0;
     long lastUpdated;
     boolean blacklisted; 
 
+    private boolean isHealthy;
+    private HashMap<ReasonForBlackListing, String> rfbMap;
+
     FaultInfo(long time) {
       numFaults = 0;
       lastUpdated = time;
       blacklisted = false;
+      rfbMap = new  HashMap<ReasonForBlackListing, String>();
     }
 
     void setFaultCount(int num) {
@@ -513,9 +524,47 @@
       return blacklisted;
     }
     
-    void setBlacklist(boolean blacklist) {
-      blacklisted = blacklist;
+    void setBlacklist(ReasonForBlackListing rfb, 
+        String trackerFaultReport) {
+      blacklisted = true;
+      this.rfbMap.put(rfb, trackerFaultReport);
+    }
+
+    public void setHealthy(boolean isHealthy) {
+      this.isHealthy = isHealthy;
+    }
+
+    public boolean isHealthy() {
+      return isHealthy;
+    }
+    
+    public String getTrackerFaultReport() {
+      StringBuffer sb = new StringBuffer();
+      for(String reasons : rfbMap.values()) {
+        sb.append(reasons);
+        sb.append("\n");
+      }
+      return sb.toString();
+    }
+    
+    Set<ReasonForBlackListing> getReasonforblacklisting() {
+      return this.rfbMap.keySet();
+    }
+    
+    public void unBlacklist() {
+      this.blacklisted = false;
+      this.rfbMap.clear();
+    }
+
+    public boolean removeBlackListedReason(ReasonForBlackListing rfb) {
+      String str = rfbMap.remove(rfb);
+      return str!=null;
+    }
+
+    public void addBlackListedReason(ReasonForBlackListing rfb, String reason) {
+      this.rfbMap.put(rfb, reason);
     }
+    
   }
 
   private class FaultyTrackersInfo {
@@ -538,27 +587,85 @@
      */
     void incrementFaults(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
-        FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+        FaultInfo fi = getFaultInfo(hostName, true);
         long now = clock.getTime();
-        if (fi == null) {
-          fi = new FaultInfo(now);
-          potentiallyFaultyTrackers.put(hostName, fi);
-        }
         int numFaults = fi.getFaultCount();
         ++numFaults;
         fi.setFaultCount(numFaults);
         fi.setLastUpdated(now);
-        if (!fi.isBlacklisted()) {
-          if (shouldBlacklist(hostName, numFaults)) {
-            LOG.info("Adding " + hostName + " to the blacklist" +
-                     " across all jobs");
-            removeHostCapacity(hostName);
-            fi.setBlacklist(true);
-          }
+        if (exceedsFaults(fi)) {
+          LOG.info("Adding " + hostName + " to the blacklist"
+              + " across all jobs");
+          String reason = String.format(FaultInfo.FAULT_FORMAT_STRING,
+              numFaults);
+          blackListTracker(hostName, reason,
+              ReasonForBlackListing.EXCEEDING_FAILURES);
         }
       }        
     }
 
+    
+    private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
+      FaultInfo fi = getFaultInfo(hostName, true);
+      boolean blackListed = fi.isBlacklisted();
+      if(blackListed) {
+        if(fi.getReasonforblacklisting().contains(rfb)) {
+          return;
+        } else {
+          LOG.info("Adding blacklisted reason for tracker : " + hostName 
+              + " Reason for blacklisting is : " + rfb);
+          fi.addBlackListedReason(rfb, reason);
+        }
+        return;
+      } else {
+        LOG.info("Blacklisting tracker : " + hostName 
+            + " Reason for blacklisting is : " + rfb);
+        removeHostCapacity(hostName);
+        fi.setBlacklist(rfb, reason);
+      }
+    }
+    
+    private boolean canUnBlackListTracker(String hostName,
+        ReasonForBlackListing rfb) {
+      FaultInfo fi = getFaultInfo(hostName, false);
+      if(fi == null) {
+        return false;
+      }
+      
+      Set<ReasonForBlackListing> rfbSet = fi.getReasonforblacklisting();
+      return fi.isBlacklisted() && rfbSet.contains(rfb);
+    }
+
+    private void unBlackListTracker(String hostName,
+        ReasonForBlackListing rfb) {
+      // callers must check canUnBlackListTracker before calling this method
+      FaultInfo fi = getFaultInfo(hostName, false);
+      if(fi.removeBlackListedReason(rfb)) {
+        if(fi.getReasonforblacklisting().isEmpty()) {
+          addHostCapacity(hostName);
+          LOG.info("Unblacklisting tracker : " + hostName);
+          fi.unBlacklist();
+          //We have unblacklisted the tracker, so it should
+          //definitely be healthy. Check the fault count; if it
+          //is zero, don't keep the tracker in memory.
+          if(fi.numFaults == 0) {
+            potentiallyFaultyTrackers.remove(hostName);
+          }
+        }
+      }
+    }
+    
+    private FaultInfo getFaultInfo(String hostName, 
+        boolean createIfNeccessary) {
+      FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+      long now = clock.getTime();
+      if (fi == null && createIfNeccessary) {
+        fi = new FaultInfo(now);
+        potentiallyFaultyTrackers.put(hostName, fi);
+      }
+      return fi;
+    }
+    
     /**
      * Blacklists the tracker across all jobs if
      * <ol>
@@ -566,9 +673,11 @@
      *     MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
      * <li>#faults is 50% (configurable) above the average #faults</li>
      * <li>50% the cluster is not blacklisted yet </li>
+     * </ol>
      */
-    private boolean shouldBlacklist(String hostName, int numFaults) {
-      if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
+    private boolean exceedsFaults(FaultInfo fi) {
+      int faultCount = fi.getFaultCount();
+      if (faultCount >= MAX_BLACKLISTS_PER_TRACKER) {
         // calculate avgBlackLists
         long clusterSize = getClusterStatus().getTaskTrackers();
         long sum = 0;
@@ -578,7 +687,7 @@
         double avg = (double) sum / clusterSize;
             
         long totalCluster = clusterSize + numBlacklistedTrackers;
-        if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+        if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
             numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
           return true;
         }
@@ -621,16 +730,12 @@
         if (fi != null &&
             (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
           int numFaults = fi.getFaultCount() - 1;
-          if (fi.isBlacklisted()) {
-            LOG.info("Removing " + hostName + " from blacklist");
-            addHostCapacity(hostName);
-            fi.setBlacklist(false);
-          }
-          if (numFaults > 0) {
-            fi.setFaultCount(numFaults);
-            fi.setLastUpdated(now);
-          } else {
-            potentiallyFaultyTrackers.remove(hostName);
+          fi.setFaultCount(numFaults);
+          fi.setLastUpdated(now);
+          if (canUnBlackListTracker(hostName, 
+              ReasonForBlackListing.EXCEEDING_FAILURES)) {
+            unBlackListTracker(hostName,
+                ReasonForBlackListing.EXCEEDING_FAILURES);
           }
         }
         return (fi != null && fi.isBlacklisted());
@@ -699,6 +804,41 @@
       }
       return 0;
     }
+    
+    Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = null;
+        if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+          return fi.getReasonforblacklisting();
+        }
+      }
+      return null;
+    }
+
+
+    void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
+      FaultInfo fi = null;
+      // If tracker is not healthy, create a fault info object
+      // blacklist it.
+      if (!isHealthy) {
+        fi = getFaultInfo(hostName, true);
+        fi.setHealthy(isHealthy);
+        synchronized (potentiallyFaultyTrackers) {
+          blackListTracker(hostName, reason,
+              ReasonForBlackListing.NODE_UNHEALTHY);
+        }
+      } else {
+        fi = getFaultInfo(hostName, false);
+        if (fi == null) {
+          return;
+        } else {
+          if (canUnBlackListTracker(hostName,
+              ReasonForBlackListing.NODE_UNHEALTHY)) {
+            unBlackListTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY);
+          }
+        }
+      }
+    }
   }
   
   /**
@@ -2684,7 +2824,7 @@
     // Initialize the response to be sent for the heartbeat
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
-      
+    isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
     // Check for new tasks to be executed on the tasktracker
     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
@@ -2886,6 +3026,15 @@
     getInstrumentation().setReduceSlots(totalReduceTaskCapacity);
     return oldStatus != null;
   }
+  
+  
+  private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
+    TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
+    synchronized (faultyTrackers) {
+      faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(), 
+          status.isNodeHealthy(), status.getHealthReport());
+    }
+  }
     
   /**
    * Process incoming heartbeat messages from the task trackers.
@@ -2927,6 +3076,7 @@
     }
 
     updateTaskStatuses(trackerStatus);
+    updateNodeHealthStatus(trackerStatus);
     
     return true;
   }
@@ -4069,4 +4219,25 @@
       throw new IOException(jobStr.toString() + msg);
     }
   }
+  
+  String getReasonsForBlacklisting(String host) {
+    FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+    if (fi == null) {
+      return "";
+    }
+    return fi.getTrackerFaultReport();
+  }
+
+  /** Test Methods */
+  Set<ReasonForBlackListing> getReasonForBlackList(String host) {
+    FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+    if (fi == null) {
+      return new HashSet<ReasonForBlackListing>();
+    }
+    return fi.getReasonforblacklisting();
+  }
+  
+  void incrementFaults(String hostName) {
+    faultyTrackers.incrementFaults(hostName);
+  }
 }
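The FaultInfo changes above track a set of blacklist reasons per tracker and only unblacklist a tracker once the last reason is removed. That bookkeeping can be sketched standalone (a hypothetical class mirroring the idea, not the actual JobTracker code):

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch: a tracker is blacklisted while at least one
// reason is recorded, and cleared only when the last reason is removed,
// mirroring how FaultInfo maps ReasonForBlackListing to a report string.
class BlacklistSketch {
    enum Reason { EXCEEDING_FAILURES, NODE_UNHEALTHY }

    private final Map<Reason, String> reasons = new EnumMap<>(Reason.class);

    void addReason(Reason r, String report) {
        reasons.put(r, report);
    }

    /** Removes one reason; returns true if the tracker is now clear. */
    boolean removeReason(Reason r) {
        reasons.remove(r);
        return reasons.isEmpty();
    }

    boolean isBlacklisted() {
        return !reasons.isEmpty();
    }
}
```

This models why an unhealthy node that also exceeds the failure threshold stays blacklisted until both conditions clear.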

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java?rev=789733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java Tue Jun 30 13:59:45 2009
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * 
+ * The class that periodically checks the health of the node and reports the
+ * status back to the service that requested the check.
+ */
+class NodeHealthCheckerService {
+
+  private static Log LOG = LogFactory.getLog(NodeHealthCheckerService.class);
+
+  /** Absolute path to the health script. */
+  private String nodeHealthScript;
+  /** Interval at which the node health script is to be executed */
+  private long intervalTime;
+  /** Time after which the script should be timed out */
+  private long scriptTimeout;
+  /** Timer used to schedule node health monitoring script execution */
+  private Timer nodeHealthScriptScheduler;
+
+  /** ShellCommandExecutor used to execute monitoring script */
+  ShellCommandExecutor shexec = null;
+
+  /** Configuration used by the checker */
+  private Configuration conf;
+
+  /** Pattern used for searching in the output of the node health script */
+  static private final String ERROR_PATTERN = "ERROR";
+
+  /* Configuration keys */
+  static final String HEALTH_CHECK_SCRIPT_PROPERTY = "mapred.healthChecker.script.path";
+
+  static final String HEALTH_CHECK_INTERVAL_PROPERTY = "mapred.healthChecker.interval";
+
+  static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY = "mapred.healthChecker.script.timeout";
+
+  static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY = "mapred.healthChecker.script.args";
+  /* end of configuration keys */
+
+  /** Default frequency of running node health script */
+  private static final long DEFAULT_HEALTH_CHECK_INTERVAL = 10 * 60 * 1000;
+  /** Default script time out period */
+  private static final long DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL = 2 * DEFAULT_HEALTH_CHECK_INTERVAL;
+
+  private boolean isHealthy;
+
+  private String healthReport;
+
+  private long lastReportedTime;
+
+  private TimerTask timer;
+  
+  private enum HealthCheckerExitStatus {
+    SUCCESS,
+    TIMED_OUT,
+    FAILED_WITH_EXIT_CODE,
+    FAILED_WITH_EXCEPTION,
+    FAILED
+  }
+
+
+  /**
+   * Class which is used by the {@link Timer} class to periodically execute the
+   * node health script.
+   * 
+   */
+  private class NodeHealthMonitorExecutor extends TimerTask {
+
+    String exceptionStackTrace = "";
+
+    public NodeHealthMonitorExecutor(String[] args) {
+      ArrayList<String> execScript = new ArrayList<String>();
+      execScript.add(nodeHealthScript);
+      if (args != null) {
+        execScript.addAll(Arrays.asList(args));
+      }
+      shexec = new ShellCommandExecutor((String[]) execScript
+          .toArray(new String[execScript.size()]), null, null, scriptTimeout);
+    }
+
+    @Override
+    public void run() {
+      HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
+      try {
+        shexec.execute();
+      } catch (ExitCodeException e) {
+        // ignore the exit code of the script
+        status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
+      } catch (Exception e) {
+        LOG.warn("Caught exception : " + e.getMessage());
+        status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
+        exceptionStackTrace = StringUtils.stringifyException(e);
+      } finally {
+        if (status == HealthCheckerExitStatus.SUCCESS) {
+          if (hasErrors(shexec.getOutput())) {
+            status = HealthCheckerExitStatus.FAILED;
+          }
+        }
+        reportHealthStatus(status);
+      }
+    }
+
+    /**
+     * Method which is used to parse output from the node health monitor and
+     * send to the report address.
+     * 
+     * The output of a timed-out script, or of a script whose execution throws
+     * an IOException, is ignored.
+     * 
+     * The node is marked unhealthy if
+     * <ol>
+     * <li>The node health script times out</li>
+     * <li>The node health scripts output has a line which begins with ERROR</li>
+     * <li>An exception is thrown while executing the script</li>
+     * </ol>
+     * If the script throws an {@link IOException} or {@link ExitCodeException},
+     * the output is ignored and the node is left healthy, as the script might
+     * have a syntax error.
+     * 
+     * @param status
+     */
+    void reportHealthStatus(HealthCheckerExitStatus status) {
+      long now = System.currentTimeMillis();
+      switch (status) {
+      case SUCCESS:
+        setHealthStatus(true, "", now);
+        break;
+      case TIMED_OUT:
+        setHealthStatus(false, "Node health script timed out");
+        break;
+      case FAILED_WITH_EXCEPTION:
+        setHealthStatus(false, exceptionStackTrace);
+        break;
+      case FAILED_WITH_EXIT_CODE:
+        setHealthStatus(true, "", now);
+        break;
+      case FAILED:
+        setHealthStatus(false, shexec.getOutput());
+        break;
+      }
+    }
+
+    /**
+     * Method to check if the output string has line which begins with ERROR.
+     * 
+     * @param output
+     *          string
+     * @return true if output string has error pattern in it.
+     */
+    private boolean hasErrors(String output) {
+      String[] splits = output.split("\n");
+      for (String split : splits) {
+        if (split.startsWith(ERROR_PATTERN)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  public NodeHealthCheckerService(Configuration conf) {
+    this.conf = conf;
+    this.lastReportedTime = System.currentTimeMillis();
+    this.isHealthy = true;
+    this.healthReport = "";
+    initialize(conf);
+  }
+
+  /*
+   * Method which initializes the values for the script path and interval time.
+   */
+  private void initialize(Configuration conf) {
+    this.nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    this.intervalTime = conf.getLong(HEALTH_CHECK_INTERVAL_PROPERTY,
+        DEFAULT_HEALTH_CHECK_INTERVAL);
+    this.scriptTimeout = conf.getLong(HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY,
+        DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL);
+    String[] args = conf.getStrings(HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY,
+        new String[] {});
+    timer = new NodeHealthMonitorExecutor(args);
+  }
+
+  /**
+   * Method used to start the Node health monitoring.
+   * 
+   */
+  void start() {
+    // if health script path is not configured don't start the thread.
+    if (!shouldRun(conf)) {
+      LOG.info("Not starting node health monitor");
+      return;
+    }
+    nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
+    // Start the timer task immediately and
+    // then periodically at interval time.
+    nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+  }
+
+  /**
+   * Method used to terminate the node health monitoring service.
+   * 
+   */
+  void stop() {
+    if (!shouldRun(conf)) {
+      return;
+    }
+    nodeHealthScriptScheduler.cancel();
+    if (shexec != null) {
+      Process p = shexec.getProcess();
+      if (p != null) {
+        p.destroy();
+      }
+    }
+  }
+
+  /**
+   * Gets whether the node is healthy or not.
+   * 
+   * @return true if node is healthy
+   */
+  private boolean isHealthy() {
+    return isHealthy;
+  }
+
+  /**
+   * Sets whether the node is healthy or not.
+   * 
+   * @param isHealthy
+   *          whether or not the node is healthy
+   */
+  private synchronized void setHealthy(boolean isHealthy) {
+    this.isHealthy = isHealthy;
+  }
+
+  /**
+   * Returns output from the health script. If the node is healthy, an empty
+   * string is returned.
+   * 
+   * @return output from health script
+   */
+  private String getHealthReport() {
+    return healthReport;
+  }
+
+  /**
+   * Sets the health report from the node health script.
+   * 
+   * @param healthReport
+   */
+  private synchronized void setHealthReport(String healthReport) {
+    this.healthReport = healthReport;
+  }
+  
+  /**
+   * Returns time stamp when node health script was last run.
+   * 
+   * @return timestamp when node health script was last run
+   */
+  private long getLastReportedTime() {
+    return lastReportedTime;
+  }
+
+  /**
+   * Sets the last run time of the node health script.
+   * 
+   * @param lastReportedTime
+   */
+  private synchronized void setLastReportedTime(long lastReportedTime) {
+    this.lastReportedTime = lastReportedTime;
+  }
+
+  /**
+   * Method used to determine whether the node health monitoring service
+   * should be started. Returns true if the following conditions are met:
+   * 
+   * <ol>
+   * <li>Path to Node health check script is not empty</li>
+   * <li>Node health check script file exists and is executable</li>
+   * </ol>
+   * 
+   * @param conf
+   * @return true if node health monitoring service can be started.
+   */
+  static boolean shouldRun(Configuration conf) {
+    String nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
+      return false;
+    }
+    File f = new File(nodeHealthScript);
+    return f.exists() && f.canExecute();
+  }
+
+  private synchronized void setHealthStatus(boolean isHealthy, String output) {
+    this.setHealthy(isHealthy);
+    this.setHealthReport(output);
+  }
+  
+  private synchronized void setHealthStatus(boolean isHealthy, String output,
+      long time) {
+    this.setHealthStatus(isHealthy, output);
+    this.setLastReportedTime(time);
+  }
+  
+  /**
+   * Method to populate the fields for the {@link TaskTrackerHealthStatus}
+   * 
+   * @param healthStatus
+   */
+  synchronized void setHealthStatus(TaskTrackerHealthStatus healthStatus) {
+    healthStatus.setNodeHealthy(this.isHealthy());
+    healthStatus.setHealthReport(this.getHealthReport());
+    healthStatus.setLastReported(this.getLastReportedTime());
+  }
+  
+  /**
+   * Test method to directly access the timer task that the node
+   * health checker uses.
+   * 
+   * @return the timer task
+   */
+  // XXX: For testing only; not to be used directly.
+  TimerTask getTimer() {
+    return timer;
+  }
+}
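The shouldRun() precondition above (non-empty script path, existing and executable file) can be sketched standalone. This is a minimal illustration, not the patch's code: a plain Map stands in for Hadoop's Configuration, and the property key shown is a hypothetical stand-in for HEALTH_CHECK_SCRIPT_PROPERTY, whose actual value lives in mapred-default.xml and is not part of this hunk.

```java
import java.io.File;
import java.util.Collections;
import java.util.Map;

// Self-contained sketch of the shouldRun() precondition from
// NodeHealthCheckerService; Map<String,String> stands in for Configuration.
public class NodeHealthScriptCheck {

    // Hypothetical stand-in for the patch's HEALTH_CHECK_SCRIPT_PROPERTY key.
    static final String SCRIPT_KEY = "mapred.healthChecker.script.path";

    static boolean shouldRun(Map<String, String> conf) {
        String path = conf.get(SCRIPT_KEY);
        // Condition 1: the script path must be configured and non-empty.
        if (path == null || path.trim().isEmpty()) {
            return false;
        }
        // Condition 2: the script file must exist and be executable.
        File f = new File(path);
        return f.exists() && f.canExecute();
    }

    public static void main(String[] args) {
        // No property set: the health monitor should not start.
        if (shouldRun(Collections.<String, String>emptyMap())) {
            throw new AssertionError("monitor should not start without a script path");
        }
        // Property points at a missing file: still should not start.
        if (shouldRun(Collections.singletonMap(SCRIPT_KEY, "/no/such/script.sh"))) {
            throw new AssertionError("monitor should not start for a missing script");
        }
        System.out.println("shouldRun precondition checks passed");
    }
}
```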

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun 30 13:59:45 2009
@@ -66,6 +66,7 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -225,6 +226,11 @@
   */
   private TaskController taskController;
   
+  /**
+   * Handle to the specific instance of the {@link NodeHealthCheckerService}
+   */
+  private NodeHealthCheckerService healthChecker;
+  
   /*
    * A list of commitTaskActions for whom commit response has been received 
    */
@@ -539,6 +545,11 @@
     
     //setup and create jobcache directory with appropriate permissions
     taskController.setup();
+    
+    //Start up node health checker service.
+    if (shouldStartHealthMonitor(this.fConf)) {
+      startHealthMonitor(this.fConf);
+    }
   }
   
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -896,6 +907,11 @@
       taskReportServer.stop();
       taskReportServer = null;
     }
+    if (healthChecker != null) {
+      //stop node health checker service
+      healthChecker.stop();
+      healthChecker = null;
+    }
   }
 
   /**
@@ -1208,7 +1224,18 @@
       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
           reduceSlotSizeMemoryOnTT);
     }
-      
+    //add node health information
+    
+    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+    synchronized (this) {
+      if (healthChecker != null) {
+        healthChecker.setHealthStatus(healthStatus);
+      } else {
+        healthStatus.setNodeHealthy(true);
+        healthStatus.setLastReported(0L);
+        healthStatus.setHealthReport("");
+      }
+    }
     //
     // Xmit the heartbeat
     //
@@ -3114,4 +3141,24 @@
       }
     }
   }
+  
+  /**
+   * Wrapper method used by the TaskTracker to check if the
+   * {@link NodeHealthCheckerService} can be started.
+   * @param conf configuration used to check if service can be started
+   * @return true if service can be started
+   */
+  private boolean shouldStartHealthMonitor(Configuration conf) {
+    return NodeHealthCheckerService.shouldRun(conf);
+  }
+  
+  /**
+   * Wrapper method used to start {@link NodeHealthCheckerService} for 
+   * Task Tracker
+   * @param conf Configuration used by the service.
+   */
+  private void startHealthMonitor(Configuration conf) {
+    healthChecker = new NodeHealthCheckerService(conf);
+    healthChecker.start();
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Jun 30 13:59:45 2009
@@ -53,6 +53,7 @@
   volatile long lastSeen;
   private int maxMapTasks;
   private int maxReduceTasks;
+  private TaskTrackerHealthStatus healthStatus;
    
   /**
    * Class representing a collection of resources on this tasktracker.
@@ -196,6 +197,7 @@
   public TaskTrackerStatus() {
     taskReports = new ArrayList<TaskStatus>();
     resStatus = new ResourceStatus();
+    this.healthStatus = new TaskTrackerHealthStatus();
   }
   
   public TaskTrackerStatus(String trackerName, String host) {
@@ -219,6 +221,7 @@
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
     this.resStatus = new ResourceStatus();
+    this.healthStatus = new TaskTrackerHealthStatus();
   }
 
   /**
@@ -383,13 +386,128 @@
   ResourceStatus getResourceStatus() {
     return resStatus;
   }
+
+  /**
+   * Returns health status of the task tracker.
+   * @return health status of Task Tracker
+   */
+  public TaskTrackerHealthStatus getHealthStatus() {
+    return healthStatus;
+  }
+
+  /**
+   * Static class which encapsulates the Node health
+   * related fields.
+   * 
+   */
+  static class TaskTrackerHealthStatus implements Writable {
+    
+    private boolean isNodeHealthy;
+    
+    private String healthReport;
+    
+    private long lastReported;
+    
+    public TaskTrackerHealthStatus(boolean isNodeHealthy, String healthReport,
+        long lastReported) {
+      this.isNodeHealthy = isNodeHealthy;
+      this.healthReport = healthReport;
+      this.lastReported = lastReported;
+    }
+    
+    public TaskTrackerHealthStatus() {
+      this.isNodeHealthy = true;
+      this.healthReport = "";
+      this.lastReported = System.currentTimeMillis();
+    }
+
+    /**
+     * Sets whether or not the task tracker is healthy, based on the
+     * output from the node health script.
+     * 
+     * @param isNodeHealthy
+     */
+    void setNodeHealthy(boolean isNodeHealthy) {
+      this.isNodeHealthy = isNodeHealthy;
+    }
+
+    /**
+     * Returns whether the node is healthy, based on the result of the
+     * node health script.
+     * 
+     * @return true if the node is healthy.
+     */
+    boolean isNodeHealthy() {
+      return isNodeHealthy;
+    }
+
+    /**
+     * Sets the health report based on the output from the health script.
+     * 
+     * @param healthReport
+     *          String listing cause of failure.
+     */
+    void setHealthReport(String healthReport) {
+      this.healthReport = healthReport;
+    }
+
+    /**
+     * Returns the health report of the node, if any. The health report is
+     * only populated when the node is not healthy.
+     * 
+     * @return health report of the node if any
+     */
+    String getHealthReport() {
+      return healthReport;
+    }
+
+    /**
+     * Sets the time at which the TaskTracker last received its health
+     * information from the node health monitoring service.
+     * 
+     * @param lastReported time stamp of the last report from the node
+     * health script
+     */
+    public void setLastReported(long lastReported) {
+      this.lastReported = lastReported;
+    }
+
+    /**
+     * Gets time of most recent node health update.
+     * 
+     * @return time stamp of most recent health update.
+     */
+    public long getLastReported() {
+      return lastReported;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      isNodeHealthy = in.readBoolean();
+      healthReport = Text.readString(in);
+      lastReported = in.readLong();
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeBoolean(isNodeHealthy);
+      Text.writeString(out, healthReport);
+      out.writeLong(lastReported);
+    }
+    
+  }
   
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, trackerName);
-    UTF8.writeString(out, host);
+    Text.writeString(out, trackerName);
+    Text.writeString(out, host);
     out.writeInt(httpPort);
     out.writeInt(failures);
     out.writeInt(maxMapTasks);
@@ -400,11 +518,12 @@
     for (TaskStatus taskStatus : taskReports) {
       TaskStatus.writeTaskStatus(out, taskStatus);
     }
+    getHealthStatus().write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.trackerName = UTF8.readString(in);
-    this.host = UTF8.readString(in);
+    this.trackerName = Text.readString(in);
+    this.host = Text.readString(in);
     this.httpPort = in.readInt();
     this.failures = in.readInt();
     this.maxMapTasks = in.readInt();
@@ -416,5 +535,6 @@
     for (int i = 0; i < numTasks; i++) {
       taskReports.add(TaskStatus.readTaskStatus(in));
     }
+    getHealthStatus().readFields(in);
   }
 }
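The Writable pattern used by TaskTrackerHealthStatus (write the fields in a fixed order, read them back in the same order) can be exercised standalone. In this sketch, writeUTF/readUTF stand in for Hadoop's Text.writeString/Text.readString, which is an assumption for illustration only; the two encodings are not wire-compatible.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trips the three TaskTrackerHealthStatus fields through a byte
// stream, mirroring the field order of write()/readFields() in the patch.
public class HealthStatusRoundTrip {
    boolean isNodeHealthy = true;
    String healthReport = "";
    long lastReported;

    void write(DataOutput out) throws IOException {
        out.writeBoolean(isNodeHealthy);
        out.writeUTF(healthReport);   // stand-in for Text.writeString
        out.writeLong(lastReported);
    }

    void readFields(DataInput in) throws IOException {
        isNodeHealthy = in.readBoolean();
        healthReport = in.readUTF();  // stand-in for Text.readString
        lastReported = in.readLong();
    }

    // Serializes an unhealthy status and checks the deserialized copy matches.
    static boolean roundTripOk() {
        try {
            HealthStatusRoundTrip src = new HealthStatusRoundTrip();
            src.isNodeHealthy = false;
            src.healthReport = "ERROR disk full";
            src.lastReported = 1246370385000L;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));

            HealthStatusRoundTrip dst = new HealthStatusRoundTrip();
            dst.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));

            return !dst.isNodeHealthy
                    && "ERROR disk full".equals(dst.healthReport)
                    && dst.lastReported == 1246370385000L;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (!roundTripOk()) {
            throw new AssertionError("health status round trip failed");
        }
        System.out.println("health status round trip ok");
    }
}
```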

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java?rev=789733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java Tue Jun 30 13:59:45 2009
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+
+import junit.framework.TestCase;
+
+public class TestNodeHealthService extends TestCase {
+
+  private static volatile Log LOG = LogFactory
+      .getLog(TestNodeHealthService.class);
+
+  private static final String nodeHealthConfigPath = System.getProperty(
+      "test.build.extraconf", "build/test/extraconf");
+
+  final static File nodeHealthConfigFile = new File(nodeHealthConfigPath,
+      "mapred-site.xml");
+
+  private String testRootDir = new File(System.getProperty("test.build.data",
+      "/tmp")).getAbsolutePath();
+
+  private File nodeHealthscriptFile = new File(testRootDir, "failingscript.sh");
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (nodeHealthConfigFile.exists()) {
+      nodeHealthConfigFile.delete();
+    }
+    if (nodeHealthscriptFile.exists()) {
+      nodeHealthscriptFile.delete();
+    }
+    super.tearDown();
+  }
+
+  private Configuration getConfForNodeHealthScript() {
+    Configuration conf = new Configuration();
+    conf.set(NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY,
+        nodeHealthscriptFile.getAbsolutePath());
+    conf.setLong(NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY, 500);
+    conf.setLong(
+        NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY, 1000);
+    return conf;
+  }
+
+  private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
+      throws IOException {
+    PrintWriter pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
+    pw.println(scriptStr);
+    pw.flush();
+    pw.close();
+    nodeHealthscriptFile.setExecutable(setExecutable);
+  }
+
+  public void testNodeHealthScriptShouldRun() throws IOException {
+    // Node health script should not start if there is no property called
+    // node health script path.
+    assertFalse("Health checker should not have started",
+        NodeHealthCheckerService.shouldRun(new Configuration()));
+    Configuration conf = getConfForNodeHealthScript();
+    // Node health script should not start if the node health script does not
+    // exist.
+    assertFalse("Node health script should not start", NodeHealthCheckerService
+        .shouldRun(conf));
+    // Create script path.
+    conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+    writeNodeHealthScriptFile("", false);
+    // Node health script should not start if the node health script is not
+    // executable.
+    assertFalse("Node health script should not start", NodeHealthCheckerService
+        .shouldRun(conf));
+    writeNodeHealthScriptFile("", true);
+    assertTrue("Node health script should start", NodeHealthCheckerService
+        .shouldRun(conf));
+  }
+
+  public void testNodeHealthScript() throws Exception {
+    TaskTrackerHealthStatus healthStatus = new TaskTrackerHealthStatus();
+    String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
+    String normalScript = "echo \"I am all fine\"";
+    String timeOutScript = "sleep 4\n echo \"I am fine\"";
+    Configuration conf = getConfForNodeHealthScript();
+    conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+
+    NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
+        conf);
+    TimerTask timer = nodeHealthChecker.getTimer();
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking initial healthy condition");
+    // Check proper report conditions.
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // write out error file.
+    // Healthy to unhealthy transition
+    writeNodeHealthScriptFile(errorScript, true);
+    // Run timer
+    timer.run();
+    // update health status
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->Unhealthy");
+    assertFalse("Node health status reported healthy", healthStatus
+        .isNodeHealthy());
+    assertFalse("Node health status reported healthy", healthStatus
+        .getHealthReport().isEmpty());
+    
+    // Check unhealthy to healthy transitions.
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking UnHealthy--->healthy");
+    // Check proper report conditions.
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // Healthy to timeout transition.
+    writeNodeHealthScriptFile(timeOutScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->timeout");
+    assertFalse("Node health status reported healthy even after timeout",
+        healthStatus.isNodeHealthy());
+    assertFalse("Node health status reported healthy even after timeout",
+        healthStatus.getHealthReport().isEmpty());
+  }
+
+}
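The test scripts above (errorScript vs. normalScript) suggest the convention that script output beginning with "ERROR" marks the node unhealthy. The parsing code itself is not in this hunk, so the following is an inference sketched standalone, not the patch's implementation:

```java
// Sketch of the inferred output convention from the test scripts above:
// any output line starting with "ERROR" marks the node unhealthy. This
// mirrors the errorScript / normalScript behavior exercised by
// testNodeHealthScript(), but the real parsing logic is not in this hunk.
public class HealthOutputParser {

    static boolean isUnhealthy(String scriptOutput) {
        for (String line : scriptOutput.split("\n")) {
            if (line.trim().startsWith("ERROR")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mirrors errorScript: "echo ERROR\n echo \"Tracker not healthy\""
        if (!isUnhealthy("ERROR\nTracker not healthy")) {
            throw new AssertionError("ERROR output should mark node unhealthy");
        }
        // Mirrors normalScript: "echo \"I am all fine\""
        if (isUnhealthy("I am all fine")) {
            throw new AssertionError("normal output should leave node healthy");
        }
        System.out.println("output convention checks passed");
    }
}
```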

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=789733&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Tue Jun 30 13:59:45 2009
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+
+import junit.framework.TestCase;
+
+public class TestTaskTrackerBlacklisting extends TestCase {
+
+  private static volatile Log LOG = LogFactory
+      .getLog(TestTaskTrackerBlacklisting.class);
+
+  static String trackers[] = new String[] { "tracker_tracker1:1000",
+      "tracker_tracker2:1000", "tracker_tracker3:1000" };
+
+  static String hosts[] = new String[] { "tracker1", "tracker2", "tracker3" };
+
+  private FakeJobTracker jobTracker;
+
+  private FakeJobTrackerClock clock;
+
+  private short responseId;
+
+  private static HashSet<ReasonForBlackListing> nodeUnHealthyReasonSet = new HashSet<ReasonForBlackListing>();
+
+  private static HashSet<ReasonForBlackListing> exceedsFailuresReasonSet = new HashSet<ReasonForBlackListing>();
+
+  private static HashSet<ReasonForBlackListing> unhealthyAndExceedsFailure = new HashSet<ReasonForBlackListing>();
+
+  static {
+    nodeUnHealthyReasonSet.add(ReasonForBlackListing.NODE_UNHEALTHY);
+    exceedsFailuresReasonSet.add(ReasonForBlackListing.EXCEEDING_FAILURES);
+    unhealthyAndExceedsFailure.add(ReasonForBlackListing.NODE_UNHEALTHY);
+    unhealthyAndExceedsFailure.add(ReasonForBlackListing.EXCEEDING_FAILURES);
+  }
+  private static final long aDay = 24 * 60 * 60 * 1000;
+
+  private class FakeJobTrackerClock extends Clock {
+    boolean jumpADay = false;
+
+    @Override
+    long getTime() {
+      if (!jumpADay) {
+        return super.getTime();
+      } else {
+        long now = super.getTime();
+        return now + aDay;
+      }
+    }
+  }
+
+  static class FakeJobTracker extends
+      org.apache.hadoop.mapred.TestSpeculativeExecution.FakeJobTracker {
+    // initialize max{Map/Reduce} task capacities to twice the cluster size
+    int totalSlots = trackers.length * 4;
+
+    FakeJobTracker(JobConf conf, Clock clock) throws IOException,
+        InterruptedException {
+      super(conf, clock);
+    }
+
+    @Override
+    public ClusterStatus getClusterStatus(boolean detailed) {
+      return new ClusterStatus(trackers.length, 0, 0, 0, 0, totalSlots / 2,
+          totalSlots / 2, JobTracker.State.RUNNING, 0);
+    }
+
+    public void setNumSlots(int totalSlots) {
+      this.totalSlots = totalSlots;
+    }
+
+    @Override
+    synchronized void finalizeJob(JobInProgress job) {
+      List<String> blackListedTrackers = job.getBlackListedTrackers();
+      for (String tracker : blackListedTrackers) {
+        incrementFaults(tracker);
+      }
+    }
+  }
+
+  static class FakeJobInProgress extends
+      org.apache.hadoop.mapred.TestSpeculativeExecution.FakeJobInProgress {
+    HashMap<String, Integer> trackerToFailureMap;
+
+    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+      super(jobConf, tracker);
+      // initObjects(tracker, numMaps, numReduces);
+      trackerToFailureMap = new HashMap<String, Integer>();
+    }
+
+    public void failTask(TaskAttemptID taskId) throws Exception {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
+          1.0f, 1, TaskStatus.State.FAILED, "", "", tip
+              .machineWhereTaskRan(taskId), tip.isMapTask() ? Phase.MAP
+              : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+      addFailuresToTrackers(tip.machineWhereTaskRan(taskId));
+    }
+
+    public void addFailuresToTrackers(String trackerName) {
+      Integer numOfFailures = trackerToFailureMap.get(trackerName);
+      if (numOfFailures == null) {
+        numOfFailures = 0;
+      }
+      trackerToFailureMap.put(trackerName, numOfFailures + 1);
+    }
+
+    public List<String> getBlackListedTrackers() {
+      ArrayList<String> blackListedTrackers = new ArrayList<String>();
+      for (Entry<String, Integer> entry : trackerToFailureMap.entrySet()) {
+        Integer failures = entry.getValue();
+        String tracker = entry.getKey();
+        if (failures.intValue() >= this.getJobConf()
+            .getMaxTaskFailuresPerTracker()) {
+          blackListedTrackers.add(JobInProgress
+              .convertTrackerNameToHostName(tracker));
+        }
+      }
+      return blackListedTrackers;
+    }
+  }
+
+  protected void setUp() throws Exception {
+    responseId = 0;
+    JobConf conf = new JobConf();
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    conf.setInt("mapred.max.tracker.blacklists", 1);
+    jobTracker = new FakeJobTracker(conf, (clock = new FakeJobTrackerClock()));
+    sendHeartBeat(null, true);
+  }
+
+  private void sendHeartBeat(TaskTrackerHealthStatus status, boolean initialContact) throws IOException {
+    for (String tracker : trackers) {
+      TaskTrackerStatus tts = new TaskTrackerStatus(tracker, JobInProgress
+          .convertTrackerNameToHostName(tracker));
+      if (status != null) {
+        TaskTrackerHealthStatus healthStatus = tts.getHealthStatus();
+        healthStatus.setNodeHealthy(status.isNodeHealthy());
+        healthStatus.setHealthReport(status.getHealthReport());
+        healthStatus.setLastReported(status.getLastReported());
+      }
+      jobTracker.heartbeat(tts, false, initialContact, false, (short) responseId);
+    }
+    responseId++;
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testTrackerBlacklistingForJobFailures() throws Exception {
+    runBlackListingJob();
+    assertEquals("Tracker 1 not blacklisted", jobTracker
+        .getBlacklistedTrackerCount(), 1);
+    checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
+    clock.jumpADay = true;
+    sendHeartBeat(null, false);
+    assertEquals("Tracker 1 still blacklisted after a day", jobTracker
+        .getBlacklistedTrackerCount(), 0);
+    clock.jumpADay = false;
+  }
+
+  public void testNodeHealthBlackListing() throws Exception {
+    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
+    status.setNodeHealthy(false);
+    status.setLastReported(System.currentTimeMillis());
+    status.setHealthReport("ERROR");
+    sendHeartBeat(status, false);
+    for (String host : hosts) {
+      checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
+    }
+    status.setNodeHealthy(true);
+    status.setLastReported(System.currentTimeMillis());
+    status.setHealthReport("");
+    sendHeartBeat(status, false);
+    assertEquals("Trackers still blacklisted after healthy report", jobTracker
+        .getBlacklistedTrackerCount(), 0);
+  }
+
+  public void testBlackListingWithFailuresAndHealthStatus() throws Exception {
+    runBlackListingJob();
+    assertEquals("Tracker 1 not blacklisted", jobTracker
+        .getBlacklistedTrackerCount(), 1);
+    checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
+    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
+    status.setNodeHealthy(false);
+    status.setLastReported(System.currentTimeMillis());
+    status.setHealthReport("ERROR");
+    sendHeartBeat(status, false);
+
+    assertEquals("All trackers not blacklisted", 
+        jobTracker.getBlacklistedTrackerCount(), 3);
+    checkReasonForBlackListing(hosts[0], unhealthyAndExceedsFailure);
+    checkReasonForBlackListing(hosts[1], nodeUnHealthyReasonSet);
+    checkReasonForBlackListing(hosts[2], nodeUnHealthyReasonSet);
+    
+    clock.jumpADay = true;
+    sendHeartBeat(status, false);
+    
+    assertEquals("All trackers not blacklisted", 
+        jobTracker.getBlacklistedTrackerCount(), 3);
+    
+    for (String host : hosts) {
+      checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
+    }
+    sendHeartBeat(null, false);
+    
+    assertEquals("All trackers not white listed", 
+        jobTracker.getBlacklistedTrackerCount(), 0);
+    
+    clock.jumpADay = false;
+  }
+
+  private void runBlackListingJob() throws Exception {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[3];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(false);
+    conf.setNumMapTasks(0);
+    conf.setNumReduceTasks(5);
+    conf.set("mapred.max.reduce.failures.percent", ".70");
+    conf.setMaxTaskFailuresPerTracker(1);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+
+    taskAttemptID[0] = job.findReduceTask(trackers[0]);
+    taskAttemptID[1] = job.findReduceTask(trackers[1]);
+    taskAttemptID[2] = job.findReduceTask(trackers[2]);
+    job.finishTask(taskAttemptID[1]);
+    job.failTask(taskAttemptID[0]);
+    taskAttemptID[0] = job.findReduceTask(trackers[0]);
+    job.failTask(taskAttemptID[0]);
+    taskAttemptID[0] = job.findReduceTask(trackers[1]);
+    job.finishTask(taskAttemptID[2]);
+    job.finishTask(taskAttemptID[0]);
+    jobTracker.finalizeJob(job);
+  }
+
+  private void checkReasonForBlackListing(String host,
+      Set<ReasonForBlackListing> reasonsForBlackListing) {
+    Set<ReasonForBlackListing> rfbs = jobTracker.getReasonForBlackList(host);
+    assertEquals("Reasons for blacklisting of " + host + " do not match",
+        reasonsForBlackListing, rfbs);
+  }
+
+}
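The per-tracker fault counting that FakeJobInProgress performs above can be sketched as a self-contained helper: a tracker is blacklisted for a job once its failure count reaches the job's max-failures-per-tracker limit. This is an illustration of the counting pattern only; the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained sketch of the failure counting in FakeJobInProgress:
// recordFailure() mirrors addFailuresToTrackers(), and
// getBlackListedTrackers() applies the max-failures-per-tracker threshold.
public class TrackerFaultCounter {
    private final Map<String, Integer> failures = new HashMap<String, Integer>();
    private final int maxFailuresPerTracker;

    TrackerFaultCounter(int maxFailuresPerTracker) {
        this.maxFailuresPerTracker = maxFailuresPerTracker;
    }

    void recordFailure(String tracker) {
        Integer n = failures.get(tracker);
        failures.put(tracker, (n == null ? 0 : n) + 1);
    }

    List<String> getBlackListedTrackers() {
        List<String> blacklisted = new ArrayList<String>();
        for (Map.Entry<String, Integer> e : failures.entrySet()) {
            if (e.getValue() >= maxFailuresPerTracker) {
                blacklisted.add(e.getKey());
            }
        }
        return blacklisted;
    }

    public static void main(String[] args) {
        // runBlackListingJob() sets maxTaskFailuresPerTracker to 1.
        TrackerFaultCounter counter = new TrackerFaultCounter(1);
        counter.recordFailure("tracker_tracker1:1000");
        if (!counter.getBlackListedTrackers().contains("tracker_tracker1:1000")) {
            throw new AssertionError("tracker1 should be blacklisted after 1 failure");
        }
        System.out.println("blacklisting threshold check passed");
    }
}
```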

Modified: hadoop/mapreduce/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/machines.jsp?rev=789733&r1=789732&r2=789733&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/machines.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/machines.jsp Tue Jun 30 13:59:45 2009
@@ -48,23 +48,43 @@
       out.println("<h2>Task Trackers</h2>");
       c = tracker.taskTrackers();
     }
+    int noCols = 9;
+    if(type.equals("blacklisted")) {
+      noCols = 10;
+    }
     if (c.size() == 0) {
       out.print("There are currently no known " + type + " Task Trackers.");
     } else {
       out.print("<center>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-      out.print("<tr><td align=\"center\" colspan=\"6\"><b>Task Trackers</b></td></tr>\n");
+      out.print("<tr><td align=\"center\" colspan=\""+ noCols +"\"><b>Task Trackers</b></td></tr>\n");
       out.print("<tr><td><b>Name</b></td><td><b>Host</b></td>" +
                 "<td><b># running tasks</b></td>" +
                 "<td><b>Max Map Tasks</b></td>" +
                 "<td><b>Max Reduce Tasks</b></td>" +
                 "<td><b>Failures</b></td>" +
-                "<td><b>Seconds since heartbeat</b></td></tr>\n");
+                "<td><b>Node Health Status</b></td>" +
+                "<td><b>Seconds Since Node Last Healthy</b></td>");
+      if(type.equals("blacklisted")) {
+        out.print("<td><b>Reason for blacklisting</b></td>");
+      }
+      out.print("<td><b>Seconds since heartbeat</b></td></tr>\n");
+
       int maxFailures = 0;
       String failureKing = null;
       for (Iterator it = c.iterator(); it.hasNext(); ) {
         TaskTrackerStatus tt = (TaskTrackerStatus) it.next();
         long sinceHeartbeat = System.currentTimeMillis() - tt.getLastSeen();
+        boolean isHealthy = tt.getHealthStatus().isNodeHealthy();
+        long sinceHealthCheck = tt.getHealthStatus().getLastReported();
+        String healthString = "";
+        if(sinceHealthCheck == 0) {
+          healthString = "N/A";
+        } else {
+          healthString = (isHealthy?"Healthy":"Unhealthy");
+          sinceHealthCheck = System.currentTimeMillis() - sinceHealthCheck;
+          sinceHealthCheck =  sinceHealthCheck/1000;
+        }
         if (sinceHeartbeat > 0) {
           sinceHeartbeat = sinceHeartbeat / 1000;
         }
@@ -84,8 +104,13 @@
         out.print(tt.getHost() + "</td><td>" + numCurTasks +
                   "</td><td>" + tt.getMaxMapSlots() +
                   "</td><td>" + tt.getMaxReduceSlots() + 
-                  "</td><td>" + numFailures + 
-                  "</td><td>" + sinceHeartbeat + "</td></tr>\n");
+                  "</td><td>" + numFailures +
+                  "</td><td>" + healthString +
+                  "</td><td>" + sinceHealthCheck); 
+        if(type.equals("blacklisted")) {
+          out.print("</td><td>" + tracker.getReasonsForBlacklisting(tt.getHost()));
+        }
+        out.print("</td><td>" + sinceHeartbeat + "</td></tr>\n");
       }
       out.print("</table>\n");
       out.print("</center>\n");


