hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1463203 [5/8] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org...
Date Mon, 01 Apr 2013 16:47:34 GMT
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Mon Apr  1 16:47:16 2013
@@ -86,6 +86,14 @@ public class TestNMWebServer {
       public long getPmemAllocatedForContainers() {
         return 0;
       }
+      @Override
+      public boolean isVmemCheckEnabled() {
+        return true;
+      }
+      @Override
+      public boolean isPmemCheckEnabled() {
+        return true;
+      }
     };
     Configuration conf = new Configuration();
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
@@ -126,6 +134,14 @@ public class TestNMWebServer {
       public long getPmemAllocatedForContainers() {
         return 0;
       }
+      @Override
+      public boolean isVmemCheckEnabled() {
+        return true;
+      }
+      @Override
+      public boolean isPmemCheckEnabled() {
+        return true;
+      }
     };
     Configuration conf = new Configuration();
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java Mon Apr  1 16:47:16 2013
@@ -100,6 +100,14 @@ public class TestNMWebServices extends J
           // 16G in bytes
           return new Long("17179869184");
         }
+        @Override
+        public boolean isVmemCheckEnabled() {
+          return true;
+        }
+        @Override
+        public boolean isPmemCheckEnabled() {
+          return true;
+        }
       };
       Configuration conf = new Configuration();
       conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
@@ -294,6 +302,8 @@ public class TestNMWebServices extends J
               "totalVmemAllocatedContainersMB"),
           WebServicesTestUtils.getXmlLong(element,
               "totalPmemAllocatedContainersMB"),
+          WebServicesTestUtils.getXmlBoolean(element, "vmemCheckEnabled"),
+          WebServicesTestUtils.getXmlBoolean(element, "pmemCheckEnabled"),
           WebServicesTestUtils.getXmlLong(element, "lastNodeUpdateTime"),
           WebServicesTestUtils.getXmlBoolean(element, "nodeHealthy"),
           WebServicesTestUtils.getXmlString(element, "nodeHostName"),
@@ -310,10 +320,12 @@ public class TestNMWebServices extends J
   public void verifyNodeInfo(JSONObject json) throws JSONException, Exception {
     assertEquals("incorrect number of elements", 1, json.length());
     JSONObject info = json.getJSONObject("nodeInfo");
-    assertEquals("incorrect number of elements", 13, info.length());
+    assertEquals("incorrect number of elements", 15, info.length());
     verifyNodeInfoGeneric(info.getString("id"), info.getString("healthReport"),
         info.getLong("totalVmemAllocatedContainersMB"),
         info.getLong("totalPmemAllocatedContainersMB"),
+        info.getBoolean("vmemCheckEnabled"),
+        info.getBoolean("pmemCheckEnabled"),
         info.getLong("lastNodeUpdateTime"), info.getBoolean("nodeHealthy"),
         info.getString("nodeHostName"), info.getString("hadoopVersionBuiltOn"),
         info.getString("hadoopBuildVersion"), info.getString("hadoopVersion"),
@@ -325,6 +337,7 @@ public class TestNMWebServices extends J
 
   public void verifyNodeInfoGeneric(String id, String healthReport,
       long totalVmemAllocatedContainersMB, long totalPmemAllocatedContainersMB,
+      boolean vmemCheckEnabled, boolean pmemCheckEnabled,
       long lastNodeUpdateTime, Boolean nodeHealthy, String nodeHostName,
       String hadoopVersionBuiltOn, String hadoopBuildVersion,
       String hadoopVersion, String resourceManagerVersionBuiltOn,
@@ -337,6 +350,8 @@ public class TestNMWebServices extends J
         totalVmemAllocatedContainersMB);
     assertEquals("totalPmemAllocatedContainersMB incorrect", 16384,
         totalPmemAllocatedContainersMB);
+    assertEquals("vmemCheckEnabled incorrect",  true, vmemCheckEnabled);
+    assertEquals("pmemCheckEnabled incorrect",  true, pmemCheckEnabled);
     assertTrue("lastNodeUpdateTime incorrect", lastNodeUpdateTime == nmContext
         .getNodeHealthStatus().getLastHealthReportTime());
     assertTrue("nodeHealthy isn't true", nodeHealthy);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java Mon Apr  1 16:47:16 2013
@@ -106,6 +106,16 @@ public class TestNMWebServicesApps exten
           // 16G in bytes
           return new Long("17179869184");
         }
+
+        @Override
+        public boolean isVmemCheckEnabled() {
+          return true;
+        }
+
+        @Override
+        public boolean isPmemCheckEnabled() {
+          return true;
+        }
       };
       conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
       conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java Mon Apr  1 16:47:16 2013
@@ -106,6 +106,16 @@ public class TestNMWebServicesContainers
           // 16G in bytes
           return new Long("17179869184");
         }
+
+        @Override
+        public boolean isVmemCheckEnabled() {
+          return true;
+        }
+
+        @Override
+        public boolean isPmemCheckEnabled() {
+          return true;
+        }
       };
       conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
       conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Mon Apr  1 16:47:16 2013
