hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1381473 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/m...
Date Thu, 06 Sep 2012 06:39:39 GMT
Author: sseth
Date: Thu Sep  6 06:39:38 2012
New Revision: 1381473

URL: http://svn.apache.org/viewvc?rev=1381473&view=rev
Log:
MAPREDUCE-4620. RMContainerAllocator should factor in nodes being blacklisted (sseth)

Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Thu Sep  6 06:39:38 2012
@@ -12,3 +12,5 @@ Branch MR-3902
   MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437. (sseth)
 
   MAPREDUCE-4619. Change AMContainerMap to extend AbstractService (Tsuyoshi OZAWA via sseth)
+
+  MAPREDUCE-4620. RMContainerAllocator should factor in nodes being blacklisted. (sseth)

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java?rev=1381473&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java Thu Sep  6 06:39:38 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent {
+
+  private final NodeId nodeId; // May need to be host instead.
+
+  public AMSchedulerEventNodeBlacklisted(NodeId nodeId, boolean headRoomChanged) {
+    super(AMSchedulerEventType.S_NODE_BLACKLISTED);
+    this.nodeId = nodeId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Thu Sep  6 06:39:38 2012
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.YarnExcept
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -84,8 +85,6 @@ import org.apache.hadoop.yarn.util.RackR
 public class RMContainerAllocator extends AbstractService
     implements ContainerAllocator {
 
-// TODO XXX: Factor in MAPREDUCE-4437. Reduce scheduling needs to be looked into IAC
-
   static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   
   public static final 
@@ -115,8 +114,7 @@ public class RMContainerAllocator extend
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private final AMContainerMap containerMap;
-  
-  //TODO XXX Make Configurable.
+
   // Run the scheduler if it hasn't run for this interval.
   private long scheduleInterval = 1000l;
   
@@ -219,7 +217,8 @@ public class RMContainerAllocator extend
     LOG.info("AMSchedulerConfiguration: " + "ReUseEnabled: " + shouldReUse
         + ", reduceSlowStart: " + reduceSlowStart + ", maxReduceRampupLimit: "
         + maxReduceRampupLimit + ", maxReducePreemptionLimit: "
-        + maxReducePreemptionLimit);
+        + maxReducePreemptionLimit + ", scheduleThreadInterval: "
+        + scheduleInterval + " ms");
     RackResolver.init(conf);
   }
 
@@ -351,7 +350,7 @@ public class RMContainerAllocator extend
     case S_CONTAINER_COMPLETED: //Nothing specific to be done in this scheduler.
       break;
     case S_NODE_BLACKLISTED:
-      // TODO XXX Withdraw requests related to this node and place new ones.
+      handleNodeBlacklisted((AMSchedulerEventNodeBlacklisted) sEvent);
       break;
     case S_NODE_UNHEALTHY:
       // Ignore. RM will not allocated containers on this node.
@@ -376,10 +375,12 @@ public class RMContainerAllocator extend
       event.getCapability().setMemory(reduceResourceReqt);
       if (event.isRescheduled()) {
         pendingReduces.addFirst(new ContainerRequestInfo(new ContainerRequest(
-            event, PRIORITY_REDUCE), event));
+            event.getCapability(), event.getHosts(), event.getRacks(),
+            PRIORITY_REDUCE), event));
       } else {
         pendingReduces.addLast(new ContainerRequestInfo(new ContainerRequest(
-            event, PRIORITY_REDUCE), event));
+            event.getCapability(), event.getHosts(), event.getRacks(),
+            PRIORITY_REDUCE), event));
       }
     }
   }
