hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1378674 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/m...
Date Wed, 29 Aug 2012 18:32:08 GMT
Author: sseth
Date: Wed Aug 29 18:32:08 2012
New Revision: 1378674

URL: http://svn.apache.org/viewvc?rev=1378674&view=rev
Log:
MAPREDUCE-4599. Prevent container launches on blacklisted hosts. (Contributed by Tsuyoshi OZAWA)

Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1378674&r1=1378673&r2=1378674&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Wed Aug 29 18:32:08 2012
@@ -2,3 +2,5 @@ Branch MR-3902
   MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase (Tsuyoshi OZAWA via sseth)
 
   MAPREDUCE-4602. Re-create ask list correctly in case of a temporary error in the AM-RM allocate call (sseth)
+
+  MAPREDUCE-4599. Prevent contianer launches on blacklisted hosts. (Tsuyoshi OZAWA via sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1378674&r1=1378673&r2=1378674&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Wed Aug 29 18:32:08 2012
@@ -866,10 +866,10 @@ protected synchronized void handleEvent(
           // Blakclisted nodes should likely be removed immediately.
           
           // TODO Differentiation between blacklisted versus unusable nodes ?
-          //blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost);
+          boolean blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost);
           nodeUsable = appContext.getNode(allocated.getNodeId()).isUsable();
           
-          if (!nodeUsable) {
+          if (!nodeUsable || blackListed) {
             // we need to request for a new container 
             // and release the current one
             LOG.info("Got allocated container on an unusable "

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1378674&r1=1378673&r2=1378674&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java Wed Aug 29 18:32:08 2012
@@ -237,6 +237,8 @@ public class AMNodeImpl implements AMNod
         node.numFailedTAs++;
         boolean shouldBlacklist = node.shouldBlacklistNode();
         if (shouldBlacklist) {
+          node.sendEvent(new AMNodeEvent(node.getNodeId(),
+              AMNodeEventType.N_NODE_WAS_BLACKLISTED));
           return AMNodeState.BLACKLISTED;
           // TODO XXX: An event likely needs to go out to the scheduler.
         }
@@ -291,6 +293,8 @@ public class AMNodeImpl implements AMNod
     public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
       boolean shouldBlacklist = node.shouldBlacklistNode();
       if (shouldBlacklist) {
+        node.sendEvent(new AMNodeEvent(node.getNodeId(),
+            AMNodeEventType.N_NODE_WAS_BLACKLISTED));
         return AMNodeState.BLACKLISTED;
         // TODO XXX: An event likely needs to go out to the scheduler.
       }
@@ -375,7 +379,6 @@ public class AMNodeImpl implements AMNod
 
   @Override
   public boolean isUsable() {
-    // TODO Auto-generated method stub
     this.readLock.lock();
     try {
       return (EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java?rev=1378674&r1=1378673&r2=1378674&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Wed Aug 29 18:32:08 2012
@@ -1,68 +1,121 @@
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
 
-// TODO Seems a little strange, extending ConcurrentHashMap like this.
-// TODO This needs to extend AbstractService to get a handle on the conf.
-@SuppressWarnings("serial")
-public class AMNodeMap extends ConcurrentHashMap<NodeId, AMNode> implements
+public class AMNodeMap extends AbstractService implements
     EventHandler<AMNodeEvent> {
 
+  static final Log LOG = LogFactory.getLog(AMNodeMap.class);
   
-  
-  private final EventHandler eventHandler;
+  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
+  // TODO XXX -> blacklistMap is also used for computing forcedUnblacklisting.
+  private final ConcurrentHashMap<String, ArrayList<NodeId>> blacklistMap;
+  private final EventHandler<?> eventHandler;
   private final AppContext appContext;
-  
-  public AMNodeMap(EventHandler eventHandler, AppContext appContext) {
+  private int maxTaskFailuresPerNode;
+  private boolean nodeBlacklistingEnabled;
+  private int blacklistDisablePercent;
+  
+  public AMNodeMap(EventHandler<?> eventHandler, AppContext appContext) {
+    super("AMNodeMap");
+    this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
+    this.blacklistMap = new ConcurrentHashMap<String, ArrayList<NodeId>>();
     this.eventHandler = eventHandler;
     this.appContext = appContext;
-    
+  
     // TODO XXX: Get a handle of allowed failures.
   }
   
+  @Override
+  public synchronized void init(Configuration config) {
+    Configuration conf = new Configuration(config);
+    this.maxTaskFailuresPerNode = conf.getInt(
+        MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+    this.nodeBlacklistingEnabled = config.getBoolean(
+        MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    this.blacklistDisablePercent = config.getInt(
+          MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+          MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
+    
+    LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+      throw new YarnException("Invalid blacklistDisablePercent: "
+          + blacklistDisablePercent
+          + ". Should be an integer between 0 and 100 or -1 to disabled");
+    }
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
+    
+    super.init(conf);
+  }
+  
+  
   public void nodeSeen(NodeId nodeId) {
-    // TODO Replace 3 with correct value.
-    putIfAbsent(nodeId, new AMNodeImpl(nodeId, 3, eventHandler, appContext));
+    nodeMap.putIfAbsent(nodeId, new AMNodeImpl(
+        nodeId, maxTaskFailuresPerNode, eventHandler, appContext));
   }
   
   public boolean isHostBlackListed(String hostname) {
-    return false;
-    // Node versus host blacklisting.
- // TODO XXX -> Maintain a map of host to NodeList (case of multiple NMs)
-    // Provide functionality to say isHostBlacklisted(hostname) -> all hosts.
-    // ... blacklisted means don't ask for containers on this host.
-    // Same list to be used for computing forcedUnblacklisting.
+    if (!nodeBlacklistingEnabled) {
+      return false;
+    }
+    
+    return blacklistMap.containsKey(hostname);
   }
   
+  private void addToBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    ArrayList<NodeId> nodes;
+    
+    if (!blacklistMap.containsKey(host)) {
+      nodes = new ArrayList<NodeId>();
+      blacklistMap.put(host, nodes);
+    } else {
+      nodes = blacklistMap.get(host);
+    }
+    
+    if (!nodes.contains(nodeId)) {
+      nodes.add(nodeId);
+    }
+  }
+  
+  // TODO: Currently, un-blacklisting feature is not supported.
+  /*
+  private void removeFromBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    if (blacklistMap.containsKey(host)) {
+      ArrayList<NodeId> nodes = blacklistMap.get(host);
+      nodes.remove(nodeId);
+    }
+  }
+  */
+  
   public void handle(AMNodeEvent event) {
     if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) {
-      // TODO Handle blacklisting.
+      NodeId nodeId = event.getNodeId();
+      addToBlackList(nodeId);
     } else {
       NodeId nodeId = event.getNodeId();
-      get(nodeId).handle(event);
+      nodeMap.get(nodeId).handle(event);
     }
   }
   
+  public AMNode get(NodeId nodeId) {
+    return nodeMap.get(nodeId);
+  }
   
+  public int size() {
+    return nodeMap.size();
+  }
   
-//nodeBlacklistingEnabled = 
-//conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
-//LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
-//maxTaskFailuresPerNode = 
-//conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
-//blacklistDisablePercent =
-//  conf.getInt(
-//      MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
-//      MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
-//LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
-//if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
-//throw new YarnException("Invalid blacklistDisablePercent: "
-//    + blacklistDisablePercent
-//    + ". Should be an integer between 0 and 100 or -1 to disabled");
-//}
-//LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
 }



Mime
View raw message