@@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -78,10 +77,12 @@ public class ApplicationMasterService ex
   private YarnScheduler rScheduler;
   private InetSocketAddress bindAddress;
   private Server server;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
-      new ConcurrentHashMap<ApplicationAttemptId, AMResponse>();
-  private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
+      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
+  private final AllocateResponse reboot =
+      recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
 
   public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
@@ -166,7 +167,7 @@ public class ApplicationMasterService ex
     authorizeRequest(applicationAttemptId);
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
-    AMResponse lastResponse = responseMap.get(applicationAttemptId);
+    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
     if (lastResponse == null) {
       String message = "Application doesn't exist in cache "
           + applicationAttemptId;
@@ -214,7 +215,7 @@ public class ApplicationMasterService ex
         .getApplicationAttemptId();
     authorizeRequest(applicationAttemptId);
 
-    AMResponse lastResponse = responseMap.get(applicationAttemptId);
+    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
     if (lastResponse == null) {
       String message = "Application doesn't exist in cache "
           + applicationAttemptId;
@@ -248,25 +249,20 @@ public class ApplicationMasterService ex
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
     /* check if its in cache */
-    AllocateResponse allocateResponse = recordFactory
-        .newRecordInstance(AllocateResponse.class);
-    AMResponse lastResponse = responseMap.get(appAttemptId);
+    AllocateResponse lastResponse = responseMap.get(appAttemptId);
     if (lastResponse == null) {
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
-      allocateResponse.setAMResponse(reboot);
-      return allocateResponse;
+      return reboot;
     }
     if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
       /* old heartbeat */
-      allocateResponse.setAMResponse(lastResponse);
-      return allocateResponse;
+      return lastResponse;
     } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
       LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
       // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
       // Reboot is not useful since after AM reboots, it will send register and 
       // get an exception. Might as well throw an exception here.
-      allocateResponse.setAMResponse(reboot);
-      return allocateResponse;
+      return reboot;
     } 
     
     // Allow only one thread in AM to do heartbeat at a time.
@@ -288,7 +284,8 @@ public class ApplicationMasterService ex
           appAttemptId.getApplicationId());
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
       
-      AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+      AllocateResponse allocateResponse =
+          recordFactory.newRecordInstance(AllocateResponse.class);
 
       // update the response with the deltas of node status changes
       List<RMNode> updatedNodes = new ArrayList<RMNode>();
@@ -311,34 +308,34 @@ public class ApplicationMasterService ex
           
           updatedNodeReports.add(report);
         }
-        response.setUpdatedNodes(updatedNodeReports);
+        allocateResponse.setUpdatedNodes(updatedNodeReports);
       }
 
-      response.setAllocatedContainers(allocation.getContainers());
-      response.setCompletedContainersStatuses(appAttempt
+      allocateResponse.setAllocatedContainers(allocation.getContainers());
+      allocateResponse.setCompletedContainersStatuses(appAttempt
           .pullJustFinishedContainers());
-      response.setResponseId(lastResponse.getResponseId() + 1);
-      response.setAvailableResources(allocation.getResourceLimit());
+      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
+      allocateResponse.setAvailableResources(allocation.getResourceLimit());
       
-      AMResponse oldResponse = responseMap.put(appAttemptId, response);
+      AllocateResponse oldResponse =
+          responseMap.put(appAttemptId, allocateResponse);
       if (oldResponse == null) {
         // appAttempt got unregistered, remove it back out
         responseMap.remove(appAttemptId);
         String message = "App Attempt removed from the cache during allocate"
             + appAttemptId;
         LOG.error(message);
-        allocateResponse.setAMResponse(reboot);
-        return allocateResponse;
+        return reboot;
       }
       
-      allocateResponse.setAMResponse(response);
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
       return allocateResponse;
     }
   }
 
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
-    AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+    AllocateResponse response =
+        recordFactory.newRecordInstance(AllocateResponse.class);
     response.setResponseId(0);
     LOG.info("Registering " + attemptId);
     responseMap.put(attemptId, response);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Mon Apr  1 16:47:16 2013
@@ -436,7 +436,6 @@ public class ClientRMService extends Abs
       response.setQueueInfo(queueInfo);
     } catch (IOException ioe) {
       LOG.info("Failed to getQueueInfo for " + request.getQueueName(), ioe);
-      throw RPCUtil.getRemoteException(ioe);
     }
     
     return response;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon Apr  1 16:47:16 2013
@@ -139,6 +139,8 @@ public class ResourceManager extends Com
   @Override
   public synchronized void init(Configuration conf) {
 
+    validateConfigs(conf);
+
     this.conf = conf;
 
     this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -325,6 +327,15 @@ public class ResourceManager extends Com
       this.applicationACLsManager, this.conf);
   }
 
