hadoop-yarn-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1463203 [6/8] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org...
Date: Mon, 01 Apr 2013 16:47:34 GMT
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Apr  1 16:47:16 2013
@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
@@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -750,15 +750,20 @@ public class FairScheduler implements Re
   /**
    * Process a heartbeat update from a node.
    */
-  private synchronized void nodeUpdate(RMNode nm,
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = nodes.get(nm.getNodeID());
 
+    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    } 
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -798,7 +803,7 @@ public class FairScheduler implements Re
         // At most one task is scheduled each iteration of this loop
         List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
             queueMgr.getLeafQueues());
-        Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+        Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
         boolean assignedContainer = false;
         for (FSLeafQueue sched : scheds) {
           Resource assigned = sched.assignContainer(node, false);
@@ -864,9 +869,7 @@ public class FairScheduler implements Re
         throw new RuntimeException("Unexpected event type: " + event);
       }
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode(),
-          nodeUpdatedEvent.getNewlyLaunchedContainers(),
-          nodeUpdatedEvent.getCompletedContainers());
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
       break;
     case APP_ADDED:
       if (!(event instanceof AppAddedSchedulerEvent)) {
@@ -918,25 +921,25 @@ public class FairScheduler implements Re
   @Override
   public synchronized void reinitialize(Configuration conf, RMContext rmContext)
       throws IOException {
+    this.conf = new FairSchedulerConfiguration(conf);
+    minimumAllocation = this.conf.getMinimumMemoryAllocation();
+    maximumAllocation = this.conf.getMaximumMemoryAllocation();
+    userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
+    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+    preemptionEnabled = this.conf.getPreemptionEnabled();
+    assignMultiple = this.conf.getAssignMultiple();
+    maxAssign = this.conf.getMaxAssign();
+    sizeBasedWeight = this.conf.getSizeBasedWeight();
+    
     if (!initialized) {
-      this.conf = new FairSchedulerConfiguration(conf);
       rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
-      minimumAllocation = this.conf.getMinimumMemoryAllocation();
-      maximumAllocation = this.conf.getMaximumMemoryAllocation();
-      userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
-      assignMultiple = this.conf.getAssignMultiple();
-      maxAssign = this.conf.getMaxAssign();
 
       initialized = true;
 
-      sizeBasedWeight = this.conf.getSizeBasedWeight();
-
       try {
         queueMgr.initialize();
       } catch (Exception e) {
@@ -948,14 +951,8 @@ public class FairScheduler implements Re
       updateThread.setDaemon(true);
       updateThread.start();
     } else {
-      this.conf = new FairSchedulerConfiguration(conf);
-      userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
-      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-      preemptionEnabled = this.conf.getPreemptionEnabled();
       try {
         queueMgr.reloadAllocs();
-
       } catch (Exception e) {
         throw new IOException("Failed to initialize FairScheduler", e);
       }
@@ -966,7 +963,7 @@ public class FairScheduler implements Re
   public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
       boolean recursive) throws IOException {
     if (!queueMgr.exists(queueName)) {
-      return null;
+      throw new IOException("queue " + queueName + " does not exist");
     }
     return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
         recursive);
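
With this change FairScheduler.getQueueInfo reports an unknown queue by throwing an IOException instead of returning null. A minimal caller sketch under the new contract (the scheduler instance and its initialization are assumed, not shown in this diff):

    import java.io.IOException;

    import org.apache.hadoop.yarn.api.records.QueueInfo;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

    public class QueueInfoLookup {
      private final FairScheduler scheduler; // assumed to be an already initialized scheduler

      public QueueInfoLookup(FairScheduler scheduler) {
        this.scheduler = scheduler;
      }

      /** Returns the queue info, or null if the queue does not exist. */
      public QueueInfo lookup(String queueName) {
        try {
          return scheduler.getQueueInfo(queueName, true /* includeChildQueues */,
              false /* recursive */);
        } catch (IOException e) {
          // A nonexistent queue used to come back as null; it now surfaces as
          // an IOException, so null checks become catch blocks.
          return null;
        }
      }
    }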

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Mon Apr  1 16:47:16 2013
@@ -37,7 +37,6 @@ public class FairSchedulerConfiguration 
   protected static final String  USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
   protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
 
-  protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
   protected static final float  DEFAULT_LOCALITY_THRESHOLD = -1.0f;
 
   /** Cluster threshold for node locality. */
@@ -89,10 +88,6 @@ public class FairSchedulerConfiguration 
     return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
   }
 
-  public float getLocalityThreshold() {
-    return getFloat(LOCALITY_THRESHOLD, DEFAULT_LOCALITY_THRESHOLD);
-  }
-
   public float getLocalityThresholdNode() {
     return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
   }
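
The generic locality.threshold key and its getter are removed; node- and rack-level thresholds are configured separately through the getters that remain. A small sketch of reading them back (the full property names below are assumptions inferred from the CONF_PREFIX pattern in this file; the LOCALITY_THRESHOLD_NODE and LOCALITY_THRESHOLD_RACK constants are the source of truth):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;

    public class LocalityThresholdSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed key names; verify against the constants in FairSchedulerConfiguration.
        conf.setFloat("yarn.scheduler.fair.locality.threshold.node", 0.5f);
        conf.setFloat("yarn.scheduler.fair.locality.threshold.rack", 0.7f);

        FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(conf);
        System.out.println("node threshold: " + fsConf.getLocalityThresholdNode());
        System.out.println("rack threshold: " + fsConf.getLocalityThresholdRack());
      }
    }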

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Mon Apr  1 16:47:16 2013
@@ -310,7 +310,7 @@ public class QueueManager {
     int queueMaxAppsDefault = Integer.MAX_VALUE;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-    SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+    SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
 
     // Remember all queue names so we can display them on web UI, etc.
     List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -373,7 +373,8 @@ public class QueueManager {
         queueMaxAppsDefault = val;}
       else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
         String text = ((Text)element.getFirstChild()).getData().trim();
-        defaultSchedulingMode = parseSchedulingMode(text);
+        SchedulingMode.setDefault(text);
+        defaultSchedulingMode = SchedulingMode.getDefault();
       } else {
         LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
@@ -449,7 +450,7 @@ public class QueueManager {
         minSharePreemptionTimeouts.put(queueName, val);
       } else if ("schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        queueModes.put(queueName, parseSchedulingMode(text));
+        queueModes.put(queueName, SchedulingMode.parse(text));
       } else if ("aclSubmitApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -476,19 +477,6 @@ public class QueueManager {
     }
   }
 
-  private SchedulingMode parseSchedulingMode(String text)
-      throws AllocationConfigurationException {
-    text = text.toLowerCase();
-    if (text.equals("fair")) {
-      return SchedulingMode.FAIR;
-    } else if (text.equals("fifo")) {
-      return SchedulingMode.FIFO;
-    } else {
-      throw new AllocationConfigurationException(
-          "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
-    }
-  }
-
   /**
    * Get the minimum resource allocation for the given queue.
    * @return the cap set on this queue, or 0 if not set.
@@ -663,7 +651,7 @@ public class QueueManager {
       minSharePreemptionTimeouts = new HashMap<String, Long>();
       defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
       fairSharePreemptionTimeout = Long.MAX_VALUE;
-      defaultSchedulingMode = SchedulingMode.FAIR;
+      defaultSchedulingMode = SchedulingMode.getDefault();
     }
   }
 }
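
QueueManager no longer hard-codes the fair/fifo mapping; both the defaultQueueSchedulingMode element and the per-queue schedulingMode element now go through SchedulingMode.parse, which additionally accepts a custom mode's class name. Roughly:

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;

    public class SchedulingModeParseSketch {
      public static void main(String[] args) throws Exception {
        // The built-in names are matched case-insensitively.
        SchedulingMode fair = SchedulingMode.parse("fair");
        SchedulingMode fifo = SchedulingMode.parse("FIFO");
        System.out.println(fair.getName() + " / " + fifo.getName());

        // Any other string is treated as the canonical class name of a custom
        // SchedulingMode on the RM classpath (hypothetical class shown):
        // SchedulingMode custom = SchedulingMode.parse("com.example.MyMode");

        // An unknown name, or a class that does not extend SchedulingMode,
        // raises AllocationConfigurationException.
      }
    }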

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Mon Apr  1 16:47:16 2013
@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.res
  */
 @Private
 @Unstable
-abstract class Schedulable {
+public abstract class Schedulable {
   /** Fair share assigned to this Schedulable */
   private Resource fairShare = Resources.createResource(0);
 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java Mon Apr  1 16:47:16 2013
@@ -15,17 +15,104 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
 
-/**
- * Internal scheduling modes for queues.
- */
-@Private
+@Public
 @Unstable
-public enum SchedulingMode {
-  FAIR, FIFO
+public abstract class SchedulingMode {
+  private static final ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode> instances =
+      new ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode>();
+
+  private static SchedulingMode DEFAULT_MODE =
+      getInstance(FairSchedulingMode.class);
+  
+  public static SchedulingMode getDefault() {
+    return DEFAULT_MODE;
+  }
+
+  public static void setDefault(String className)
+      throws AllocationConfigurationException {
+    DEFAULT_MODE = parse(className);
+  }
+
+  /**
+   * Returns a {@link SchedulingMode} instance corresponding to the passed clazz
+   */
+  public static SchedulingMode getInstance(Class<? extends SchedulingMode> clazz) {
+    SchedulingMode mode = instances.get(clazz);
+    if (mode == null) {
+      mode = ReflectionUtils.newInstance(clazz, null);
+      instances.put(clazz, mode);
+    }
+    return mode;
+  }
+
+  /**
+   * Returns the {@link SchedulingMode} instance corresponding to the mode
+   * passed as a string. The mode can be "fair" for FairSchedulingMode or
+   * "fifo" for FifoSchedulingMode. For a custom {@link SchedulingMode} on the
+   * RM classpath, the mode should be the canonical class name of the
+   * {@link SchedulingMode}.
+   * 
+   * @param mode canonical class name or "fair" or "fifo"
+   * @throws AllocationConfigurationException
+   */
+  @SuppressWarnings("unchecked")
+  public static SchedulingMode parse(String mode)
+      throws AllocationConfigurationException {
+    @SuppressWarnings("rawtypes")
+    Class clazz;
+    String text = mode.toLowerCase();
+    if (text.equals("fair")) {
+      clazz = FairSchedulingMode.class;
+    } else if (text.equals("fifo")) {
+      clazz = FifoSchedulingMode.class;
+    } else {
+      try {
+        clazz = Class.forName(mode);
+      } catch (ClassNotFoundException cnfe) {
+        throw new AllocationConfigurationException(mode
+            + " SchedulingMode class not found!");
+      }
+    }
+    if (!SchedulingMode.class.isAssignableFrom(clazz)) {
+      throw new AllocationConfigurationException(mode
+          + " does not extend SchedulingMode");
+    }
+    return getInstance(clazz);
+  }
+
+  /**
+   * @return the name of the SchedulingMode
+   */
+  public abstract String getName();
+
+  /**
+   * The comparator returned by this method is to be used for sorting the
+   * {@link Schedulable}s in that queue.
+   * 
+   * @return the comparator to sort by
+   */
+  public abstract Comparator<Schedulable> getComparator();
+
+  /**
+   * Computes and updates the shares of {@link Schedulable}s as per the
+   * SchedulingMode, to be used later at schedule time.
+   * 
+   * @param schedulables {@link Schedulable}s whose shares are to be updated
+   * @param totalResources Total {@link Resource}s in the cluster
+   */
+  public abstract void computeShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources);
 }
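
SchedulingMode is now an abstract, reflectively loaded class rather than a two-value enum (and Schedulable is made public above so modes outside the package can reference it). A custom policy therefore needs a no-argument constructor plus the three abstract methods. A purely hypothetical sketch; the Schedulable accessors used here (getResourceUsage, setFairShare) are assumptions, not part of this diff:

    import java.util.Collection;
    import java.util.Comparator;

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;

    public class MemoryOrderedSchedulingMode extends SchedulingMode {

      @Override
      public String getName() {
        return "memory-ordered";
      }

      @Override
      public Comparator<Schedulable> getComparator() {
        // Order schedulables by current memory usage, smallest first
        // (assumes Schedulable exposes getResourceUsage()).
        return new Comparator<Schedulable>() {
          @Override
          public int compare(Schedulable s1, Schedulable s2) {
            return s1.getResourceUsage().getMemory()
                - s2.getResourceUsage().getMemory();
          }
        };
      }

      @Override
      public void computeShares(Collection<? extends Schedulable> schedulables,
          Resource totalResources) {
        // Toy policy: split cluster memory evenly (assumes Schedulable
        // exposes setFairShare()).
        if (schedulables.isEmpty()) {
          return;
        }
        int share = totalResources.getMemory() / schedulables.size();
        for (Schedulable s : schedulables) {
          s.setFairShare(Resources.createResource(share));
        }
      }
    }

Such a class would be referred to by its canonical name in the allocation file's defaultQueueSchedulingMode or schedulingMode elements and resolved through SchedulingMode.parse.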

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon Apr  1 16:47:16 2013
@@ -57,8 +57,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -251,6 +252,15 @@ public class FifoScheduler implements Re
     }
 
     synchronized (application) {
+
+      // make sure we aren't stopping/removing the application
+      // when the allocate comes in
+      if (application.isStopped()) {
+        LOG.info("Calling allocate on a stopped " +
+            "application " + applicationAttemptId);
+        return EMPTY_ALLOCATION;
+      }
+
       if (!ask.isEmpty()) {
         LOG.debug("allocate: pre-update" +
             " applicationId=" + applicationAttemptId + 
@@ -392,7 +402,7 @@ public class FifoScheduler implements Re
   private int getMaxAllocatableContainers(FiCaSchedulerApp application,
       Priority priority, FiCaSchedulerNode node, NodeType type) {
     ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
+      application.getResourceRequest(priority, ResourceRequest.ANY);
     int maxContainers = offSwitchRequest.getNumContainers();
 
     if (type == NodeType.OFF_SWITCH) {
@@ -482,7 +492,7 @@ public class FifoScheduler implements Re
     if (request != null) {
       // Don't allocate on this rack if the application doesn't need containers
       ResourceRequest offSwitchRequest =
-          application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
+          application.getResourceRequest(priority, ResourceRequest.ANY);
       if (offSwitchRequest.getNumContainers() <= 0) {
         return 0;
       }
@@ -503,7 +513,7 @@ public class FifoScheduler implements Re
       FiCaSchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
-      application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
+      application.getResourceRequest(priority, ResourceRequest.ANY);
     if (request != null) {
       assignedContainers = 
         assignContainer(node, application, priority, 
@@ -576,11 +586,16 @@ public class FifoScheduler implements Re
     return assignedContainers;
   }
 
-  private synchronized void nodeUpdate(RMNode rmNode, 
-      List<ContainerStatus> newlyLaunchedContainers,
-      List<ContainerStatus> completedContainers) {
+  private synchronized void nodeUpdate(RMNode rmNode) {
     FiCaSchedulerNode node = getNode(rmNode.getNodeID());
     
+    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
+    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
+    for(UpdatedContainerInfo containerInfo : containerInfoList) {
+      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
+      completedContainers.addAll(containerInfo.getCompletedContainers());
+    }
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
       containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@@ -628,9 +643,7 @@ public class FifoScheduler implements Re
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = 
       (NodeUpdateSchedulerEvent)event;
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
-          nodeUpdatedEvent.getNewlyLaunchedContainers(),
-          nodeUpdatedEvent.getCompletedContainers());
+      nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
     case APP_ADDED:
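
FifoScheduler.nodeUpdate now mirrors the FairScheduler change above: the launched/completed container statuses are no longer delivered through the scheduler event but drained from the RMNode itself. The shared aggregation step, pulled out into a standalone helper purely for illustration (the helper class is hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;

    /** Hypothetical helper showing the aggregation both schedulers now perform. */
    public final class ContainerUpdates {
      public final List<ContainerStatus> newlyLaunched = new ArrayList<ContainerStatus>();
      public final List<ContainerStatus> completed = new ArrayList<ContainerStatus>();

      private ContainerUpdates() {
      }

      /**
       * Pulls the node's pending heartbeat updates; both lists are collected
       * in a single pass over the UpdatedContainerInfo records.
       */
      public static ContainerUpdates drain(RMNode node) {
        ContainerUpdates updates = new ContainerUpdates();
        for (UpdatedContainerInfo info : node.pullContainerUpdates()) {
          updates.newlyLaunched.addAll(info.getNewlyLaunchedContainers());
          updates.completed.addAll(info.getCompletedContainers());
        }
        return updates;
      }
    }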

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Mon Apr  1 16:47:16 2013
@@ -43,11 +43,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
@@ -200,9 +198,7 @@ public class Application {
     }
       
     // Off-switch
-    addResourceRequest(priority, requests, 
-        org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode.ANY, 
-        capability);
+    addResourceRequest(priority, requests, ResourceRequest.ANY, capability);
   }
   
   public synchronized void finishTask(Task task) throws IOException {
@@ -377,10 +373,7 @@ public class Application {
       }
     }
     
-    updateResourceRequest(
-        requests.get(
-            org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode.ANY)
-            );
+    updateResourceRequest(requests.get(ResourceRequest.ANY));
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("updateResourceRequests:" + " application=" + applicationId

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Mon Apr  1 16:47:16 2013
@@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -96,14 +94,14 @@ public class MockAM {
     requests.addAll(createReq(hosts, memory, priority, containers));
   }
 
-  public AMResponse schedule() throws Exception {
-    AMResponse response = allocate(requests, releases);
+  public AllocateResponse schedule() throws Exception {
+    AllocateResponse response = allocate(requests, releases);
     requests.clear();
     releases.clear();
     return response;
   }
 
-  public AMResponse allocate(
+  public AllocateResponse allocate(
       String host, int memory, int numContainers,
       List<ContainerId> releases) throws Exception {
     List<ResourceRequest> reqs = createReq(new String[]{host}, memory, 1, numContainers);
@@ -122,8 +120,8 @@ public class MockAM {
       reqs.add(rackReq);
     }
 
-    ResourceRequest offRackReq = createResourceReq("*", memory, priority,
-        containers);
+    ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory,
+        priority, containers);
     reqs.add(offRackReq);
     return reqs;
 
@@ -143,13 +141,12 @@ public class MockAM {
     return req;
   }
 
-  public AMResponse allocate(
+  public AllocateResponse allocate(
       List<ResourceRequest> resourceRequest, List<ContainerId> releases)
       throws Exception {
     AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId,
         ++responseId, 0F, resourceRequest, releases);
-    AllocateResponse resp = amRMProtocol.allocate(req);
-    return resp.getAMResponse();
+    return amRMProtocol.allocate(req);
   }
 
   public void unregisterAppAttempt() throws Exception {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Mon Apr  1 16:47:16 2013
@@ -33,11 +33,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -79,7 +79,7 @@ public class MockNM {
     nodeHeartbeat(conts, true);
   }
 
-  public RegistrationResponse registerNode() throws Exception {
+  public RegisterNodeManagerResponse registerNode() throws Exception {
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
@@ -87,18 +87,18 @@ public class MockNM {
     Resource resource = Records.newRecord(Resource.class);
     resource.setMemory(memory);
     req.setResource(resource);
-    RegistrationResponse registrationResponse =
-        resourceTracker.registerNodeManager(req).getRegistrationResponse();
+    RegisterNodeManagerResponse registrationResponse =
+        resourceTracker.registerNodeManager(req);
     this.currentMasterKey = registrationResponse.getMasterKey();
     return registrationResponse;
   }
 
-  public HeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
+  public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
     return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
         isHealthy, ++responseId);
   }
 
-  public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
+  public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
       int containerId, ContainerState containerState) throws Exception {
     HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
         new HashMap<ApplicationId, List<ContainerStatus>>(1);
@@ -112,12 +112,12 @@ public class MockNM {
     return nodeHeartbeat(nodeUpdate, true);
   }
 
-  public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
+  public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
     return nodeHeartbeat(conts, isHealthy, ++responseId);
   }
 
-  public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
+  public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
@@ -133,8 +133,8 @@ public class MockNM {
     status.setNodeHealthStatus(healthStatus);
     req.setNodeStatus(status);
     req.setLastKnownMasterKey(this.currentMasterKey);
-    HeartbeatResponse heartbeatResponse =
-        resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
+    NodeHeartbeatResponse heartbeatResponse =
+        resourceTracker.nodeHeartbeat(req);
     MasterKey masterKeyFromRM = heartbeatResponse.getMasterKey();
     this.currentMasterKey =
         (masterKeyFromRM != null
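
On the test side, MockNM now hands back the protocol records directly: registerNode() returns RegisterNodeManagerResponse and nodeHeartbeat() returns NodeHeartbeatResponse, whose cleanup accessors drop the -List suffix. A usage sketch, assuming an already constructed MockNM:

    import java.util.List;

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
    import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
    import org.apache.hadoop.yarn.server.resourcemanager.MockNM;

    public class MockNMUsageSketch {
      public static void pollCleanup(MockNM nm) throws Exception {
        RegisterNodeManagerResponse reg = nm.registerNode();
        System.out.println("got master key: " + (reg.getMasterKey() != null));

        NodeHeartbeatResponse hb = nm.nodeHeartbeat(true);
        List<ContainerId> containersToClean = hb.getContainersToCleanup();
        List<ApplicationId> appsToClean = hb.getApplicationsToCleanup();
        System.out.println(containersToClean.size() + " containers and "
            + appsToClean.size() + " applications to clean up");
      }
    }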

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java Mon Apr  1 16:47:16 2013
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -29,8 +31,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
 import com.google.common.collect.Lists;
 
@@ -184,9 +187,18 @@ public class MockNodes {
     }
 
     @Override
-    public HeartbeatResponse getLastHeartBeatResponse() {
+    public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
+    }
+
+    @Override
+    public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
       return null;
     }
+
+    @Override
+    public List<UpdatedContainerInfo> pullContainerUpdates() {
+      return new ArrayList<UpdatedContainerInfo>();
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Mon Apr  1 16:47:16 2013
@@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -96,8 +96,7 @@ public class NodeManager implements Cont
     request.setNodeId(this.nodeId);
     request.setResource(capability);
     request.setNodeId(this.nodeId);
-    resourceTrackerService.registerNodeManager(request)
-        .getRegistrationResponse();
+    resourceTrackerService.registerNodeManager(request);
     this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
         this.nodeId));
    
@@ -151,8 +150,8 @@ public class NodeManager implements Cont
     NodeHeartbeatRequest request = recordFactory
         .newRecordInstance(NodeHeartbeatRequest.class);
     request.setNodeStatus(nodeStatus);
-    HeartbeatResponse response = resourceTrackerService
-        .nodeHeartbeat(request).getHeartbeatResponse();
+    NodeHeartbeatResponse response = resourceTrackerService
+        .nodeHeartbeat(request);
     responseID = response.getResponseId();
   }
 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java Mon Apr  1 16:47:16 2013
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 
 public class Task {
@@ -57,8 +58,7 @@ public class Task {
     
     // Special case: Don't care about locality
     if (!(hosts.length == 1 && 
-        hosts[0].equals(
-            org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode.ANY))) {
+        hosts[0].equals(ResourceRequest.ANY))) {
       for (String host : hosts) {
         this.hosts.add(host);
         this.racks.add(Application.resolve(host));

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Mon Apr  1 16:47:16 2013
@@ -377,7 +377,61 @@ public class TestAppManager{
     ((Service)rmContext.getDispatcher()).stop();
   }
 
-  @Test
+  @Test (timeout = 30000)
+  public void testRMAppSubmitMaxAppAttempts() throws Exception {
+    int[] globalMaxAppAttempts = new int[] { 10, 1 };
+    int[][] individualMaxAppAttempts = new int[][]{
+        new int[]{ 9, 10, 11, 0 },
+        new int[]{ 1, 10, 0, -1 }};
+    int[][] expectedNums = new int[][]{
+        new int[]{ 9, 10, 10, 10 },
+        new int[]{ 1, 1, 1, 1 }};
+    for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
+      for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
+        long now = System.currentTimeMillis();
+
+        RMContext rmContext = mockRMContext(0, now - 10);
+        ResourceScheduler scheduler = new CapacityScheduler();
+        Configuration conf = new Configuration();
+        conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
+        ApplicationMasterService masterService =
+            new ApplicationMasterService(rmContext, scheduler);
+        TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
+            new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
+            new ApplicationACLsManager(conf), conf);
+
+        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+        ApplicationSubmissionContext context =
+            recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+        ContainerLaunchContext amContainer = recordFactory
+            .newRecordInstance(ContainerLaunchContext.class);
+        amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
+        context.setAMContainerSpec(amContainer);
+        setupDispatcher(rmContext, conf);
+
+        ApplicationId appID = MockApps.newAppID(1);
+        context.setApplicationId(appID);
+        if (individualMaxAppAttempts[i][j] != 0) {
+          context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
+        }
+        appMonitor.submitApplication(context);
+        RMApp app = rmContext.getRMApps().get(appID);
+        Assert.assertEquals("max application attempts doesn't match",
+            expectedNums[i][j], app.getMaxAppAttempts());
+
+        // wait for event to be processed
+        int timeoutSecs = 0;
+        while ((getAppEventType() == RMAppEventType.KILL) &&
+            timeoutSecs++ < 20) {
+          Thread.sleep(1000);
+        }
+        setAppEventType(RMAppEventType.KILL);
+        ((Service)rmContext.getDispatcher()).stop();
+      }
+    }
+  }
+
+  @Test (timeout = 3000)
   public void testRMAppSubmitWithQueueAndName() throws Exception {
     long now = System.currentTimeMillis();
 
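
The expectedNums table in the new test encodes a simple resolution rule: a per-application value of 0 (unset) or a negative value falls back to the RM-wide YarnConfiguration.RM_AM_MAX_ATTEMPTS, and an explicit value is honoured but capped by that global maximum. A sketch of the rule the test data implies (illustrative only, not the RMAppImpl code):

    public class MaxAppAttemptsRule {
      /** Resolution rule implied by the test arrays above. */
      static int resolve(int globalMax, int requested) {
        if (requested <= 0) {
          return globalMax; // unset (0) or invalid (negative): use the RM-wide maximum
        }
        return Math.min(requested, globalMax); // explicit value, capped globally
      }

      public static void main(String[] args) {
        // Mirrors the first row of the table: global 10, requests {9, 10, 11, 0}.
        for (int requested : new int[] { 9, 10, 11, 0 }) {
          System.out.println(requested + " -> " + resolve(10, requested));
        }
      }
    }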

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Mon Apr  1 16:47:16 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -92,15 +92,15 @@ public class TestApplicationCleanup {
     Assert.assertEquals(request, contReceived);
     
     am.unregisterAppAttempt();
-    HeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
+    NodeHeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
         ContainerState.COMPLETE);
     am.waitForState(RMAppAttemptState.FINISHED);
 
     //currently only containers are cleaned via this
     //AM container is cleaned via container launcher
     resp = nm1.nodeHeartbeat(true);
-    List<ContainerId> contsToClean = resp.getContainersToCleanupList();
-    List<ApplicationId> apps = resp.getApplicationsToCleanupList();
+    List<ContainerId> contsToClean = resp.getContainersToCleanup();
+    List<ApplicationId> apps = resp.getApplicationsToCleanup();
     int cleanedConts = contsToClean.size();
     int cleanedApps = apps.size();
     waitCount = 0;
@@ -109,8 +109,8 @@ public class TestApplicationCleanup {
           + cleanedConts + " cleanedApps: " + cleanedApps);
       Thread.sleep(100);
       resp = nm1.nodeHeartbeat(true);
-      contsToClean = resp.getContainersToCleanupList();
-      apps = resp.getApplicationsToCleanupList();
+      contsToClean = resp.getContainersToCleanup();
+      apps = resp.getApplicationsToCleanup();
       cleanedConts += contsToClean.size();
       cleanedApps += apps.size();
     }
@@ -198,9 +198,9 @@ public class TestApplicationCleanup {
       .getId(), ContainerState.RUNNING, "nothing", 0));
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
-    HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
+    NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
     dispatcher.await();
-    List<ContainerId> contsToClean = resp.getContainersToCleanupList();
+    List<ContainerId> contsToClean = resp.getContainersToCleanup();
     int cleanedConts = contsToClean.size();
     waitCount = 0;
     while (cleanedConts < 1 && waitCount++ < 200) {
@@ -208,7 +208,7 @@ public class TestApplicationCleanup {
       Thread.sleep(100);
       resp = nm1.nodeHeartbeat(true);
       dispatcher.await();
-      contsToClean = resp.getContainersToCleanupList();
+      contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
     }
     LOG.info("Got cleanup for " + contsToClean.get(0));
@@ -226,7 +226,7 @@ public class TestApplicationCleanup {
 
     resp = nm1.nodeHeartbeat(containerStatuses, true);
     dispatcher.await();
-    contsToClean = resp.getContainersToCleanupList();
+    contsToClean = resp.getContainersToCleanup();
     cleanedConts = contsToClean.size();
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
@@ -236,7 +236,7 @@ public class TestApplicationCleanup {
       Thread.sleep(100);
       resp = nm1.nodeHeartbeat(true);
       dispatcher.await();
-      contsToClean = resp.getContainersToCleanupList();
+      contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
     }
     LOG.info("Got cleanup for " + contsToClean.get(0));

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Mon Apr  1 16:47:16 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -60,6 +61,7 @@ public class TestApplicationMasterLaunch
     int nmPortAtContainerManager;
     int nmHttpPortAtContainerManager;
     long submitTimeAtContainerManager;
+    int maxAppAttempts;
 
     @Override
     public StartContainerResponse
@@ -82,7 +84,8 @@ public class TestApplicationMasterLaunch
           Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV));
       submitTimeAtContainerManager =
           Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
-
+      maxAppAttempts =
+          Integer.parseInt(env.get(ApplicationConstants.MAX_APP_ATTEMPTS_ENV));
       return null;
     }
 
@@ -139,6 +142,8 @@ public class TestApplicationMasterLaunch
         containerManager.nmPortAtContainerManager);
     Assert.assertEquals(nm1.getHttpPort(),
         containerManager.nmHttpPortAtContainerManager);
+    Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
+        containerManager.maxAppAttempts);
 
     MockAM am = new MockAM(rm.getRMContext(), rm
         .getApplicationMasterService(), appAttemptId);
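
The launcher now also exports the configured attempt limit into the AM container's environment, and the test checks it defaults to YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS. An AM-side sketch of reading it back:

    import org.apache.hadoop.yarn.api.ApplicationConstants;

    public class MaxAttemptsFromEnv {
      public static void main(String[] args) {
        // The NM sets this variable when launching the AM, as verified above.
        String raw = System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
        int maxAppAttempts = (raw == null) ? 1 : Integer.parseInt(raw);
        System.out.println("this application may run up to " + maxAppAttempts
            + " attempt(s)");
      }
    }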

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Mon Apr  1 16:47:16 2013
@@ -19,7 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -176,6 +176,10 @@ public class TestClientRMService {
     List<ApplicationReport> applications = queueInfo.getQueueInfo()
         .getApplications();
     Assert.assertEquals(2, applications.size());
+    request.setQueueName("nonexistentqueue");
+    request.setIncludeApplications(true);
+    // should not throw exception on nonexistent queue
+    queueInfo = rmService.getQueueInfo(request);
   }
 
   private static final UserGroupInformation owner =
@@ -334,8 +338,10 @@ public class TestClientRMService {
     when(rmContext.getDispatcher()).thenReturn(dispatcher);
     QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
     queInfo.setQueueName("testqueue");
-    when(yarnScheduler.getQueueInfo(anyString(), anyBoolean(), anyBoolean()))
+    when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean()))
         .thenReturn(queInfo);
+    when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean()))
+        .thenThrow(new IOException("queue does not exist"));
     ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
         yarnScheduler);
     when(rmContext.getRMApps()).thenReturn(apps);
@@ -368,8 +374,10 @@ public class TestClientRMService {
 
   private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
       ApplicationId applicationId3, YarnConfiguration config, String queueName) {
+    ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
+    when(asContext.getMaxAppAttempts()).thenReturn(1);
     return new RMAppImpl(applicationId3, rmContext, config, null, null,
-        queueName, null, yarnScheduler, null , System
+        queueName, asContext, yarnScheduler, null , System
             .currentTimeMillis());
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Mon Apr  1 16:47:16 2013
@@ -26,7 +26,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -99,32 +99,32 @@ public class TestFifoScheduler {
 
     // add request for containers
     am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
-    AMResponse am1Response = am1.schedule(); // send the request
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
     // add request for containers
     am2.addRequests(new String[] { "h1", "h2" }, 3 * GB, 0, 1);
-    AMResponse am2Response = am2.schedule(); // send the request
+    AllocateResponse alloc2Response = am2.schedule(); // send the request
 
     // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0
     nm1.nodeHeartbeat(true);
-    while (am1Response.getAllocatedContainers().size() < 1) {
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
       LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(1000);
-      am1Response = am1.schedule();
+      alloc1Response = am1.schedule();
     }
-    while (am2Response.getAllocatedContainers().size() < 1) {
+    while (alloc2Response.getAllocatedContainers().size() < 1) {
       LOG.info("Waiting for containers to be created for app 2...");
       Thread.sleep(1000);
-      am2Response = am2.schedule();
+      alloc2Response = am2.schedule();
     }
     // kick the scheduler, nothing given remaining 2 GB.
     nm2.nodeHeartbeat(true);
 
-    List<Container> allocated1 = am1Response.getAllocatedContainers();
+    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
     Assert.assertEquals(1, allocated1.size());
     Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
     Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
 
-    List<Container> allocated2 = am2Response.getAllocatedContainers();
+    List<Container> allocated2 = alloc2Response.getAllocatedContainers();
     Assert.assertEquals(1, allocated2.size());
     Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
     Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());
@@ -201,7 +201,7 @@ public class TestFifoScheduler {
     testMinimumAllocation(conf, allocMB / 2);
   }
 
-  @Test
+  @Test (timeout = 5000)
   public void testReconnectedNode() throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     conf.setQueues("default", new String[] {"default"});
@@ -215,19 +215,19 @@ public class TestFifoScheduler {
     fs.handle(new NodeAddedSchedulerEvent(n1));
     fs.handle(new NodeAddedSchedulerEvent(n2));
     List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
-    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
     Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
 
     // reconnect n1 with downgraded memory
     n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
     fs.handle(new NodeRemovedSchedulerEvent(n1));
     fs.handle(new NodeAddedSchedulerEvent(n1));
-    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
 
     Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testHeadroom() throws Exception {
     
     Configuration conf = new Configuration();
@@ -264,18 +264,18 @@ public class TestFifoScheduler {
     
     // Ask for a 1 GB container for app 1
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
-    ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
-        BuilderUtils.newResource(GB, 1), 1));
+    ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
+        ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
     fs.allocate(appAttemptId1, ask1, emptyId);
 
     // Ask for a 2 GB container for app 2
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
-    ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
-        BuilderUtils.newResource(2 * GB, 1), 1));
+    ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
+        ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
     fs.allocate(appAttemptId2, ask2, emptyId);
     
     // Trigger container assignment
-    fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus));
+    fs.handle(new NodeUpdateSchedulerEvent(n1));
     
     // Get the allocation for the applications and verify headroom
     Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);
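
The hunks above poll AllocateResponse in place of the removed AMResponse and build requests with the ResourceRequest.ANY constant rather than the literal "*". A minimal sketch of the allocate-and-poll loop shape follows; the AppMaster and Response types are hypothetical stand-ins for the MockAM helper and the YARN response record, not the real APIs.

    import java.util.Collections;
    import java.util.List;

    public class AllocatePollSketch {
      // Hypothetical stand-ins for the MockAM helper and the AllocateResponse record.
      interface AppMaster {
        Response schedule() throws InterruptedException;
      }
      interface Response {
        List<String> getAllocatedContainers();
      }

      // Poll until the scheduler has handed back at least one container, the same
      // loop shape the test uses after switching to AllocateResponse.
      static List<String> waitForContainers(AppMaster am) throws InterruptedException {
        Response response = am.schedule();           // send the outstanding request
        while (response.getAllocatedContainers().isEmpty()) {
          Thread.sleep(1000);                        // give the scheduler time to assign
          response = am.schedule();                  // re-poll; allocations arrive on later calls
        }
        return response.getAllocatedContainers();
      }

      public static void main(String[] args) throws InterruptedException {
        // Fake AM that reports one allocated container on the second poll.
        AppMaster fake = new AppMaster() {
          private int calls = 0;
          public Response schedule() {
            final List<String> allocated = ++calls < 2
                ? Collections.<String>emptyList()
                : Collections.singletonList("container_1");
            return new Response() {
              public List<String> getAllocatedContainers() { return allocated; }
            };
          }
        };
        System.out.println(waitForContainers(fake)); // prints [container_1]
      }
    }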

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Mon Apr  1 16:47:16 2013
@@ -26,10 +26,12 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -135,6 +137,51 @@ public class TestRM {
     rm.stop();
   }
 
+  @Test (timeout = 30000)
+  public void testActivatingApplicationAfterAddingNM() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+
+    MockRM rm1 = new MockRM(conf);
+
+    // start like normal because state is empty
+    rm1.start();
+
+    // app that gets launched
+    RMApp app1 = rm1.submitApp(200);
+
+    // app that does not get launched
+    RMApp app2 = rm1.submitApp(200);
+
+    // app1 and app2 should be scheduled, but because no resource is available,
+    // they are not activated.
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
+    rm1.waitForState(attemptId1, RMAppAttemptState.SCHEDULED);
+    RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
+    ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId();
+    rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED);
+
+    MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+    MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+
+    //kick the scheduling
+    nm1.nodeHeartbeat(true);
+
+    // app1 should be allocated now
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED);
+
+    nm2.nodeHeartbeat(true);
+
+    // app2 should be allocated now
+    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
+    rm1.waitForState(attemptId2, RMAppAttemptState.ALLOCATED);
+
+    rm1.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestRM t = new TestRM();
     t.testGetNewAppId();
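
The new testActivatingApplicationAfterAddingNM drives two submitted attempts from SCHEDULED to ALLOCATED only after NodeManagers register and heartbeat. A generic sketch of the waitForState-style polling it relies on is shown below; the Callable-based helper and the state strings are hypothetical, not MockRM's API.

    import java.util.concurrent.Callable;

    public class WaitForStateSketch {
      // Poll until the observed state equals the expected one or the deadline passes;
      // a generic version of the rm1.waitForState(...) calls in the new test.
      static <S> void waitForState(Callable<S> current, S expected, long timeoutMs)
          throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!expected.equals(current.call())) {
          if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("Timed out waiting for " + expected
                + ", still at " + current.call());
          }
          Thread.sleep(100); // re-check until the RM moves the attempt along
        }
      }

      public static void main(String[] args) throws Exception {
        final long start = System.currentTimeMillis();
        // Fake state source that flips from SCHEDULED to ALLOCATED after ~300 ms,
        // standing in for an attempt that activates once a node heartbeats.
        Callable<String> attemptState = new Callable<String>() {
          public String call() {
            return System.currentTimeMillis() - start < 300 ? "SCHEDULED" : "ALLOCATED";
          }
        };
        waitForState(attemptState, "ALLOCATED", 5000);
        System.out.println("attempt reached ALLOCATED");
      }
    }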

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Apr  1 16:47:16 2013
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnsw
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,6 +30,7 @@ import java.util.List;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -36,12 +38,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,7 +68,7 @@ public class TestRMNodeTransitions {
   private YarnScheduler scheduler;
 
   private SchedulerEventType eventType;
-  private List<ContainerStatus> completedContainers;
+  private List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
   
   private final class TestSchedulerEventDispatcher implements
   EventHandler<SchedulerEvent> {
@@ -89,10 +94,11 @@ public class TestRMNodeTransitions {
             final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
             eventType = event.getType();
             if (eventType == SchedulerEventType.NODE_UPDATE) {
-              completedContainers = 
-                  ((NodeUpdateSchedulerEvent)event).getCompletedContainers();
-            } else {
-              completedContainers = null;
+              List<UpdatedContainerInfo> latestContainersInfoList =
+                  ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
+              for (UpdatedContainerInfo latestContainersInfo : latestContainersInfoList) {
+                completedContainers.addAll(latestContainersInfo.getCompletedContainers());
+              }
             }
             return null;
           }
@@ -112,7 +118,7 @@ public class TestRMNodeTransitions {
   }
   
   private RMNodeStatusEvent getMockRMNodeStatusEvent() {
-    HeartbeatResponse response = mock(HeartbeatResponse.class);
+    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
 
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
     Boolean yes = new Boolean(true);
@@ -125,16 +131,16 @@ public class TestRMNodeTransitions {
     return event;
   }
   
-  @Test
+  @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
     node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
-		ContainerId completedContainerId = BuilderUtils.newContainerId(
-				BuilderUtils.newApplicationAttemptId(
-						BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
     node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     
@@ -146,9 +152,110 @@ public class TestRMNodeTransitions {
     doReturn(Collections.singletonList(containerStatus)).
         when(statusEvent).getContainers();
     node.handle(statusEvent);
-    Assert.assertEquals(0, completedContainers.size());
+    /* Expect the scheduler's handle function to be called twice:
+     * 1. when the RMNode transitions from NEW to RUNNING (the node-added event)
+     * 2. when the node-update event is handled
+     */
+    verify(scheduler, times(2)).handle(any(NodeUpdateSchedulerEvent.class));
   }
-
+  
+  @Test (timeout = 5000)
+  public void testContainerUpdate() throws InterruptedException{
+    //Start the node
+    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    
+    NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
+    RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
+    node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    
+    ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+    ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 2);
+ 
+    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
+    
+    ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
+
+    doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
+        .getContainerId();
+    doReturn(Collections.singletonList(containerStatusFromNode1))
+        .when(statusEventFromNode1).getContainers();
+    node.handle(statusEventFromNode1);
+    Assert.assertEquals(1, completedContainers.size());
+    Assert.assertEquals(completedContainerIdFromNode1,
+        completedContainers.get(0).getContainerId());
+
+    completedContainers.clear();
+
+    doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
+        .getContainerId();
+    doReturn(Collections.singletonList(containerStatusFromNode2_1))
+        .when(statusEventFromNode2_1).getContainers();
+
+    doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
+        .getContainerId();
+    doReturn(Collections.singletonList(containerStatusFromNode2_2))
+        .when(statusEventFromNode2_2).getContainers();
+
+    node2.setNextHeartBeat(false);
+    node2.handle(statusEventFromNode2_1);
+    node2.setNextHeartBeat(true);
+    node2.handle(statusEventFromNode2_2);
+
+    Assert.assertEquals(2, completedContainers.size());
+    Assert.assertEquals(completedContainerIdFromNode2_1, completedContainers.get(0)
+        .getContainerId());
+    Assert.assertEquals(completedContainerIdFromNode2_2, completedContainers.get(1)
+        .getContainerId());
+  }
+  
+  @Test (timeout = 5000)
+  public void testStatusChange(){
+    //Start the node
+    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    //Add info to the queue first
+    node.setNextHeartBeat(false);
+
+    ContainerId completedContainerId1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
+    ContainerId completedContainerId2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+        
+    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
+
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    ContainerStatus containerStatus2 = mock(ContainerStatus.class);
+
+    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
+    doReturn(Collections.singletonList(containerStatus1))
+        .when(statusEvent1).getContainers();
+     
+    doReturn(completedContainerId2).when(containerStatus2).getContainerId();
+    doReturn(Collections.singletonList(containerStatus2))
+        .when(statusEvent2).getContainers();
+
+    verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class)); 
+    node.handle(statusEvent1);
+    node.handle(statusEvent2);
+    verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
+    Assert.assertEquals(2, node.getQueueSize());
+    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
+    Assert.assertEquals(0, node.getQueueSize());
+  }
+  
   @Test
   public void testRunningExpire() {
     RMNodeImpl node = getRunningNode();
@@ -195,6 +302,39 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodeState.REBOOTED, node.getState());
   }
 
+  @Test(timeout=20000)
+  public void testUpdateHeartbeatResponseForCleanup() {
+    RMNodeImpl node = getRunningNode();
+    NodeId nodeId = node.getNodeID();
+
+    // Expire a container
+    ContainerId completedContainerId = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(0, 0), 0), 0);
+    node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
+    Assert.assertEquals(1, node.getContainersToCleanUp().size());
+
+    // Finish an application
+    ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
+    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+
+    // Verify status update does not clear containers/apps to cleanup
+    // but updating heartbeat response for cleanup does
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    node.handle(statusEvent);
+    Assert.assertEquals(1, node.getContainersToCleanUp().size());
+    Assert.assertEquals(1, node.getAppsToCleanup().size());
+    NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
+    node.updateNodeHeartbeatResponseForCleanup(hbrsp);
+    Assert.assertEquals(0, node.getContainersToCleanUp().size());
+    Assert.assertEquals(0, node.getAppsToCleanup().size());
+    Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
+    Assert.assertEquals(completedContainerId, hbrsp.getContainersToCleanup().get(0));
+    Assert.assertEquals(1, hbrsp.getApplicationsToCleanup().size());
+    Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
+  }
+
   private RMNodeImpl getRunningNode() {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
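
The test changes in this file track the scheduler-side switch from pushing container lists on NodeUpdateSchedulerEvent to pulling queued UpdatedContainerInfo off the RMNode. A simplified, self-contained sketch of that pull-based queue is given below; the classes are stand-ins, not the YARN implementations.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ContainerUpdateQueueSketch {
      // Simplified stand-in for UpdatedContainerInfo: one heartbeat's worth of changes.
      static class UpdateInfo {
        final List<String> launched;
        final List<String> completed;
        UpdateInfo(List<String> launched, List<String> completed) {
          this.launched = launched;
          this.completed = completed;
        }
      }

      private final ConcurrentLinkedQueue<UpdateInfo> updates =
          new ConcurrentLinkedQueue<UpdateInfo>();

      // A node status event enqueues its container changes instead of carrying
      // them on the scheduler event.
      void onStatusEvent(List<String> launched, List<String> completed) {
        updates.add(new UpdateInfo(launched, completed));
      }

      // The scheduler drains everything queued so far on the next NODE_UPDATE,
      // which is why the test above accumulates completedContainers across pulls.
      List<UpdateInfo> pullContainerUpdates() {
        List<UpdateInfo> drained = new ArrayList<UpdateInfo>();
        UpdateInfo info;
        while ((info = updates.poll()) != null) {
          drained.add(info);
        }
        return drained;
      }

      public static void main(String[] args) {
        ContainerUpdateQueueSketch node = new ContainerUpdateQueueSketch();
        node.onStatusEvent(Arrays.asList("c1"), Collections.<String>emptyList());
        node.onStatusEvent(Collections.<String>emptyList(), Arrays.asList("c1"));
        System.out.println(node.pullContainerUpdates().size()); // 2: both heartbeats drained
        System.out.println(node.pullContainerUpdates().size()); // 0: queue is now empty
      }
    }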

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Apr  1 16:47:16 2013
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
@@ -213,12 +213,13 @@ public class TestRMRestart {
     // verify old AM is not accepted
     // change running AM to talk to new RM
     am1.setAMRMProtocol(rm2.getApplicationMasterService());
-    AMResponse amResponse = am1.allocate(new ArrayList<ResourceRequest>(),
+    AllocateResponse allocResponse = am1.allocate(
+        new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
-    Assert.assertTrue(amResponse.getReboot());
+    Assert.assertTrue(allocResponse.getReboot());
     
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
-    HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
     Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
     hbResponse = nm2.nodeHeartbeat(true);
     Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Mon Apr  1 16:47:16 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -27,9 +28,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -122,7 +125,7 @@ public class TestResourceManager {
     Task t2 = new Task(application, priority1, new String[] {host1, host2});
     application.addTask(t2);
 
-    Task t3 = new Task(application, priority0, new String[] {RMNode.ANY});
+    Task t3 = new Task(application, priority0, new String[] {ResourceRequest.ANY});
     application.addTask(t3);
 
     // Send resource requests to the scheduler
@@ -183,4 +186,16 @@ public class TestResourceManager {
     }
   }
 
+  @Test (timeout = 30000)
+  public void testResourceManagerInitConfigValidation() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
+    try {
+      resourceManager.init(conf);
+      fail("Exception is expected because the global max attempts is negative.");
+    } catch (YarnException e) {
+      // Exception is expected.
+    }
+  }
+
 }
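
The new testResourceManagerInitConfigValidation expects init() to reject a negative YarnConfiguration.RM_AM_MAX_ATTEMPTS. A minimal, generic sketch of the try/fail/catch validation pattern it uses follows; the ConfigException and init() below are hypothetical stand-ins, not the ResourceManager API.

    import static org.junit.Assert.fail;

    import org.junit.Test;

    public class InitValidationSketch {
      // Hypothetical stand-in for the exception the service throws on bad config.
      static class ConfigException extends RuntimeException {
        ConfigException(String msg) { super(msg); }
      }

      // Hypothetical stand-in for a service init() that validates its configuration.
      static void init(int maxAppAttempts) {
        if (maxAppAttempts <= 0) {
          throw new ConfigException("max attempts must be positive, got " + maxAppAttempts);
        }
      }

      @Test
      public void rejectsNegativeMaxAttempts() {
        try {
          init(-1);
          fail("Expected a configuration exception for a negative max-attempts value");
        } catch (ConfigException e) {
          // expected: init must refuse to start with an invalid configuration
        }
      }
    }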

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Mon Apr  1 16:47:16 2013
@@ -36,9 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -75,7 +75,7 @@ public class TestResourceTrackerService 
     assert(metrics != null);
     int metricCount = metrics.getNumDecommisionedNMs();
 
-    HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@@ -124,7 +124,7 @@ public class TestResourceTrackerService 
 
     int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
 
-    HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@@ -161,7 +161,7 @@ public class TestResourceTrackerService 
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
     assert(metrics != null);
     int initialMetricCount = metrics.getNumDecommisionedNMs();
-    HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertEquals(
         NodeAction.NORMAL,
         nodeHeartbeat.getNodeAction());
@@ -198,7 +198,7 @@ public class TestResourceTrackerService 
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
     assert(metrics != null);
     int initialMetricCount = metrics.getNumDecommisionedNMs();
-    HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertEquals(
         NodeAction.NORMAL,
         nodeHeartbeat.getNodeAction());
@@ -241,7 +241,7 @@ public class TestResourceTrackerService 
     req.setHttpPort(1234);
     // trying to register an invalid node.
     RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
-    Assert.assertEquals(NodeAction.SHUTDOWN,response.getRegistrationResponse().getNodeAction());
+    Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
   }
 
   @Test
@@ -254,7 +254,7 @@ public class TestResourceTrackerService 
     MockNM nm2 = rm.registerNode("host2:1234", 2048);
 
     int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
-    HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
 
     nodeHeartbeat = nm2.nodeHeartbeat(
@@ -351,7 +351,7 @@ public class TestResourceTrackerService 
 
     // reconnect of healthy node
     nm1 = rm.registerNode("host1:1234", 5120);
-    HeartbeatResponse response = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     dispatcher.await();
     Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());


