tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1567. Avoid blacklisting nodes when the disable blacklisting threshold is about to be hit. (sseth)
Date Wed, 22 Oct 2014 22:56:31 GMT
Repository: tez
Updated Branches:
  refs/heads/master 6bc134837 -> e530a46c3


TEZ-1567. Avoid blacklisting nodes when the disable blacklisting
threshold is about to be hit. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e530a46c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e530a46c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e530a46c

Branch: refs/heads/master
Commit: e530a46c352234459d2aced93e43fcb83c98d450
Parents: 6bc1348
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Oct 22 15:56:12 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Oct 22 15:56:12 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tez-dag/findbugs-exclude.xml                    |   2 +-
 .../java/org/apache/tez/dag/app/AppContext.java |   4 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   8 +-
 .../dag/app/launcher/ContainerLauncherImpl.java |   2 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   2 +-
 .../tez/dag/app/rm/node/AMNodeEventType.java    |  11 +-
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |  38 ++-
 .../apache/tez/dag/app/rm/node/AMNodeMap.java   | 225 -------------
 .../tez/dag/app/rm/node/AMNodeTracker.java      | 211 ++++++++++++
 .../tez/dag/app/rm/TestContainerReuse.java      |  30 +-
 .../tez/dag/app/rm/node/TestAMNodeMap.java      | 268 ---------------
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  | 330 +++++++++++++++++++
 13 files changed, 596 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e6eaf33..97e6cda 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -57,6 +57,7 @@ ALL CHANGES:
   abort when AM shutdown.
   TEZ-1643. DAGAppMaster kills DAG & shuts down, when RM is restarted.
   TEZ-1684. upgrade mockito to latest release.
+  TEZ-1567. Avoid blacklisting nodes when the disable blacklisting threshold is about to be hit.
 
 
 Release 0.5.1: 2014-10-02

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 0634edb..d3d365d 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -14,7 +14,7 @@
 <FindBugsFilter>
 
   <Match>