+  protected static void validateConfigs(Configuration conf) {
+    int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    if (globalMaxAppAttempts <= 0) {
+      throw new YarnException(
+          "The global max attempts should be a positive integer.");
+    }
+  }
+
   @Private
   public static class SchedulerEventDispatcher extends AbstractService
       implements EventHandler<SchedulerEvent> {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Apr  1 16:47:16 2013
@@ -39,11 +39,9 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@@ -78,15 +76,9 @@ public class ResourceTrackerService exte
   .newRecordInstance(NodeHeartbeatResponse.class);
   
   static {
-    HeartbeatResponse rebootResp = recordFactory
-        .newRecordInstance(HeartbeatResponse.class);
-    rebootResp.setNodeAction(NodeAction.REBOOT);
-    reboot.setHeartbeatResponse(rebootResp);
-    
-    HeartbeatResponse decommissionedResp = recordFactory
-        .newRecordInstance(HeartbeatResponse.class);
-    decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
-    shutDown.setHeartbeatResponse(decommissionedResp);
+    reboot.setNodeAction(NodeAction.REBOOT);
+
+    shutDown.setNodeAction(NodeAction.SHUTDOWN);
   }
 
   public ResourceTrackerService(RMContext rmContext,
@@ -157,22 +149,19 @@ public class ResourceTrackerService exte
 
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
-    RegistrationResponse regResponse = recordFactory
-        .newRecordInstance(RegistrationResponse.class);
 
     // Check if this node is a 'valid' node
     if (!this.nodesListManager.isValidNode(host)) {
       LOG.info("Disallowed NodeManager from  " + host
           + ", Sending SHUTDOWN signal to the NodeManager.");
-      regResponse.setNodeAction(NodeAction.SHUTDOWN);
-      response.setRegistrationResponse(regResponse);
+      response.setNodeAction(NodeAction.SHUTDOWN);
       return response;
     }
 
     if (isSecurityEnabled()) {
       MasterKey nextMasterKeyForNode =
           this.containerTokenSecretManager.getCurrentKey();
-      regResponse.setMasterKey(nextMasterKeyForNode);
+      response.setMasterKey(nextMasterKeyForNode);
     }
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
@@ -195,8 +184,7 @@ public class ResourceTrackerService exte
         + " httpPort: " + httpPort + ") " + "registered with capability: "
         + capability + ", assigned nodeId " + nodeId);
 
-    regResponse.setNodeAction(NodeAction.NORMAL);
-    response.setRegistrationResponse(regResponse);
+    response.setNodeAction(NodeAction.NORMAL);
     return response;
   }
 
@@ -240,17 +228,16 @@ public class ResourceTrackerService exte
         .newRecordInstance(NodeHeartbeatResponse.class);
     
     // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
-    HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse();
-    if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
+    NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
+    if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
         .getResponseId()) {
       LOG.info("Received duplicate heartbeat from node "
           + rmNode.getNodeAddress());
-      nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
-      return nodeHeartBeatResponse;
-    } else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
+      return lastNodeHeartbeatResponse;
+    } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
         .getResponseId()) {
       LOG.info("Too far behind rm response id:"
-          + lastHeartbeatResponse.getResponseId() + " nm response id:"
+          + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
           + remoteNodeStatus.getResponseId());
       // TODO: Just sending reboot is not enough. Think more.
       this.rmContext.getDispatcher().getEventHandler().handle(
@@ -259,12 +246,9 @@ public class ResourceTrackerService exte
     }
 
     // Heartbeat response
-    HeartbeatResponse latestResponse = recordFactory
-        .newRecordInstance(HeartbeatResponse.class);
-    latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
-    latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
-    latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
-    latestResponse.setNodeAction(NodeAction.NORMAL);
+    nodeHeartBeatResponse.setResponseId(lastNodeHeartbeatResponse.getResponseId() + 1);
+    rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
+    nodeHeartBeatResponse.setNodeAction(NodeAction.NORMAL);
 
     // Check if node's masterKey needs to be updated and if the currentKey has
     // roller over, send it across
@@ -283,7 +267,7 @@ public class ResourceTrackerService exte
         }
       }
       if (shouldSendMasterKey) {
-        latestResponse.setMasterKey(nextMasterKeyForNode);
+        nodeHeartBeatResponse.setMasterKey(nextMasterKeyForNode);
       }
     }
 
@@ -291,9 +275,8 @@ public class ResourceTrackerService exte
     this.rmContext.getDispatcher().getEventHandler().handle(
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
             remoteNodeStatus.getContainersStatuses(), 
-            remoteNodeStatus.getKeepAliveApplications(), latestResponse));
+            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
 
-    nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
     return nodeHeartBeatResponse;
   }
 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Mon Apr  1 16:47:16 2013
@@ -197,6 +197,9 @@ public class AMLauncher implements Runna
         String.valueOf(rmContext.getRMApps()
             .get(applicationId)
             .getSubmitTime()));