@@ -392,7 +393,7 @@ public class RMContainerAllocator extend
       removed = scheduledRequests.remove(aId);
       if (!removed) {
         // Maybe assigned.
-        ContainerId containerId = assignedRequests.getContainerId(aId);
+        ContainerId containerId = assignedRequests.remove(aId);
         if (containerId != null) {
           // Ask the container to stop.
           sendEvent(new AMContainerEvent(containerId,
@@ -425,8 +426,6 @@ public class RMContainerAllocator extend
           + event.getAttemptID() + ". Full event: " + event);
     }
   }
-
-  // TODO XXX: Deal with node blacklisting.
   
   private void handleContainersAllocated(
       AMSchedulerEventContainersAllocated event) {
@@ -447,9 +446,23 @@ public class RMContainerAllocator extend
     schedule();
   }
 
-  // TODO XXX: Deal with node blacklisting.
-  
-  
+  // TODO Add a test later if TestRMContainerAllocator does not have one for
+  // blacklisting.
+  private void handleNodeBlacklisted(AMSchedulerEventNodeBlacklisted event) {
+    NodeId nodeId = event.getNodeId();
+    String host = nodeId.getHost();
+    // Only maps would have asked for containers on a specific node.
+    List<TaskAttemptId> affectedAttemptIds = scheduledRequests.mapsHostMapping.get(host);
+    for (TaskAttemptId taId : affectedAttemptIds) {
+      ContainerRequestInfo cr = scheduledRequests.maps.get(taId);
+      scheduledRequests.remove(taId);
+      scheduledRequests.addMap(cr.launchRequestEvent);
+    }
+    // Instead of removing / re-adding each individual request, it may be more
+    // efficient to modify internal data structures, and send a request to the
+    // RMComm to completely forget about a host. 
+  }
+
   // TODO Override for re-use.
   protected synchronized void assignContainers() {
     if (availableContainerIds.size() > 0) {
@@ -771,14 +784,20 @@ public class RMContainerAllocator extend
       return null;
     }
     
+    /**
+     * Considers node blacklisting while create container ask requests for the 
+     * RMContainerAllocator.
+     */
     void addMap(AMSchedulerTALaunchRequestEvent event) {
       ContainerRequest request = null;
-      
+
       if (event.isRescheduled()) {
         earlierFailedMaps.add(event.getAttemptID());
-        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+        request = new ContainerRequest(event.getCapability(), event.getHosts(),
+            event.getRacks(), PRIORITY_FAST_FAIL_MAP);
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
       } else {
+        List<String> hosts = new LinkedList<String>();
         for (String host : event.getHosts()) {
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
           if (list == null) {
@@ -789,6 +808,14 @@ public class RMContainerAllocator extend
           if (LOG.isDebugEnabled()) {
             LOG.debug("Added attempt req to host " + host);
           }
+          if (!appContext.getAllNodes().isHostBlackListed(host)) {
+            hosts.add(host);
+          } else {
+            // Leaving the entries in mapsHostMapping etc. Will allow allocation
+            // in case all nodes get blacklisted / blacklisting gets enabled.
+            LOG.info("XXX: Host: " + host
+                + " is blacklisted. Not including in Container request");
+          }
        }
        for (String rack: event.getRacks()) {
          LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
@@ -801,9 +828,13 @@ public class RMContainerAllocator extend
             LOG.debug("Added attempt req to rack " + rack);
          }
        }
-       request = new ContainerRequest(event, PRIORITY_MAP);
+        request = new ContainerRequest(event.getCapability(),
+            hosts.toArray(new String[0]), event.getRacks(), PRIORITY_MAP);
       }
-//      ContainerRequestInfo csInfo = new ContainerRequestInfo(request, event.getAttemptID());
+      // ContainerRequestInfo ends up with the correct ContainerRequest, and the
+      // original event.
+      // Remove works on the basis of the ContainerRequest while asking the
+      // RMComm to decrement a container request.
       maps.put(event.getAttemptID(), new ContainerRequestInfo(request, event));
       requestor.addContainerReq(request);
     }
@@ -864,40 +895,38 @@ public class RMContainerAllocator extend
             isAssignable = false;
           }
         }          