-    <Class name="org.apache.tez.dag.app.rm.node.AMNodeMap" />
+    <Class name="org.apache.tez.dag.app.rm.node.AMNodeTracker" />
     <Or>
       <Field name="blacklistDisablePercent" />
       <Field name="maxTaskFailuresPerNode" />

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index af244c8..5cedc56 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.node.AMNodeMap;
+import org.apache.tez.dag.app.rm.node.AMNodeTracker;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
@@ -77,7 +77,7 @@ public interface AppContext {
 
   AMContainerMap getAllContainers();
 
-  AMNodeMap getAllNodes();
+  AMNodeTracker getNodeTracker();
 
   TaskSchedulerEventHandler getTaskScheduler();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 0284281..321fb01 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -138,7 +138,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
-import org.apache.tez.dag.app.rm.node.AMNodeMap;
+import org.apache.tez.dag.app.rm.node.AMNodeTracker;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
@@ -200,7 +200,7 @@ public class DAGAppMaster extends AbstractService {
   private final String workingDirectory;
   private ContainerSignatureMatcher containerSignatureMatcher;
   private AMContainerMap containers;
-  private AMNodeMap nodes;
+  private AMNodeTracker nodes;
   private AppContext context;
   private Configuration amConf;
   private Dispatcher dispatcher;
@@ -351,7 +351,7 @@ public class DAGAppMaster extends AbstractService {
     addIfService(containers, true);
     dispatcher.register(AMContainerEventType.class, containers);
 
-    nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
+    nodes = new AMNodeTracker(dispatcher.getEventHandler(), context);
     addIfService(nodes, true);
     dispatcher.register(AMNodeEventType.class, nodes);
 
@@ -1215,7 +1215,7 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
-    public AMNodeMap getAllNodes() {
+    public AMNodeTracker getNodeTracker() {
       return nodes;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 6ee04d7..67832fa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -291,7 +291,7 @@ public class ContainerLauncherImpl extends AbstractService implements
 
             // nodes where containers will run at *this* point of time. This is
             // *not* the cluster size and doesn't need to be.
-            int numNodes = context.getAllNodes().size();
+            int numNodes = context.getNodeTracker().getNumNodes();
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
             if (poolSize < idealPoolSize) {

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 4e4b805..df66fa0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -367,7 +367,7 @@ public class TaskSchedulerEventHandler extends AbstractService
                                            Container container) {
     ContainerId containerId = container.getId();
     if (appContext.getAllContainers().addContainerIfNew(container)) {
-      appContext.getAllNodes().nodeSeen(container.getNodeId());
+      appContext.getNodeTracker().nodeSeen(container.getNodeId());
       sendEvent(new AMNodeEventContainerAllocated(container
           .getNodeId(), container.getId()));
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
index 421e482..86087d0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
@@ -22,11 +22,13 @@ public enum AMNodeEventType {
   //Producer: Scheduler
   N_CONTAINER_ALLOCATED,
   
-  //Producer: TaskAttempt
+  //Producer: TaskSchedulerEventHandler
   N_TA_SUCCEEDED,
+
+  // Producer: TaskSchedulerEventHandler, Task (retroactive failure)
   N_TA_ENDED,
   
-  //Producer: RMCommunicator
+  //Producer: TaskScheduler via TaskSchedulerEventHandler
   N_TURNED_UNHEALTHY,
   N_TURNED_HEALTHY,
   N_NODE_COUNT_UPDATED, // for blacklisting.
@@ -34,8 +36,5 @@ public enum AMNodeEventType {
   //Producer: AMNodeManager
   N_IGNORE_BLACKLISTING_ENABLED,
   N_IGNORE_BLACKLISTING_DISABLED,
-  
-  // Producer: AMNode - Will not reach AMNodeImpl. Used to compute whether
-  // blacklisting should be ignored.
-  N_NODE_WAS_BLACKLISTED
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 9a23646..6358c04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -240,10 +240,16 @@ public class AMNodeImpl implements AMNode {
     }
   }
 
-  protected boolean shouldBlacklistNode() {
+  /* Check whether this node needs to be blacklisted based on node specific information */
+  protected boolean qualifiesForBlacklisting() {
     return blacklistingEnabled && (numFailedTAs >= maxTaskFailuresPerNode);
   }
 
+  /* Register the node as bad with the AMNodeTracker and check whether it should be blacklisted */
+  protected boolean registerBadNodeAndShouldBlacklist() {
+    return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this);
+  }
+
   protected void blacklistSelf() {
     for (ContainerId c : containers) {
       sendEvent(new AMContainerEventNodeFailed(c, "Node blacklisted"));
@@ -252,8 +258,6 @@ public class AMNodeImpl implements AMNode {
     pastContainers.addAll(containers);
     containers.clear();
     sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
-    sendEvent(new AMNodeEvent(getNodeId(),
-        AMNodeEventType.N_NODE_WAS_BLACKLISTED));
   }
 
   @SuppressWarnings("unchecked")
@@ -296,12 +300,15 @@ public class AMNodeImpl implements AMNode {
         if (node.failedAttemptIds.add(event.getTaskAttemptId())) {
           // new failed container on node
           node.numFailedTAs++;
-          boolean shouldBlacklist = node.shouldBlacklistNode();
-          if (shouldBlacklist) {
-            LOG.info("Too many task attempt failures. " + 
-                     "Blacklisting node: " + node.getNodeId());
-            node.blacklistSelf();
-            return AMNodeState.BLACKLISTED;
+          if (node.qualifiesForBlacklisting()) {
+            if (node.registerBadNodeAndShouldBlacklist()) {
+              LOG.info("Too many task attempt failures. " +
+                  "Blacklisting node: " + node.getNodeId());
+              node.blacklistSelf();
+              return AMNodeState.BLACKLISTED;
+            } else {
+              // Stay in ACTIVE state. Move to FORCED_ACTIVE only when an explicit message is received.
+            }
           }
         }
       }
@@ -329,10 +336,15 @@ public class AMNodeImpl implements AMNode {
     @Override
     public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
       node.ignoreBlacklisting = false;
-      boolean shouldBlacklist = node.shouldBlacklistNode();
-      if (shouldBlacklist) {
-        node.blacklistSelf();
-        return AMNodeState.BLACKLISTED;
+      if (node.qualifiesForBlacklisting()) {
+        if (node.registerBadNodeAndShouldBlacklist()) {
+          LOG.info("Too many previous task failures after blacklisting re-enabled. " +
+              "Blacklisting node: " + node.getNodeId());
+          node.blacklistSelf();
+          return AMNodeState.BLACKLISTED;
+        } else {
+          // Stay in ACTIVE state. Move to FORCED_ACTIVE only when an explicit message is received.
+        }
       }
       return AMNodeState.ACTIVE;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
deleted file mode 100644
index e25b713..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm.node;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class AMNodeMap extends AbstractService implements
-    EventHandler<AMNodeEvent> {
-  
-  static final Log LOG = LogFactory.getLog(AMNodeMap.class);
-  
-  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
-  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
-  @SuppressWarnings("rawtypes")
-  private final EventHandler eventHandler;
-  private final AppContext appContext;
-  private int numClusterNodes;
-  private boolean ignoreBlacklisting = false;
-  private int maxTaskFailuresPerNode;
-  private boolean nodeBlacklistingEnabled;
-  private int blacklistDisablePercent;
-  
-  
-  @SuppressWarnings("rawtypes")
-  public AMNodeMap(EventHandler eventHandler, AppContext appContext) {
-    super("AMNodeMap");
-    this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
-    this.blacklistMap = new ConcurrentHashMap<String, Set<NodeId>>();
-    this.eventHandler = eventHandler;
-    this.appContext = appContext;
-  }
-  
-  @Override
-  public synchronized void serviceInit(Configuration conf) {
-    this.maxTaskFailuresPerNode = conf.getInt(
-        TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 
-        TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT);
-    this.nodeBlacklistingEnabled = conf.getBoolean(
-        TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED,
-        TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT);
-    this.blacklistDisablePercent = conf.getInt(
-          TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
-          TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
-
-    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
-        ", blacklistingEnabled: " + nodeBlacklistingEnabled + 
-        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
-
-    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
-      throw new TezUncheckedException("Invalid blacklistDisablePercent: "
-          + blacklistDisablePercent
-          + ". Should be an integer between 0 and 100 or -1 to disabled");
-    }
-  }
-  
-  public void nodeSeen(NodeId nodeId) {
-    if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
-        eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
-      LOG.info("Adding new node: " + nodeId);
-    }
-  }
-
-  // Interface for the scheduler to check about a specific host.
-  public boolean isHostBlackListed(String hostname) {
-    if (!nodeBlacklistingEnabled || ignoreBlacklisting) {
-      return false;
-    }
-    return blacklistMap.containsKey(hostname);
-  }
-
-  private void addToBlackList(NodeId nodeId) {
-    String host = nodeId.getHost();
-    Set<NodeId> nodes;
-    
-    if (!blacklistMap.containsKey(host)) {
-      nodes = new HashSet<NodeId>();
-      blacklistMap.put(host, nodes);
-    } else {
-      nodes = blacklistMap.get(host);
-    }
-    
-    if (!nodes.contains(nodeId)) {
-      nodes.add(nodeId);
-    }
-  }
-  
-  // TODO: Currently, un-blacklisting feature is not supported.
-  /*
-  private void removeFromBlackList(NodeId nodeId) {
-    String host = nodeId.getHost();
-    if (blacklistMap.containsKey(host)) {
-      ArrayList<NodeId> nodes = blacklistMap.get(host);
-      nodes.remove(nodeId);
-    }
-  }
-  */
-
-  public void handle(AMNodeEvent rEvent) {
-    // No synchronization required until there's multiple dispatchers.
-    NodeId nodeId = rEvent.getNodeId();
-    switch (rEvent.getType()) {
-    case N_NODE_WAS_BLACKLISTED:
-      // When moving away from IGNORE_BLACKLISTING state, nodes will send out
-      // blacklisted events. These need to be ignored.
-      addToBlackList(nodeId);
-      computeIgnoreBlacklisting();
-      break;
-    case N_NODE_COUNT_UPDATED:
-      AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
-      numClusterNodes = event.getNodeCount();
-      LOG.info("Num cluster nodes = " + numClusterNodes);
-      computeIgnoreBlacklisting();
-      break;
-    case N_TURNED_UNHEALTHY:
-    case N_TURNED_HEALTHY:
-      AMNode amNode = nodeMap.get(nodeId);
-      if (amNode == null) {
-        LOG.info("Ignoring RM Health Update for unknwon node: " + nodeId);
-      } else {
-        amNode.handle(rEvent);
-      }
-      break;
-    default:
-      nodeMap.get(nodeId).handle(rEvent);
-    }
-  }
-
-  // May be incorrect if there's multiple NodeManagers running on a single host.
-  // knownNodeCount is based on node managers, not hosts. blacklisting is
-  // currently based on hosts.
-  protected void computeIgnoreBlacklisting() {
-    
-    boolean stateChanged = false;
-    
-    if (!nodeBlacklistingEnabled) {
-      return;
-    }
-    if (blacklistDisablePercent != -1) {
-      if (numClusterNodes == 0) {
-        LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
-        return;
-      }
-      int val = (int) ((float) blacklistMap.size() / numClusterNodes * 100);
-      if (val >= blacklistDisablePercent) {
-        if (ignoreBlacklisting == false) {
-          ignoreBlacklisting = true;
-          LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
-              + ", Blacklisted: " + blacklistMap.size());
-          stateChanged = true;
-        }
-      } else {
-        if (ignoreBlacklisting == true) {
-          ignoreBlacklisting = false;
-          LOG.info("Ignore blacklisting set to false. Known: "
-              + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
-          stateChanged = true;
-        }
-      }
-    }
-
-    if (stateChanged) {
-      sendIngoreBlacklistingStateToNodes();
-    }
-  }
-
-  private void sendIngoreBlacklistingStateToNodes() {
-    AMNodeEventType eventType =
-        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
-        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
-    for (NodeId nodeId : nodeMap.keySet()) {
-      sendEvent(new AMNodeEvent(nodeId, eventType));
-    }
-  }
-
-  public AMNode get(NodeId nodeId) {
-    return nodeMap.get(nodeId);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void sendEvent(Event<?> event) {
-    this.eventHandler.handle(event);
-  }
-
-  public int size() {
-    return nodeMap.size();
-  }
-
-  @Private
-  @VisibleForTesting
-  public boolean isBlacklistingIgnored() {
-    return this.ignoreBlacklisting;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
new file mode 100644
index 0000000..39a3bfc
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -0,0 +1,211 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm.node;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class AMNodeTracker extends AbstractService implements
+    EventHandler<AMNodeEvent> {
+  
+  static final Log LOG = LogFactory.getLog(AMNodeTracker.class);
+  
+  private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
+  private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final AppContext appContext;
+  private int numClusterNodes;
+  private boolean ignoreBlacklisting = false;
+  private int maxTaskFailuresPerNode;
+  private boolean nodeBlacklistingEnabled;
+  private int blacklistDisablePercent;
+  float currentIgnoreBlacklistingCountThreshold = 0;
+  
+  @SuppressWarnings("rawtypes")
+  public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
+    super("AMNodeMap");
+    this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
+    this.blacklistMap = new ConcurrentHashMap<String, Set<NodeId>>();
+    this.eventHandler = eventHandler;
+    this.appContext = appContext;
+  }
+  
+  @Override
+  public synchronized void serviceInit(Configuration conf) {
+    this.maxTaskFailuresPerNode = conf.getInt(
+        TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 
+        TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT);
+    this.nodeBlacklistingEnabled = conf.getBoolean(
+        TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED,
+        TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT);
+    this.blacklistDisablePercent = conf.getInt(
+          TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD,
+          TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
+
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
+        ", blacklistingEnabled: " + nodeBlacklistingEnabled + 
+        ", maxTaskFailuresPerNode: " + maxTaskFailuresPerNode);
+
+    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+      throw new TezUncheckedException("Invalid blacklistDisablePercent: "
+          + blacklistDisablePercent
+          + ". Should be an integer between 0 and 100 or -1 to disabled");
+    }
+  }
+  
+  public void nodeSeen(NodeId nodeId) {
+    if (nodeMap.putIfAbsent(nodeId, new AMNodeImpl(nodeId, maxTaskFailuresPerNode,
+        eventHandler, nodeBlacklistingEnabled, appContext)) == null) {
+      LOG.info("Adding new node: " + nodeId);
+    }
+  }
+
+  private void addToBlackList(NodeId nodeId) {
+    String host = nodeId.getHost();
+    Set<NodeId> nodes;
+    
+    if (!blacklistMap.containsKey(host)) {
+      nodes = new HashSet<NodeId>();
+      blacklistMap.put(host, nodes);
+    } else {
+      nodes = blacklistMap.get(host);
+    }
+    
+    if (!nodes.contains(nodeId)) {
+      nodes.add(nodeId);
+    }
+  }
+
+  boolean registerBadNodeAndShouldBlacklist(AMNode amNode) {
+    if (nodeBlacklistingEnabled) {
+      addToBlackList(amNode.getNodeId());
+      computeIgnoreBlacklisting();
+      return !ignoreBlacklisting;
+    } else {
+      return false;
+    }
+  }
+
+  public void handle(AMNodeEvent rEvent) {
+    // No synchronization required until there are multiple dispatchers.
+    NodeId nodeId = rEvent.getNodeId();
+    switch (rEvent.getType()) {
+    case N_NODE_COUNT_UPDATED:
+      AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
+      numClusterNodes = event.getNodeCount();
+      LOG.info("Num cluster nodes = " + numClusterNodes);
+      recomputeCurrentIgnoreBlacklistingThreshold();
+      computeIgnoreBlacklisting();
+      break;
+    case N_TURNED_UNHEALTHY:
+    case N_TURNED_HEALTHY:
+      AMNode amNode = nodeMap.get(nodeId);
+      if (amNode == null) {
+        LOG.info("Ignoring RM Health Update for unknown node: " + nodeId);
+      } else {
+        amNode.handle(rEvent);
+      }
+      break;
+    default:
+      nodeMap.get(nodeId).handle(rEvent);
+    }
+  }
+
+  private void recomputeCurrentIgnoreBlacklistingThreshold() {
+    if (nodeBlacklistingEnabled && blacklistDisablePercent != -1) {
+      currentIgnoreBlacklistingCountThreshold =
+          (float) numClusterNodes * blacklistDisablePercent / 100;
+    }
+  }
+
+  // May be incorrect if there are multiple NodeManagers running on a single host.
+  // The known node count (numClusterNodes) is based on node managers, not hosts,
+  // whereas blacklisting is currently based on hosts.
+  protected void computeIgnoreBlacklisting() {
+
+    boolean stateChanged = false;
+
+    if (!nodeBlacklistingEnabled || blacklistDisablePercent == -1 || blacklistMap.size() == 0) {
+      return;
+    }
+    if (blacklistMap.size() >= currentIgnoreBlacklistingCountThreshold) {
+      if (ignoreBlacklisting == false) {
+        ignoreBlacklisting = true;
+        LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
+            + ", Blacklisted: " + blacklistMap.size());
+        stateChanged = true;
+      }
+    } else {
+      if (ignoreBlacklisting == true) {
+        ignoreBlacklisting = false;
+        LOG.info("Ignore blacklisting set to false. Known: "
+            + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
+        stateChanged = true;
+      }
+    }
+
+    if (stateChanged) {
+      sendIngoreBlacklistingStateToNodes();
+    }
+  }
+
+  private void sendIngoreBlacklistingStateToNodes() {
+    AMNodeEventType eventType =
+        ignoreBlacklisting ? AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED
+        : AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED;
+    for (NodeId nodeId : nodeMap.keySet()) {
+      sendEvent(new AMNodeEvent(nodeId, eventType));
+    }
+  }
+
+  public AMNode get(NodeId nodeId) {
+    return nodeMap.get(nodeId);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    this.eventHandler.handle(event);
+  }
+
+  public int getNumNodes() {
+    return nodeMap.size();
+  }
+
+  @Private
+  @VisibleForTesting
+  public boolean isBlacklistingIgnored() {
+    return this.ignoreBlacklisting;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 20d152e..b06c1a0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -80,7 +80,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
-import org.apache.tez.dag.app.rm.node.AMNodeMap;
+import org.apache.tez.dag.app.rm.node.AMNodeTracker;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -135,10 +135,10 @@ public class TestContainerReuse {
       mock(ContainerHeartbeatHandler.class),
       mock(TaskAttemptListener.class),
       new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
@@ -273,9 +273,9 @@ public class TestContainerReuse {
       mock(ContainerHeartbeatHandler.class),
       mock(TaskAttemptListener.class),
       new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
@@ -376,9 +376,9 @@ public class TestContainerReuse {
     AppContext appContext = mock(AppContext.class);
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
         mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
@@ -516,9 +516,9 @@ public class TestContainerReuse {
     AppContext appContext = mock(AppContext.class);
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
         mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
@@ -708,9 +708,9 @@ public class TestContainerReuse {
         mock(ContainerHeartbeatHandler.class),
         mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
@@ -836,9 +836,9 @@ public class TestContainerReuse {
       mock(ContainerHeartbeatHandler.class),
       mock(TaskAttemptListener.class),
       new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
     doReturn(true).when(appContext).isSession();
     doReturn(dagID).when(appContext).getCurrentDAGID();
@@ -956,9 +956,9 @@ public class TestContainerReuse {
     ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
         mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(amNodeMap).when(appContext).getAllNodes();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
     doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
     doReturn(true).when(appContext).isSession();
     doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
deleted file mode 100644
index 8744b92..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm.node;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
-import org.apache.tez.dag.app.rm.AMSchedulerEventType;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
-import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-@SuppressWarnings({ "resource", "rawtypes" })
-public class TestAMNodeMap {
-
-  private static final Log LOG = LogFactory.getLog(TestAMNodeMap.class);
-
-  DrainDispatcher dispatcher;
-  EventHandler eventHandler;
-  
-  @Before
-  public void setup() {
-    dispatcher = new DrainDispatcher();
-    dispatcher.init(new Configuration());
-    dispatcher.start();
-    eventHandler = dispatcher.getEventHandler();
-  }
-  
-  class TestEventHandler implements EventHandler{
-    List<Event> events = Lists.newLinkedList();
-    @SuppressWarnings("unchecked")
-    @Override
-    public void handle(Event event) {
-      events.add(event);
-      eventHandler.handle(event);
-    }
-  }
-  
-  @After
-  public void teardown() {
-    dispatcher.stop();
-  }
-  
-  @Test(timeout=5000)
-  public void testHealthUpdateKnownNode() {
-    AppContext appContext = mock(AppContext.class);
-
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
-    amNodeMap.init(new Configuration(false));
-    amNodeMap.start();
-
-    NodeId nodeId = NodeId.newInstance("host1", 2342);
-    amNodeMap.nodeSeen(nodeId);
-
-    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
-    amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
-    dispatcher.await();
-    assertEquals(AMNodeState.UNHEALTHY, amNodeMap.get(nodeId).getState());
-    amNodeMap.stop();
-  }
-
-  @Test(timeout=5000)
-  public void testHealthUpdateUnknownNode() {
-    AppContext appContext = mock(AppContext.class);
-
-    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
-    amNodeMap.init(new Configuration(false));
-    amNodeMap.start();
-
-    NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
-
-    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
-    amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
-    dispatcher.await();
-
-    amNodeMap.stop();
-    // No exceptions - the status update was ignored. Not bothering to capture
-    // the log message for verification.
-  }
-  
-  @Test(timeout=10000)
-  public void testNodeSelfBlacklist() throws InterruptedException {
-    AppContext appContext = mock(AppContext.class);
-    Configuration conf = new Configuration(false);
-    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
-    TestEventHandler handler = new TestEventHandler();
-    AMNodeMap amNodeMap = new AMNodeMap(handler, appContext);
-    AMContainerMap amContainerMap = mock(AMContainerMap.class);
-    TaskSchedulerEventHandler taskSchedulerEventHandler =
-        mock(TaskSchedulerEventHandler.class);
-    dispatcher.register(AMNodeEventType.class, amNodeMap);
-    dispatcher.register(AMContainerEventType.class, amContainerMap);
-    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
-    amNodeMap.init(conf);
-    amNodeMap.start();
-
-    amNodeMap.handle(new AMNodeEventNodeCountUpdated(4));
-    NodeId nodeId = NodeId.newInstance("host1", 1234);
-    NodeId nodeId2 = NodeId.newInstance("host2", 1234);
-    NodeId nodeId3 = NodeId.newInstance("host3", 1234);
-    NodeId nodeId4 = NodeId.newInstance("host4", 1234);
-    amNodeMap.nodeSeen(nodeId);
-    amNodeMap.nodeSeen(nodeId2);
-    amNodeMap.nodeSeen(nodeId3);
-    amNodeMap.nodeSeen(nodeId4);
-    AMNodeImpl node = (AMNodeImpl) amNodeMap.get(nodeId);
-    
-    ContainerId cId1 = mock(ContainerId.class);
-    ContainerId cId2 = mock(ContainerId.class);
-    ContainerId cId3 = mock(ContainerId.class);
-    
-    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
-    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
-    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
-    assertEquals(3, node.containers.size());
-    
-    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
-    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
-    TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
-    
-    amNodeMap.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
-    assertEquals(1, node.numSuccessfulTAs);
-    
-    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
-    assertEquals(1, node.numSuccessfulTAs);
-    assertEquals(1, node.numFailedTAs);
-    assertEquals(AMNodeState.ACTIVE, node.getState());
-    // duplicate should not affect anything
-    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
-    assertEquals(1, node.numSuccessfulTAs);
-    assertEquals(1, node.numFailedTAs);
-    assertEquals(AMNodeState.ACTIVE, node.getState());
-    
-    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
-    dispatcher.await();
-    assertEquals(1, node.numSuccessfulTAs);
-    assertEquals(2, node.numFailedTAs);
-    assertEquals(AMNodeState.BLACKLISTED, node.getState());
-    
-    assertEquals(5, handler.events.size());
-    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
-    assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
-    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
-    assertEquals(cId2, ((AMContainerEventNodeFailed)handler.events.get(1)).getContainerId());
-    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(2).getType());
-    assertEquals(cId3, ((AMContainerEventNodeFailed)handler.events.get(2)).getContainerId());
-    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(3).getType());
-    assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklistUpdate)handler.events.get(3)).getNodeId());
-    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(4).getType());
-    assertEquals(node.getNodeId(), ((AMNodeEvent)handler.events.get(4)).getNodeId());
-    
-    ContainerId cId4 = mock(ContainerId.class);
-    ContainerId cId5 = mock(ContainerId.class);
-    TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
-    TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
-    AMNodeImpl node2 = (AMNodeImpl) amNodeMap.get(nodeId2);
-    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
-    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
-    
-    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
-    assertEquals(1, node2.numFailedTAs);
-    assertEquals(AMNodeState.ACTIVE, node2.getState());
-    
-    handler.events.clear();
-    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
-    dispatcher.await();
-    assertEquals(2, node2.numFailedTAs);
-    assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
-    AMNodeImpl node3 = (AMNodeImpl)amNodeMap.get(nodeId3);
-    assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
-    assertEquals(10, handler.events.size());
-
-    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
-    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
-    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(2).getType());
-    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(3).getType());
-    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(4).getType());
-    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(5).getType());
-    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(6).getType());
-    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(7).getType());
-    assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(8).getType());
-    assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(9).getType());
-    // drain all previous events
-    Thread.sleep(500l);
-    dispatcher.await();
-
-    handler.events.clear();
-    amNodeMap.handle(new AMNodeEventNodeCountUpdated(8));
-    dispatcher.await();
-    Thread.sleep(1000l);
-    dispatcher.await();
-    LOG.info(("Completed waiting for dispatcher to process all pending events"));
-    assertEquals(AMNodeState.BLACKLISTED, node.getState());
-    assertEquals(AMNodeState.BLACKLISTED, node2.getState());
-    assertEquals(AMNodeState.ACTIVE, node3.getState());
-    assertEquals(8, handler.events.size());
-
-    int index = 0;
-    int numBlacklistingDisabledEvents = 0;
-    int numNodeBlacklistedEvents = 0;
-    int numNodeWasBlacklistedEvents = 0;
-    for (Event event : handler.events) {
-      LOG.info("Logging event: index:" + index++
-          + " type: " + event.getType());
-      if (event.getType() == AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED) {
-        numBlacklistingDisabledEvents++;
-      } else if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
-        numNodeBlacklistedEvents++;
-      } else if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) {
-        numNodeWasBlacklistedEvents++;
-      } else {
-        Assert.assertTrue("Unexpected event: " + event.getType(), false);        
-      }
-    }
-    assertEquals(4, numBlacklistingDisabledEvents);
-    assertEquals(2, numNodeBlacklistedEvents);
-    assertEquals(2, numNodeWasBlacklistedEvents);
-    
-    amNodeMap.stop();
-  }
-
-  private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
-    NodeReport nodeReport = NodeReport.newInstance(nodeId, nodeState, nodeId.getHost() + ":3433",
-        "/default-rack", Resource.newInstance(0, 0), Resource.newInstance(10240, 12), 10,
-        nodeState.toString(), System.currentTimeMillis());
-    return nodeReport;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/e530a46c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
new file mode 100644
index 0000000..8da2513
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm.node;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+@SuppressWarnings({ "resource", "rawtypes" })
+public class TestAMNodeTracker {
+
+  private static final Log LOG = LogFactory.getLog(TestAMNodeTracker.class);
+
+  DrainDispatcher dispatcher;
+  EventHandler eventHandler;
+  
+  @Before
+  public void setup() {
+    dispatcher = new DrainDispatcher();
+    dispatcher.init(new Configuration());
+    dispatcher.start();
+    eventHandler = dispatcher.getEventHandler();
+  }
+  
+  class TestEventHandler implements EventHandler{
+    List<Event> events = Lists.newLinkedList();
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(Event event) {
+      events.add(event);
+      eventHandler.handle(event);
+    }
+  }
+  
+  @After
+  public void teardown() {
+    dispatcher.stop();
+  }
+  
+  @Test(timeout=5000)
+  public void testHealthUpdateKnownNode() {
+    AppContext appContext = mock(AppContext.class);
+
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    amNodeTracker.init(new Configuration(false));
+    amNodeTracker.start();
+
+    NodeId nodeId = NodeId.newInstance("host1", 2342);
+    amNodeTracker.nodeSeen(nodeId);
+
+    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+    dispatcher.await();
+    assertEquals(AMNodeState.UNHEALTHY, amNodeTracker.get(nodeId).getState());
+    amNodeTracker.stop();
+  }
+
+  @Test(timeout=5000)
+  public void testHealthUpdateUnknownNode() {
+    AppContext appContext = mock(AppContext.class);
+
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    amNodeTracker.init(new Configuration(false));
+    amNodeTracker.start();
+
+    NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
+
+    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
+    amNodeTracker.handle(new AMNodeEventStateChanged(nodeReport));
+    dispatcher.await();
+
+    amNodeTracker.stop();
+    // No exceptions - the status update was ignored. Not bothering to capture
+    // the log message for verification.
+  }
+
+  @Test (timeout = 5000)
+  public void testSingleNodeNotBlacklisted() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
+    conf.setInt(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 33);
+
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerEventHandler taskSchedulerEventHandler =
+        mock(TaskSchedulerEventHandler.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(1));
+    NodeId nodeId = NodeId.newInstance("host1", 1234);
+    amNodeTracker.nodeSeen(nodeId);
+
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+
+    ContainerId cId1 = mock(ContainerId.class);
+    ContainerId cId2 = mock(ContainerId.class);
+
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
+
+    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId1, ta1, true));
+    dispatcher.await();
+    assertEquals(1, node.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node.getState());
+
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    dispatcher.await();
+    assertEquals(2, node.numFailedTAs);
+    assertEquals(1, handler.events.size());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(0).getType());
+    assertEquals(AMNodeState.FORCED_ACTIVE, node.getState());
+  }
+
+  @Test(timeout=10000)
+  public void testNodeSelfBlacklist() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext);
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    TaskSchedulerEventHandler taskSchedulerEventHandler =
+        mock(TaskSchedulerEventHandler.class);
+    dispatcher.register(AMNodeEventType.class, amNodeTracker);
+    dispatcher.register(AMContainerEventType.class, amContainerMap);
+    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
+    amNodeTracker.init(conf);
+    amNodeTracker.start();
+
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(4));
+    NodeId nodeId = NodeId.newInstance("host1", 1234);
+    NodeId nodeId2 = NodeId.newInstance("host2", 1234);
+    NodeId nodeId3 = NodeId.newInstance("host3", 1234);
+    NodeId nodeId4 = NodeId.newInstance("host4", 1234);
+    amNodeTracker.nodeSeen(nodeId);
+    amNodeTracker.nodeSeen(nodeId2);
+    amNodeTracker.nodeSeen(nodeId3);
+    amNodeTracker.nodeSeen(nodeId4);
+    AMNodeImpl node = (AMNodeImpl) amNodeTracker.get(nodeId);
+    
+    ContainerId cId1 = mock(ContainerId.class);
+    ContainerId cId2 = mock(ContainerId.class);
+    ContainerId cId3 = mock(ContainerId.class);
+    
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
+    assertEquals(3, node.containers.size());
+    
+    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
+    
+    amNodeTracker.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
+    assertEquals(1, node.numSuccessfulTAs);
+    
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    assertEquals(1, node.numSuccessfulTAs);
+    assertEquals(1, node.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node.getState());
+    // duplicate should not affect anything
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    assertEquals(1, node.numSuccessfulTAs);
+    assertEquals(1, node.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node.getState());
+    
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
+    dispatcher.await();
+    assertEquals(1, node.numSuccessfulTAs);
+    assertEquals(2, node.numFailedTAs);
+    assertEquals(AMNodeState.BLACKLISTED, node.getState());
+    
+    assertEquals(4, handler.events.size());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+    assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
+    assertEquals(cId2, ((AMContainerEventNodeFailed)handler.events.get(1)).getContainerId());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(2).getType());
+    assertEquals(cId3, ((AMContainerEventNodeFailed)handler.events.get(2)).getContainerId());
+    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(3).getType());
+    assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklistUpdate)handler.events.get(3)).getNodeId());
+
+
+    // Trigger one more node failure, which should cause BLACKLISTING to be disabled
+    ContainerId cId4 = mock(ContainerId.class);
+    ContainerId cId5 = mock(ContainerId.class);
+    TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
+    AMNodeImpl node2 = (AMNodeImpl) amNodeTracker.get(nodeId2);
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
+    amNodeTracker.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
+    
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
+    assertEquals(1, node2.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node2.getState());
+    
+    handler.events.clear();
+    amNodeTracker.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
+    dispatcher.await();
+    assertEquals(2, node2.numFailedTAs);
+    assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
+    AMNodeImpl node3 = (AMNodeImpl) amNodeTracker.get(nodeId3);
+    assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
+    assertEquals(5, handler.events.size());
+
+    // Blacklisting Disabled, the node causing this will not be blacklisted. The single node that
+    // was blacklisted will be unblacklisted.
+    int numIgnoreBlacklistingEnabledEvents = 0;
+    int numUnblacklistedEvents = 0;
+    for (Event event : handler.events) {
+      if (event.getType() == AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED) {
+        numIgnoreBlacklistingEnabledEvents++;
+      } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
+        numUnblacklistedEvents++;
+      } else {
+        fail("Unexpected event of type: " + event.getType());
+      }
+    }
+    assertEquals(4, numIgnoreBlacklistingEnabledEvents);
+    assertEquals(1, numUnblacklistedEvents);
+
+    // drain all previous events
+    dispatcher.await();
+
+
+    // Increase the number of nodes. BLACKLISTING should be re-enabled.
+    // Node 1 and Node 2 should go into BLACKLISTED state
+    handler.events.clear();
+    amNodeTracker.handle(new AMNodeEventNodeCountUpdated(8));
+    dispatcher.await();
+    LOG.info(("Completed waiting for dispatcher to process all pending events"));
+    assertEquals(AMNodeState.BLACKLISTED, node.getState());
+    assertEquals(AMNodeState.BLACKLISTED, node2.getState());
+    assertEquals(AMNodeState.ACTIVE, node3.getState());
+    assertEquals(8, handler.events.size());
+
+    int index = 0;
+    int numIgnoreBlacklistingDisabledEvents = 0;
+    int numBlacklistedEvents = 0;
+    int numNodeFailedEvents = 0;
+    for (Event event : handler.events) {
+      LOG.info("Logging event: index:" + index++
+          + " type: " + event.getType());
+      if (event.getType() == AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED) {
+        numIgnoreBlacklistingDisabledEvents++;
+      } else if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
+        numBlacklistedEvents++;
+      } else if (event.getType() == AMContainerEventType.C_NODE_FAILED) {
+        numNodeFailedEvents++;
+        // Node2 is now blacklisted so the containers will be informed
+        assertTrue(((AMContainerEventNodeFailed) event).getContainerId() == cId4 ||
+            ((AMContainerEventNodeFailed) event).getContainerId() == cId5);
+      } else {
+        fail("Unexpected event of type: " + event.getType());
+      }
+    }
+    assertEquals(4, numIgnoreBlacklistingDisabledEvents);
+    assertEquals(2, numBlacklistedEvents);
+    assertEquals(2, numNodeFailedEvents);
+    
+    amNodeTracker.stop();
+  }
+
+  private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
+    NodeReport nodeReport = NodeReport.newInstance(nodeId, nodeState, nodeId.getHost() + ":3433",
+        "/default-rack", Resource.newInstance(0, 0), Resource.newInstance(10240, 12), 10,
+        nodeState.toString(), System.currentTimeMillis());
+    return nodeReport;
+  }
+}


Mime
View raw message