+    environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
+        String.valueOf(rmContext.getRMApps().get(
+            applicationId).getMaxAppAttempts()));
  
     if (UserGroupInformation.isSecurityEnabled()) {
       // TODO: Security enabled/disabled info should come from RM.

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Mon Apr  1 16:47:16 2013
@@ -173,4 +173,10 @@ public interface RMApp extends EventHand
    * {@link FinishApplicationMasterRequest#setFinishApplicationStatus(FinalApplicationStatus)}.
    */
   FinalApplicationStatus getFinalApplicationStatus();
+
+  /**
+   * The number of max attempts of the application.
+   * @return the number of max attempts of the application.
+   */
+  int getMaxAppAttempts();
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Apr  1 16:47:16 2013
@@ -87,7 +87,7 @@ public class RMAppImpl implements RMApp,
   private final YarnScheduler scheduler;
   private final ApplicationMasterService masterService;
   private final StringBuilder diagnostics = new StringBuilder();
-  private final int maxRetries;
+  private final int maxAppAttempts;
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final Map<ApplicationAttemptId, RMAppAttempt> attempts
@@ -231,8 +231,19 @@ public class RMAppImpl implements RMApp,
     this.submitTime = submitTime;
     this.startTime = System.currentTimeMillis();
 
-    this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
-        YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
+    int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
+    if (individualMaxAppAttempts <= 0 ||
+        individualMaxAppAttempts > globalMaxAppAttempts) {
+      this.maxAppAttempts = globalMaxAppAttempts;
+      LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+          + " for application: " + applicationId.getId()
+          + " is invalid, because it is out of the range [1, "
+          + globalMaxAppAttempts + "]. Use the global max attempts instead.");
+    } else {
+      this.maxAppAttempts = individualMaxAppAttempts;
+    }
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -406,7 +417,8 @@ public class RMAppImpl implements RMApp,
       String host = UNAVAILABLE;
       String origTrackingUrl = UNAVAILABLE;
       int rpcPort = -1;
-      ApplicationResourceUsageReport appUsageReport = null;
+      ApplicationResourceUsageReport appUsageReport =
+          DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
       FinalApplicationStatus finishState = getFinalApplicationStatus();
       String diags = UNAVAILABLE;
       if (allowAccess) {
@@ -418,18 +430,17 @@ public class RMAppImpl implements RMApp,
           host = this.currentAttempt.getHost();
           rpcPort = this.currentAttempt.getRpcPort();
           appUsageReport = currentAttempt.getApplicationResourceUsageReport();
-        } else {
-          currentApplicationAttemptId = 
-              BuilderUtils.newApplicationAttemptId(this.applicationId, 
-                  DUMMY_APPLICATION_ATTEMPT_NUMBER);
         }
+
         diags = this.diagnostics.toString();
-      } else {
-        appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+      }
+
+      if (currentApplicationAttemptId == null) {
         currentApplicationAttemptId = 
             BuilderUtils.newApplicationAttemptId(this.applicationId, 
                 DUMMY_APPLICATION_ATTEMPT_NUMBER);
       }
+
       return BuilderUtils.newApplicationReport(this.applicationId,
           currentApplicationAttemptId, this.user, this.queue,
           this.name, host, rpcPort, clientToken,
@@ -494,6 +505,11 @@ public class RMAppImpl implements RMApp,
   }
 
   @Override
+  public int getMaxAppAttempts() {
+    return this.maxAppAttempts;
+  }
+
+  @Override
   public void handle(RMAppEvent event) {
 
     this.writeLock.lock();
@@ -669,10 +685,10 @@ public class RMAppImpl implements RMApp,
         msg = "Unmanaged application " + app.getApplicationId()
             + " failed due to " + failedEvent.getDiagnostics()
             + ". Failing the application.";
-      } else if (app.attempts.size() >= app.maxRetries) {
+      } else if (app.attempts.size() >= app.maxAppAttempts) {
         retryApp = false;
         msg = "Application " + app.getApplicationId() + " failed "
-            + app.maxRetries + " times due to " + failedEvent.getDiagnostics()
+            + app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics()
             + ". Failing the application.";
       }
 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Mon Apr  1 16:47:16 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.res
 /**
  * Interface to an Application Attempt in the Resource Manager.
  * A {@link RMApp} can have multiple app attempts based on
- * {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific
+ * {@link YarnConfiguration#RM_AM_MAX_ATTEMPTS}. For specific
  * implementation take a look at {@link RMAppAttemptImpl}.
  */
 public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Apr  1 16:47:16 2013
@@ -38,9 +38,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -63,10 +63,10 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -147,6 +147,9 @@ public class RMAppAttemptImpl implements
 
   private Configuration conf;
 
+  private static final ExpiredTransition EXPIRED_TRANSITION =
+      new ExpiredTransition();
+
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
@@ -243,7 +246,7 @@ public class RMAppAttemptImpl implements
       .addTransition(
           RMAppAttemptState.LAUNCHED, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.EXPIRE,
-          new FinalTransition(RMAppAttemptState.FAILED))
+          EXPIRED_TRANSITION)
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL,
           new FinalTransition(RMAppAttemptState.KILLED))