-        
-//        boolean blackListed = false;
-        boolean nodeUsable = true;
+
+        boolean nodeUnhealthy = false;
+        boolean blackListed = false;
         ContainerRequestInfo assigned = null;
         
         if (isAssignable) {
-          // do not assign if allocated container is on a  
-          // blacklisted host
           String allocatedHost = allocated.getNodeId().getHost();
-          // TODO XXX: Modify the Request table as and when containers are allocated on bad hosts, as against updating the table as soon as a node is blacklisted / lost. 
-          // Blakclisted nodes should likely be removed immediately.
           
           // TODO Differentiation between blacklisted versus unusable nodes ?
-          boolean blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost);
-          nodeUsable = appContext.getNode(allocated.getNodeId()).isUsable();
+          // Ideally there should be no assignments on unhealthy nodes.
+          blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost);
+          nodeUnhealthy = appContext.getNode(allocated.getNodeId()).isUnhealthy();
           
-          if (!nodeUsable || blackListed) {
+          if (nodeUnhealthy || blackListed) {
             // we need to request for a new container 
             // and release the current one
-            LOG.info("Got allocated container on an unusable "
-                + " host "+allocatedHost
-                +". Releasing container " + allocated);
-
-            // find the request matching this allocated container 
-            // and replace it with a new one 
+            LOG.info("Got allocated container on an unusable " + " host "
+                + allocatedHost + ". Releasing container " + allocated
+                + " NodeUnhealthy: " + nodeUnhealthy + ", NodeBlackListed: "
+                + blackListed);
+
+            // find the request matching this allocated container and replace it
+            // with a new one. Have to ensure a request goes out to the RM
+            // asking for a new container. Hence a decRequest + addRequest.
             ContainerRequestInfo toBeReplacedReq = 
                 getContainerReqToReplace(allocated);
-            
-            // TODO XXX: Requirement here is to be able to figure out the taskAttemptId for which this request was put. If that's being replaced, update corresponding maps with info.
-            // Effectively a RequestInfo to attemptId map - or a structure which includes both.
-            
+
             if (toBeReplacedReq != null) {
               LOG.info("Placing a new container request for task attempt " 
                   + toBeReplacedReq.getAttemptId());
+              // This isn't necessarily needed, since the request should have changed
+              // when the node blacklist event was received.
               ContainerRequestInfo newReq = 
                   getFilteredContainerRequest(toBeReplacedReq);
               requestor.decContainerReq(toBeReplacedReq.getContainerRequest());
@@ -922,15 +951,16 @@ public class RMContainerAllocator extend
               requestor.decContainerReq(assigned.getContainerRequest());
 
               // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
-              
-              
-              // TODO XXX: Launch only if not already running.
-              // TODO XXX: Change this event to be more specific.
+
               if (appContext.getContainer(containerId).getState() == AMContainerState.ALLOCATED) {
-                eventHandler.handle(new AMContainerLaunchRequestEvent(containerId, attemptToLaunchRequestMap.get(assigned.getAttemptId()), requestor.getApplicationAcls(), getJob().getID()));
+                eventHandler.handle(new AMContainerLaunchRequestEvent(
+                    containerId, attemptToLaunchRequestMap.get(assigned
+                        .getAttemptId()), requestor.getApplicationAcls(),
+                    getJob().getID()));
               }
-              eventHandler.handle(new AMContainerAssignTAEvent(containerId, assigned.getAttemptId(), attemptToLaunchRequestMap.get(assigned.getAttemptId()).getRemoteTask()));
-              // TODO XXX: If re-using, get rid of one request.
+              eventHandler.handle(new AMContainerAssignTAEvent(containerId,
+                  assigned.getAttemptId(), attemptToLaunchRequestMap.get(
+                      assigned.getAttemptId()).getRemoteTask()));
 
               assignedRequests.add(allocated, assigned.getAttemptId());
 
@@ -951,15 +981,13 @@ public class RMContainerAllocator extend
         
         // release container if it was blacklisted 
         // or if we could not assign it 
-        if (!nodeUsable || assigned == null) {
+        if (blackListed || nodeUnhealthy || assigned == null) {
           containersReleased++;
           sendEvent(new AMContainerEvent(containerId, AMContainerEventType.C_STOP_REQUEST));
         }
       }
     }
-    
-    // TODO XXX: Check whether the node is bad before an assign.
-    
+
     private ContainerRequestInfo assign(Container allocated) {
       ContainerRequestInfo assigned = null;
       
@@ -1070,7 +1098,8 @@ public class RMContainerAllocator extend
             assigned = maps.remove(tId);
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(tId.getTaskId().getJobId());
-            // TODO XXX: Move these counter updated to go out from the TaskAttempt.
+            // TODO XXX (After MR-3902 if the counter updates are correct): Move
+            // these counter updated to go out from the TaskAttempt.
             jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
             eventHandler.handle(jce);
             hostLocalAssigned++;
@@ -1158,20 +1187,7 @@ public class RMContainerAllocator extend
         eventHandler.handle(new TaskAttemptEventKillRequest(id, "Pre-empting reduce"));
       }
     }
-    
-    ContainerId getContainerId(TaskAttemptId taId) {
-      ContainerId containerId = null;
-      if (taId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        containerId = maps.get(taId).getId();
-      } else {
-        containerId = reduces.get(taId).getId();
-      }
-      return containerId;
-    }
-    
-    // TODO XXX Check where all this is being used.
-    // XXX: Likely needed in case of TA failed / killed / terminated as well.
-    // Old code was removing when CONTAINER_COMPLETED was received fromthe RM.
+
     ContainerId remove(TaskAttemptId tId) {
       ContainerId containerId = null;
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
@@ -1179,7 +1195,6 @@ public class RMContainerAllocator extend
       } else {
         containerId = reduces.remove(tId).getId();
         if (containerId != null) {
-          // TODO XXX -> Revisit remove(), semantics change.
           boolean preempted = preemptionWaitingReduces.remove(tId);
           if (preempted) {
             LOG.info("Reduce preemption successful " + tId);

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Thu Sep  6 06:39:38 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventReleased;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated;
 import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventStateChanged;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -95,12 +96,12 @@ public class RMContainerRequestor extend
   private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0);
   private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>();
   
-  // TODO XXX: May need to pass this to the AMNodeMap
   private int clusterNmCount = 0;
   
   // TODO XXX Consider allowing sync comm between the requestor and allocator... 
   
-  // TODO (after 3902): Why does the RMRequestor require the ClientService ?? (for the RPC address. get rid of this.)
+  // TODO (after 3902): Why does the RMRequestor require the ClientService ??
+  // (for the RPC address. get rid of this.)
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
     this.clock = context.getClock();
@@ -112,11 +113,6 @@ public class RMContainerRequestor extend
     final String[] racks;
     final Priority priority;
 
-    public ContainerRequest(AMSchedulerTALaunchRequestEvent event,
-        Priority priority) {
-      this(event.getCapability(), event.getHosts(), event.getRacks(), priority);
-    }
-
     public ContainerRequest(Resource capability, String[] hosts,
         String[] racks, Priority priority) {
       this.capability = capability;
@@ -157,9 +153,9 @@ public class RMContainerRequestor extend
     // Create resource requests
     for (String host : req.hosts) {
       // Data-local
-      if (!context.getAllNodes().isHostBlackListed(host)) {
-        addResourceRequest(req.priority, host, req.capability);
-      }      
+      // Assumes the scheduler is handling bad nodes. Tracking them here would
+      // lead to an out-of-sync scheduler / requestor.
+      addResourceRequest(req.priority, host, req.capability);
     }
 
     // Nothing Rack-local for now
@@ -316,6 +312,7 @@ public class RMContainerRequestor extend
     LOG.info("BeforeHeartbeat: " + getStat());
     int headRoom = getAvailableResources() != null ? getAvailableResources()
         .getMemory() : 0;// first time it would be null
+    int lastClusterNmCount = clusterNmCount;
     AMResponse response = errorCheckedMakeRemoteRequest();
     
     int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
@@ -334,6 +331,12 @@ public class RMContainerRequestor extend
  
     LOG.info("AfterHeartbeat: " + getStat());
     
+    if (clusterNmCount != lastClusterNmCount) {
+      LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+          + clusterNmCount);
+      eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+    }
+    
     // Inform the Containers about completion..
     for (ContainerStatus c : finishedContainers) {
       eventHandler.handle(new AMContainerEventReleased(c));
@@ -344,7 +347,6 @@ public class RMContainerRequestor extend
     if (newContainers.size() > 0) {
       newContainerIds = new ArrayList<ContainerId>(newContainers.size());
       for (Container container : newContainers) {
-        // TODO XXX Re-factor AMNodes and AMContainers.
         context.getAllContainers().addNewContainer(container);
         newContainerIds.add(container.getId()); 
         context.getAllNodes().nodeSeen(container.getNodeId());
@@ -432,7 +434,6 @@ public class RMContainerRequestor extend
       RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent) rawEvent;
       releaseLock.lock();
       try {
-        // TODO XXX: Currently the RM does not handle release requests for RUNNING containers.
         numContainerReleaseRequests++;
         release.add(event.getContainerId());
       } finally {

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java Thu Sep  6 06:39:38 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 import java.util.List;
@@ -12,6 +30,6 @@ public interface AMNode extends EventHan
   public AMNodeState getState();
   public List<ContainerId> getContainers();
 
-  public boolean isUsable();
+  public boolean isUnhealthy();
   public boolean isBlacklisted();
 }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java Thu Sep  6 06:39:38 2012
@@ -1,10 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 
-// TODO: Implement.
-
 public class AMNodeEvent extends AbstractEvent<AMNodeEventType> {
 
   private final NodeId nodeId;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java?rev=1381473&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java Thu Sep  6 06:39:38 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.node;
+
+public class AMNodeEventNodeCountUpdated extends AMNodeEvent {
+
+  private final int count;
+  
+  public AMNodeEventNodeCountUpdated(int nodeCount) {
+    super(null, AMNodeEventType.N_NODE_COUNT_UPDATED);
+    this.count = nodeCount;
+  }
+  
+  public int getNodeCount() {
+    return this.count;
+  }
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java Thu Sep  6 06:39:38 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 public enum AMNodeEventType {
@@ -11,11 +29,13 @@ public enum AMNodeEventType {
   //Producer: RMCommunicator
   N_TURNED_UNHEALTHY,
   N_TURNED_HEALTHY,
+  N_NODE_COUNT_UPDATED,
   
   //Producer: AMNodeManager
   N_BLACKLISTING_ENABLED,
   N_BLACKLISTING_DISABLED,
   
-  //Producer: Node - Will not reach NodeImpl. Used to compute whether blacklisting should be ignored.
+  // Producer: AMNode - Will not reach AMNodeImpl. Used to compute whether
+  // blacklisting should be ignored.
   N_NODE_WAS_BLACKLISTED
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java Thu Sep  6 06:39:38 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 import java.util.EnumSet;
@@ -378,11 +396,10 @@ public class AMNodeImpl implements AMNod
   }
 
   @Override
-  public boolean isUsable() {
+  public boolean isUnhealthy() {
     this.readLock.lock();
     try {
-      return (EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE)
-          .contains(getState()));
+      return getState() == AMNodeState.UNHEALTHY;
     } finally {
       this.readLock.unlock();
     }

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java?rev=1381473&r1=1381472&r2=1381473&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Thu Sep  6 06:39:38 2012
@@ -1,3 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.hadoop.mapreduce.v2.app2.rm.node;
 
 import java.util.ArrayList;
@@ -15,26 +33,28 @@ import org.apache.hadoop.yarn.service.Ab
 
 public class AMNodeMap extends AbstractService implements
     EventHandler<AMNodeEvent> {
-
+  
   static final Log LOG = LogFactory.getLog(AMNodeMap.class);
   
   private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
-  // TODO XXX -> blacklistMap is also used for computing forcedUnblacklisting.
   private final ConcurrentHashMap<String, ArrayList<NodeId>> blacklistMap;
   private final EventHandler<?> eventHandler;
   private final AppContext appContext;
+  private int numClusterNodes;
+  private boolean ignoreBlacklisting = false;
   private int maxTaskFailuresPerNode;
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   
+  
+  // TODO XXX Ensure there's a test for IgnoreBlacklisting in
+  // TestRMContainerAllocator. Otherwise add one.
   public AMNodeMap(EventHandler<?> eventHandler, AppContext appContext) {
     super("AMNodeMap");
     this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>();
     this.blacklistMap = new ConcurrentHashMap<String, ArrayList<NodeId>>();
     this.eventHandler = eventHandler;
     this.appContext = appContext;
-  
-    // TODO XXX: Get a handle of allowed failures.
   }
   
   @Override
@@ -66,13 +86,14 @@ public class AMNodeMap extends AbstractS
   }
   
   public boolean isHostBlackListed(String hostname) {
-    if (!nodeBlacklistingEnabled) {
+    // TODO Maybe: For now, forced unblacklisting is being handled
+    // here. An AMNode will never go into the BLACKLISTED / FORCED_ACTIVE state.
+    if (!nodeBlacklistingEnabled || ignoreBlacklisting) {
       return false;
     }
-    
     return blacklistMap.containsKey(hostname);
   }
-  
+
   private void addToBlackList(NodeId nodeId) {
     String host = nodeId.getHost();
     ArrayList<NodeId> nodes;
@@ -99,23 +120,59 @@ public class AMNodeMap extends AbstractS
     }
   }
   */
-  
-  public void handle(AMNodeEvent event) {
-    if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) {
-      NodeId nodeId = event.getNodeId();
+
+  public void handle(AMNodeEvent rEvent) {
+    // No synchronization required until there's multiple dispatchers.
+    NodeId nodeId = rEvent.getNodeId();
+    switch (rEvent.getType()) {
+    case N_NODE_WAS_BLACKLISTED:
       addToBlackList(nodeId);
-    } else {
-      NodeId nodeId = event.getNodeId();
-      nodeMap.get(nodeId).handle(event);
+      computeIgnoreBlacklisting();
+      break;
+    case N_NODE_COUNT_UPDATED:
+      AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
+      numClusterNodes = event.getNodeCount();
+      computeIgnoreBlacklisting();
+      break;
+    default:
+      nodeMap.get(nodeId).handle(rEvent);
     }
   }
-  
+
+  // May be incorrect if there's multiple NodeManagers running on a single host.
+  // knownNodeCount is based on node managers, not hosts. blacklisting is
+  // currently based on hosts.
+  protected void computeIgnoreBlacklisting() {
+    if (!nodeBlacklistingEnabled) {
+      return;
+    }
+    if (blacklistDisablePercent != -1) {
+      if (numClusterNodes == 0) {
+        LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
+        return;
+      }
+      int val = (int) ((float) blacklistMap.size() / numClusterNodes * 100);
+      if (val >= blacklistDisablePercent) {
+        if (ignoreBlacklisting == false) {
+          ignoreBlacklisting = true;
+          LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes
+              + ", Blacklisted: " + blacklistMap.size());
+        }
+      } else {
+        if (ignoreBlacklisting == true) {
+          ignoreBlacklisting = false;
+          LOG.info("Ignore blacklisting set to false. Known: "
+              + numClusterNodes + ", Blacklisted: " + blacklistMap.size());
+        }
+      }
+    }
+  }
+
   public AMNode get(NodeId nodeId) {
     return nodeMap.get(nodeId);
   }
-  
+
   public int size() {
     return nodeMap.size();
   }
-  
-}
+}
\ No newline at end of file



Mime
View raw message