@@ -268,7 +271,7 @@ public class RMAppAttemptImpl implements
       .addTransition(
           RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.EXPIRE,
-          new FinalTransition(RMAppAttemptState.FAILED))
+          EXPIRED_TRANSITION)
       .addTransition(
           RMAppAttemptState.RUNNING, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL,
@@ -491,6 +494,13 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private void setTrackingUrlToRMAppPage() {
+    origTrackingUrl = pjoin(
+        YarnConfiguration.getRMWebAppHostAndPort(conf),
+        "cluster", "app", getAppAttemptId().getApplicationId());
+    proxiedTrackingUrl = origTrackingUrl;
+  }
+
   @Override
   public ClientToken getClientToken() {
     return this.clientToken;
@@ -727,7 +737,7 @@ public class RMAppAttemptImpl implements
 
         // Request a container for the AM.
         ResourceRequest request = BuilderUtils.newResourceRequest(
-            AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
+            AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt.submissionContext
                 .getAMContainerSpec().getResource(), 1);
 
         Allocation amContainerAllocation = appAttempt.scheduler.allocate(
@@ -992,7 +1002,23 @@ public class RMAppAttemptImpl implements
       }
     }
   }
-  
+
+  private static class ExpiredTransition extends FinalTransition {
+
+    public ExpiredTransition() {
+      super(RMAppAttemptState.FAILED);
+    }
+
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+      appAttempt.diagnostics.append("ApplicationMaster for attempt " +
+        appAttempt.getAppAttemptId() + " timed out");
+      appAttempt.setTrackingUrlToRMAppPage();
+      super.transition(appAttempt, event);
+    }
+  }
+
   private static class UnexpectedAMRegisteredTransition extends
       BaseFinalTransition {
 
@@ -1110,10 +1136,7 @@ public class RMAppAttemptImpl implements
         // When the AM dies, the trackingUrl is left pointing to the AM's URL,
         // which shows up in the scheduler UI as a broken link.  Direct the
         // user to the app page on the RM so they can see the status and logs.
-        appAttempt.origTrackingUrl = pjoin(
-            YarnConfiguration.getRMWebAppHostAndPort(appAttempt.conf),
-            "cluster", "app", appAttempt.getAppAttemptId().getApplicationId());
-        appAttempt.proxiedTrackingUrl = appAttempt.origTrackingUrl;
+        appAttempt.setTrackingUrlToRMAppPage();
 
         new FinalTransition(RMAppAttemptState.FAILED).transition(
             appAttempt, containerFinishedEvent);

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java Mon Apr  1 16:47:16 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 
 /**
  * Node managers information on available resources 
@@ -36,8 +36,6 @@ import org.apache.hadoop.yarn.server.api
  */
 public interface RMNode {
 
-  public static final String ANY = "*";
-
   /**
    * the node id of of this node.
    * @return the node id of this node.
@@ -105,5 +103,20 @@ public interface RMNode {
 
   public List<ApplicationId> getAppsToCleanup();
 
-  public HeartbeatResponse getLastHeartBeatResponse();
+  /**
+   * Update a {@link NodeHeartbeatResponse} with the list of containers and
+   * applications to clean up for this node.
+   * @param response the {@link NodeHeartbeatResponse} to update
+   */
+  public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
+
+  public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
+  
+  /**
+   * Get and clear the list of containerUpdates accumulated across NM
+   * heartbeats.
+   * 
+   * @return containerUpdates accumulated across NM heartbeats.
+   */
+  public List<UpdatedContainerInfo> pullContainerUpdates();
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Mon Apr  1 16:47:16 2013
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -45,7 +46,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.BuilderUtils.ContainerIdComparator;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is used to keep track of all the applications/containers
  * running on a node.
@@ -78,6 +81,9 @@ public class RMNodeImpl implements RMNod
   private final ReadLock readLock;
   private final WriteLock writeLock;
 
+  private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
+  private volatile boolean nextHeartBeat = true;
+
   private final NodeId nodeId;
   private final RMContext context;
   private final String hostName;
@@ -101,8 +107,8 @@ public class RMNodeImpl implements RMNod
   /* the list of applications that have finished and need to be purged */
   private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
 
-  private HeartbeatResponse latestHeartBeatResponse = recordFactory
-      .newRecordInstance(HeartbeatResponse.class);
+  private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
+      .newRecordInstance(NodeHeartbeatResponse.class);
   
   private static final StateMachineFactory<RMNodeImpl,
                                            NodeState,
@@ -178,7 +184,7 @@ public class RMNodeImpl implements RMNod
     this.nodeHealthStatus.setHealthReport("Healthy");
     this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
 
-    this.latestHeartBeatResponse.setResponseId(0);
+    this.latestNodeHeartBeatResponse.setResponseId(0);
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -186,6 +192,7 @@ public class RMNodeImpl implements RMNod
 
     this.stateMachine = stateMachineFactory.make(this);
     
+    this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();  
   }
 
   @Override
@@ -297,12 +304,27 @@ public class RMNodeImpl implements RMNod
   };
 
   @Override
-  public HeartbeatResponse getLastHeartBeatResponse() {
+  public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+    this.writeLock.lock();
+
+    try {
+      response.addAllContainersToCleanup(
+          new ArrayList<ContainerId>(this.containersToClean));
+      response.addAllApplicationsToCleanup(this.finishedApplications);
+      this.containersToClean.clear();
+      this.finishedApplications.clear();
+    } finally {
+      this.writeLock.unlock();
+    }
+  };
+
+  @Override
+  public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
 
     this.readLock.lock();
 
     try {
-      return this.latestHeartBeatResponse;
+      return this.latestNodeHeartBeatResponse;
     } finally {
       this.readLock.unlock();
     }
@@ -400,6 +422,7 @@ public class RMNodeImpl implements RMNod
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Kill containers since node is rejoining.
+      rmNode.nodeUpdateQueue.clear();
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
 
@@ -407,7 +430,7 @@ public class RMNodeImpl implements RMNod
       if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
           && rmNode.getHttpPort() == newNode.getHttpPort()) {
         // Reset heartbeat ID since node just restarted.
-        rmNode.getLastHeartBeatResponse().setResponseId(0);
+        rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeAddedSchedulerEvent(rmNode));
       } else {
@@ -458,6 +481,7 @@ public class RMNodeImpl implements RMNod
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
+      rmNode.nodeUpdateQueue.clear();
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
       rmNode.context.getDispatcher().getEventHandler().handle(
@@ -483,12 +507,15 @@ public class RMNodeImpl implements RMNod
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
 
       // Switch the last heartbeatresponse.
-      rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
+      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
 
       NodeHealthStatus remoteNodeHealthStatus = 
           statusEvent.getNodeHealthStatus();
       rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
+        LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
+            + remoteNodeHealthStatus.getHealthReport());
+        rmNode.nodeUpdateQueue.clear();
         // Inform the scheduler
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeRemovedSchedulerEvent(rmNode));
@@ -538,20 +565,20 @@ public class RMNodeImpl implements RMNod
           completedContainers.add(remoteContainer);
         }
       }
-
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers, 
-              completedContainers));
+      if(newlyLaunchedContainers.size() != 0 
+          || completedContainers.size() != 0) {
+        rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
+            (newlyLaunchedContainers, completedContainers));
+      }
+      if(rmNode.nextHeartBeat) {
+        rmNode.nextHeartBeat = false;
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeUpdateSchedulerEvent(rmNode));
+      }
       
       rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
           statusEvent.getKeepAliveAppIds());
 
-      // HeartBeat processing from our end is done, as node pulls the following
-      // lists before sending status-updates. Clear data-structures
-      // TODO: These lists could go to the NM multiple times, or never.
-      rmNode.containersToClean.clear();
-      rmNode.finishedApplications.clear();
-
       return NodeState.RUNNING;
     }
   }
@@ -564,7 +591,7 @@ public class RMNodeImpl implements RMNod
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
 
       // Switch the last heartbeatresponse.
-      rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
+      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
       NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
       rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
       if (remoteNodeHealthStatus.getIsNodeHealthy()) {
@@ -584,4 +611,25 @@ public class RMNodeImpl implements RMNod
       return NodeState.UNHEALTHY;
     }
   }
+
+  @Override
+  public List<UpdatedContainerInfo> pullContainerUpdates() {
+    List<UpdatedContainerInfo> latestContainerInfoList = 
+        new ArrayList<UpdatedContainerInfo>();
+    while(nodeUpdateQueue.peek() != null){
+      latestContainerInfoList.add(nodeUpdateQueue.poll());
+    }
+    this.nextHeartBeat = true;
+    return latestContainerInfoList;
+  }
+
+  @VisibleForTesting
+  public void setNextHeartBeat(boolean nextHeartBeat) {
+    this.nextHeartBeat = nextHeartBeat;
+  }
+  
+  @VisibleForTesting
+  public int getQueueSize() {
+    return nodeUpdateQueue.size();
+  }
  }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java Mon Apr  1 16:47:16 2013
@@ -24,18 +24,18 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 
 public class RMNodeStatusEvent extends RMNodeEvent {
 
   private final NodeHealthStatus nodeHealthStatus;
   private final List<ContainerStatus> containersCollection;
-  private final HeartbeatResponse latestResponse;
+  private final NodeHeartbeatResponse latestResponse;
   private final List<ApplicationId> keepAliveAppIds;
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
-      HeartbeatResponse latestResponse) {
+      NodeHeartbeatResponse latestResponse) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeHealthStatus = nodeHealthStatus;
     this.containersCollection = collection;
@@ -51,7 +51,7 @@ public class RMNodeStatusEvent extends R
     return this.containersCollection;
   }
 
-  public HeartbeatResponse getLatestResponse() {
+  public NodeHeartbeatResponse getLatestResponse() {
     return this.latestResponse;
   }
   

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Mon Apr  1 16:47:16 2013
@@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 
 /**
  * This class keeps track of all the consumption of an application. This also
@@ -129,7 +127,7 @@ public class AppSchedulingInfo {
       boolean updatePendingResources = false;
       ResourceRequest lastRequest = null;
 
-      if (hostName.equals(RMNode.ANY)) {
+      if (hostName.equals(ResourceRequest.ANY)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("update:" + " application=" + applicationId + " request="
               + request);
@@ -195,7 +193,7 @@ public class AppSchedulingInfo {
   }
 
   public synchronized Resource getResource(Priority priority) {
-    ResourceRequest request = getResourceRequest(priority, RMNode.ANY);
+    ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
     return request.getCapability();
   }
 
@@ -261,7 +259,7 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(RMNode.ANY));
+    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
   }
 
   /**
@@ -284,7 +282,7 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(RMNode.ANY));
+    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
   }
 
   /**
@@ -322,7 +320,7 @@ public class AppSchedulingInfo {
   synchronized private void checkForDeactivation() {
     boolean deactivate = true;
     for (Priority priority : getPriorities()) {
-      ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
+      ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
       if (request.getNumContainers() > 0) {
         deactivate = false;
         break;
@@ -351,7 +349,7 @@ public class AppSchedulingInfo {
     // clear pending resources metrics for the application
     QueueMetrics metrics = queue.getMetrics();
     for (Map<String, ResourceRequest> asks : requests.values()) {
-      ResourceRequest request = asks.get(RMNode.ANY);
+      ResourceRequest request = asks.get(ResourceRequest.ANY);
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),
             Resources.multiply(request.getCapability(), request

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Apr  1 16:47:16 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -503,6 +504,14 @@ implements ResourceScheduler, CapacitySc
 
     synchronized (application) {
 
+      // make sure we aren't stopping/removing the application
+      // when the allocate comes in
+      if (application.isStopped()) {
+        LOG.info("Calling allocate on a stopped " +
+            "application " + applicationAttemptId);
+        return EMPTY_ALLOCATION;
+      }
+
       if (!ask.isEmpty()) {
 
         if(LOG.isDebugEnabled()) {
@@ -562,15 +571,20 @@ implements ResourceScheduler, CapacitySc
     return root.getQueueUserAclInfo(user);
   }
 
-  private synchronized void nodeUpdate(RMNode nm, 
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     }
-                  
-    FiCaSchedulerNode node = getNode(nm.getNodeID());
 
+    FiCaSchedulerNode node = getNode(nm.getNodeID());
+    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    }
+    
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -666,9 +680,7 @@ implements ResourceScheduler, CapacitySc
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
-          nodeUpdatedEvent.getNewlyLaunchedContainers(),
-          nodeUpdatedEvent.getCompletedContainers());
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
     case APP_ADDED:

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Apr  1 16:47:16 2013
@@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -607,6 +606,10 @@ public class LeafQueue implements CSQueu
         newlyParsedLeafQueue.getMaximumActiveApplications(), 
         newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
+
+    // queue metrics are updated, more resource may be available
+    // activate the pending applications if possible
+    activateApplications();
   }
 
   @Override
@@ -812,7 +815,8 @@ public class LeafQueue implements CSQueu
         for (Priority priority : application.getPriorities()) {
           // Required resource
           Resource required = 
-              application.getResourceRequest(priority, RMNode.ANY).getCapability();
+              application.getResourceRequest(
+                  priority, ResourceRequest.ANY).getCapability();
 
           // Do we need containers at this 'priority'?
           if (!needContainers(application, priority, required)) {
@@ -1157,7 +1161,7 @@ public class LeafQueue implements CSQueu
       FiCaSchedulerApp application, Priority priority, 
       RMContainer reservedContainer) {
     ResourceRequest request = 
-      application.getResourceRequest(priority, RMNode.ANY);
+      application.getResourceRequest(priority, ResourceRequest.ANY);
     if (request != null) {
       if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
           reservedContainer)) {
@@ -1179,7 +1183,7 @@ public class LeafQueue implements CSQueu
 
       // 'Delay' off-switch
       ResourceRequest offSwitchRequest = 
-          application.getResourceRequest(priority, RMNode.ANY);
+          application.getResourceRequest(priority, ResourceRequest.ANY);
       long missedOpportunities = application.getSchedulingOpportunities(priority);
       long requiredContainers = offSwitchRequest.getNumContainers(); 
       
@@ -1477,7 +1481,11 @@ public class LeafQueue implements CSQueu
     CSQueueUtils.updateQueueStatistics(
         resourceCalculator, this, getParent(), clusterResource, 
         minimumAllocation);
-    
+
+    // queue metrics are updated, more resource may be available
+    // activate the pending applications if possible
+    activateApplications();
+
     // Update application properties
     for (FiCaSchedulerApp application : activeApplications) {
       synchronized (application) {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Mon Apr  1 16:47:16 2013
@@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -92,6 +91,9 @@ public class FiCaSchedulerApp extends Sc
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+  private boolean isStopped = false;
+
   
   /**
    * Count how many times the application has been given an opportunity
@@ -133,7 +135,9 @@ public class FiCaSchedulerApp extends Sc
 
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
-    this.appSchedulingInfo.updateResourceRequests(requests);
+    if (!isStopped) {
+      this.appSchedulingInfo.updateResourceRequests(requests);
+    }
   }
 
   public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
@@ -153,7 +157,7 @@ public class FiCaSchedulerApp extends Sc
   }
 
   public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, RMNode.ANY).getNumContainers();
+    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
   }
   
   public Resource getResource(Priority priority) {
@@ -169,6 +173,10 @@ public class FiCaSchedulerApp extends Sc
     return this.appSchedulingInfo.isPending();
   }
 
+  public synchronized boolean isStopped() {
+    return this.isStopped;
+  }
+
   public String getQueueName() {
     return this.appSchedulingInfo.getQueueName();
   }
@@ -184,6 +192,7 @@ public class FiCaSchedulerApp extends Sc
 
   public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
     // Cleanup all scheduling information
+    this.isStopped = true;
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
   }
 
@@ -235,6 +244,10 @@ public class FiCaSchedulerApp extends Sc
   synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       Priority priority, ResourceRequest request, 
       Container container) {
+
+    if (isStopped) {
+      return null;
+    }
     
     // Required sanity check - AM can call 'allocate' to update resource 
     // request without locking the scheduler, hence we need to check

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Mon Apr  1 16:47:16 2013
@@ -60,8 +60,6 @@ public class FiCaSchedulerNode extends S
   
   private final RMNode rmNode;
 
-  public static final String ANY = "*";
-
   public FiCaSchedulerNode(RMNode node) {
     this.rmNode = node;
     this.availableResource.setMemory(node.getTotalCapability().getMemory());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java Mon Apr  1 16:47:16 2013
@@ -18,35 +18,18 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class NodeUpdateSchedulerEvent extends SchedulerEvent {
 
   private final RMNode rmNode;
-  private final List<ContainerStatus> newlyLaunchedContainers;
-  private final List<ContainerStatus> completedContainersStatuses;
 
-  public NodeUpdateSchedulerEvent(RMNode rmNode,
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  public NodeUpdateSchedulerEvent(RMNode rmNode) {
     super(SchedulerEventType.NODE_UPDATE);
     this.rmNode = rmNode;
-    this.newlyLaunchedContainers = newlyLaunchedContainers;
-    this.completedContainersStatuses = completedContainers;
   }
 
   public RMNode getRMNode() {
     return rmNode;
   }
-
-  public List<ContainerStatus> getNewlyLaunchedContainers() {
-    return newlyLaunchedContainers;
-  }
-
-  public List<ContainerStatus> getCompletedContainers() {
-    return completedContainersStatuses;
-  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Mon Apr  1 16:47:16 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -336,7 +335,7 @@ public class AppSchedulable extends Sche
         }
 
         ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
-            RMNode.ANY);
+            ResourceRequest.ANY);
         if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
             && allowedLocality.equals(NodeType.OFF_SWITCH)) {
           return assignContainer(node, app, priority, offSwitchRequest,

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Mon Apr  1 16:47:16 2013
@@ -92,13 +92,7 @@ public class FSLeafQueue extends FSQueue
   
   @Override
   public void recomputeFairShares() {
-    if (schedulingMode == SchedulingMode.FAIR) {
-      SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
-    } else {
-      for (AppSchedulable sched: appScheds) {
-        sched.setFairShare(Resources.createResource(0));
-      }
-    }
+    schedulingMode.computeShares(getAppSchedulables(), getFairShare());
   }
 
   @Override
@@ -162,17 +156,9 @@ public class FSLeafQueue extends FSQueue
       return Resources.none(); // We should never get here
     }
 
-    // Otherwise, chose app to schedule based on given policy (fair vs fifo).
+    // Otherwise, chose app to schedule based on given policy.
     else {
-      Comparator<Schedulable> comparator;
-      if (schedulingMode == SchedulingMode.FIFO) {
-        comparator = new SchedulingAlgorithms.FifoComparator();
-      } else if (schedulingMode == SchedulingMode.FAIR) {
-        comparator = new SchedulingAlgorithms.FairShareComparator();
-      } else {
-        throw new RuntimeException("Unsupported queue scheduling mode " + 
-            schedulingMode);
-      }
+      Comparator<Schedulable> comparator = schedulingMode.getComparator();
 
       Collections.sort(appScheds, comparator);
       for (AppSchedulable sched: appScheds) {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Mon Apr  1 16:47:16 2013
@@ -51,7 +51,7 @@ public class FSParentQueue extends FSQue
 
   @Override
   public void recomputeFairShares() {
-    SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
+    SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
     for (FSQueue childQueue : childQueues) {
       childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
       childQueue.recomputeFairShares();

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Mon Apr  1 16:47:16 2013
@@ -40,8 +40,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -153,7 +152,7 @@ public class FSSchedulerApp extends Sche
   }
 
   public synchronized int getTotalRequiredResources(Priority priority) {
-    return getResourceRequest(priority, RMNode.ANY).getNumContainers();
+    return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
   }
   
   public Resource getResource(Priority priority) {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Mon Apr  1 16:47:16 2013
@@ -59,8 +59,6 @@ public class FSSchedulerNode extends Sch
   
   private final RMNode rmNode;
 
-  public static final String ANY = "*";
-
   public FSSchedulerNode(RMNode node) {
     this.rmNode = node;
     this.availableResource.setMemory(node.getTotalCapability().getMemory());



Mime
View raw message