hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1565792 [3/5] - in /hadoop/common/branches/branch-2.3/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/record...
Date Fri, 07 Feb 2014 20:33:02 GMT
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Feb  7 20:33:01 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,19 +58,15 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -80,10 +75,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -121,10 +114,10 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
-public class FairScheduler extends AbstractYarnScheduler implements
-    ResourceScheduler {
+public class FairScheduler implements ResourceScheduler {
   private boolean initialized;
   private FairSchedulerConfiguration conf;
+  private RMContext rmContext;
   private Resource minimumAllocation;
   private Resource maximumAllocation;
   private Resource incrAllocation;
@@ -158,6 +151,12 @@ public class FairScheduler extends Abstr
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
+  // This stores per-application scheduling information, indexed by
+  // attempt ID's for fast lookup.
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FSSchedulerApp> applications = 
+      new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
+
   // Nodes in the cluster, indexed by NodeId
   private Map<NodeId, FSSchedulerNode> nodes = 
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
@@ -252,21 +251,10 @@ public class FairScheduler extends Abstr
     return queueMgr;
   }
 
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  private FSSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
+  private RMContainer getRMContainer(ContainerId containerId) {
+    FSSchedulerApp application = 
+        applications.get(containerId.getApplicationAttemptId());
+    return (application == null) ? null : application.getRMContainer(containerId);
   }
 
   /**
@@ -603,87 +591,64 @@ public class FairScheduler extends Abstr
    * user. This will accept a new app even if the user or queue is above
    * configured limits, but the app will not be marked as runnable.
    */
-  protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+  protected synchronized void addApplicationAttempt(
+      ApplicationAttemptId applicationAttemptId, String queueName, String user) {
     if (queueName == null || queueName.isEmpty()) {
-      String message = "Reject application " + applicationId +
+      String message = "Reject application " + applicationAttemptId +
               " submitted by user " + user + " with an empty queue name.";
       LOG.info(message);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, message));
+      rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppAttemptRejectedEvent(applicationAttemptId, message));
       return;
     }
 
-    RMApp rmApp = rmContext.getRMApps().get(applicationId);
+    RMApp rmApp = rmContext.getRMApps().get(
+        applicationAttemptId.getApplicationId());
     FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
     if (queue == null) {
       rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppRejectedEvent(applicationId,
+          new RMAppAttemptRejectedEvent(applicationAttemptId,
               "Application rejected by queue placement policy"));
       return;
     }
 
+    FSSchedulerApp schedulerApp =
+        new FSSchedulerApp(applicationAttemptId, user,
+            queue, new ActiveUsersManager(getRootQueueMetrics()),
+            rmContext);
+
     // Enforce ACLs
     UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
 
     if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
         && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
       String msg = "User " + userUgi.getUserName() +
-              " cannot submit applications to queue " + queue.getName();
+    	        " cannot submit applications to queue " + queue.getName();
       LOG.info(msg);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppRejectedEvent(applicationId, msg));
+      rmContext.getDispatcher().getEventHandler().handle(
+    	        new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
       return;
     }
-  
-    SchedulerApplication application =
-        new SchedulerApplication(queue, user);
-    applications.put(applicationId, application);
-    queue.getMetrics().submitApp(user);
-
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queueName + ", currently num of applications: "
-        + applications.size());
-    rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
-  }
-
-  /**
-   * Add a new application attempt to the scheduler.
-   */
-  protected synchronized void addApplicationAttempt(
-      ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    String user = application.getUser();
-    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
-
-    FSSchedulerApp attempt =
-        new FSSchedulerApp(applicationAttemptId, user,
-            queue, new ActiveUsersManager(getRootQueueMetrics()),
-            rmContext);
-    if (transferStateFromPreviousAttempt) {
-      attempt.transferStateFromPreviousAttempt(application
-        .getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(attempt);
 
     boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
-    queue.addApp(attempt, runnable);
+    queue.addApp(schedulerApp, runnable);
     if (runnable) {
-      maxRunningEnforcer.trackRunnableApp(attempt);
+      maxRunningEnforcer.trackRunnableApp(schedulerApp);
     } else {
-      maxRunningEnforcer.trackNonRunnableApp(attempt);
+      maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
     }
     
-    queue.getMetrics().submitAppAttempt(user);
+    queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
+
+    applications.put(applicationAttemptId, schedulerApp);
+
+    LOG.info("Application Submission: " + applicationAttemptId +
+        ", user: "+ user +
+        ", currently active: " + applications.size());
 
-    LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user: " + user);
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+            RMAppAttemptEventType.APP_ACCEPTED));
   }
   
   @VisibleForTesting
@@ -709,40 +674,21 @@ public class FairScheduler extends Abstr
     return queue;
   }
 
-  private synchronized void removeApplication(ApplicationId applicationId,
-      RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
-    if (application == null){
-      LOG.warn("Couldn't find application " + applicationId);
-      return;
-    }
-    application.stop(finalState);
-    applications.remove(applicationId);
-  }
-
   private synchronized void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
+      RMAppAttemptState rmAppAttemptFinalState) {
     LOG.info("Application " + applicationAttemptId + " is done." +
         " finalState=" + rmAppAttemptFinalState);
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
 
-    if (attempt == null || application == null) {
+    FSSchedulerApp application = applications.get(applicationAttemptId);
+
+    if (application == null) {
       LOG.info("Unknown application " + applicationAttemptId + " has completed!");
       return;
     }
 
     // Release all the running containers
-    for (RMContainer rmContainer : attempt.getLiveContainers()) {
-      if (keepContainers
-          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + rmContainer.getContainerId());
-        continue;
-      }
+    for (RMContainer rmContainer : application.getLiveContainers()) {
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
@@ -751,26 +697,30 @@ public class FairScheduler extends Abstr
     }
 
     // Release all reserved containers
-    for (RMContainer rmContainer : attempt.getReservedContainers()) {
+    for (RMContainer rmContainer : application.getReservedContainers()) {
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
               "Application Complete"),
-              RMContainerEventType.KILL);
+          RMContainerEventType.KILL);
     }
+
     // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
+    application.stop(rmAppAttemptFinalState);
 
     // Inform the queue
-    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
+    FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
         .getQueueName(), false);
-    boolean wasRunnable = queue.removeApp(attempt);
+    boolean wasRunnable = queue.removeApp(application);
 
     if (wasRunnable) {
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
+      maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
     } else {
-      maxRunningEnforcer.untrackNonRunnableApp(attempt);
+      maxRunningEnforcer.untrackNonRunnableApp(application);
     }
+    
+    // Remove from our data-structure
+    applications.remove(applicationAttemptId);
   }
 
   /**
@@ -786,13 +736,11 @@ public class FairScheduler extends Abstr
     Container container = rmContainer.getContainer();
 
     // Get the application for the finished container
-    FSSchedulerApp application =
-        getCurrentAttemptForContainer(container.getId());
-    ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
+    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+    FSSchedulerApp application = applications.get(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
-          " unknown application attempt " + appId +
+          " unknown application " + applicationAttemptId +
           " completed with event " + event);
       return;
     }
@@ -809,9 +757,10 @@ public class FairScheduler extends Abstr
       updateRootQueueMetrics();
     }
 
-    LOG.info("Application attempt " + application.getApplicationAttemptId()
-        + " released container " + container.getId() + " on node: " + node
-        + " with event: " + event);
+    LOG.info("Application " + applicationAttemptId +
+        " released container " + container.getId() +
+        " on node: " + node +
+        " with event: " + event);
   }
 
   private synchronized void addNode(RMNode node) {
@@ -862,7 +811,7 @@ public class FairScheduler extends Abstr
       List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
 
     // Make sure this application exists
-    FSSchedulerApp application = getSchedulerApp(appAttemptId);
+    FSSchedulerApp application = applications.get(appAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + appAttemptId);
@@ -932,11 +881,12 @@ public class FairScheduler extends Abstr
    */
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
+    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+    FSSchedulerApp application = applications.get(applicationAttemptId);
     if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
+      LOG.info("Unknown application: " + applicationAttemptId +
+          " launched container " + containerId +
+          " on node: " + node);
       return;
     }
 
@@ -1075,34 +1025,28 @@ public class FairScheduler extends Abstr
   }
   
   public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
-    SchedulerApplication app =
-        applications.get(appAttemptId.getApplicationId());
-    if (app != null) {
-      return (FSSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
+    return applications.get(appAttemptId);
   }
   
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
+    if (!applications.containsKey(appAttemptId)) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return new SchedulerAppReport(attempt);
+    return new SchedulerAppReport(applications.get(appAttemptId));
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId appAttemptId) {
-    FSSchedulerApp attempt = getSchedulerApp(appAttemptId);
-    if (attempt == null) {
+    FSSchedulerApp app = applications.get(appAttemptId);
+    if (app == null) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return attempt.getResourceUsageReport();
+    return app.getResourceUsageReport();
   }
   
   /**
@@ -1146,30 +1090,15 @@ public class FairScheduler extends Abstr
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
       nodeUpdate(nodeUpdatedEvent.getRMNode());
       break;
-    case APP_ADDED:
-      if (!(event instanceof AppAddedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
-      break;
-    case APP_REMOVED:
-      if (!(event instanceof AppRemovedSchedulerEvent)) {
-        throw new RuntimeException("Unexpected event type: " + event);
-      }
-      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      removeApplication(appRemovedEvent.getApplicationID(),
-        appRemovedEvent.getFinalState());
-      break;
     case APP_ATTEMPT_ADDED:
       if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
+      String queue = appAttemptAddedEvent.getQueue();
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        queue, appAttemptAddedEvent.getUser());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
@@ -1177,10 +1106,8 @@ public class FairScheduler extends Abstr
       }
       AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
           (AppAttemptRemovedSchedulerEvent) event;
-      removeApplicationAttempt(
-          appAttemptRemovedEvent.getApplicationAttemptID(),
-          appAttemptRemovedEvent.getFinalAttemptState(),
-          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+      removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
+        appAttemptRemovedEvent.getFinalAttemptState());
       break;
     case CONTAINER_EXPIRED:
       if (!(event instanceof ContainerExpiredSchedulerEvent)) {
@@ -1231,9 +1158,6 @@ public class FairScheduler extends Abstr
       
       rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
-      // This stores per-application scheduling information
-      this.applications =
-          new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
 
@@ -1356,4 +1280,5 @@ public class FairScheduler extends Abstr
     queue.collectSchedulerApplications(apps);
     return apps;
   }
+
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Feb  7 20:33:01 2014
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,19 +58,14 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -80,15 +74,12 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -105,8 +96,7 @@ import com.google.common.annotations.Vis
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class FifoScheduler extends AbstractYarnScheduler implements
-    ResourceScheduler, Configurable {
+public class FifoScheduler implements ResourceScheduler, Configurable {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
 
@@ -117,6 +107,7 @@ public class FifoScheduler extends Abstr
 
   private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {};
   private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY);
+  private RMContext rmContext;
 
   protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
 
@@ -125,6 +116,11 @@ public class FifoScheduler extends Abstr
   private Resource maximumAllocation;
   private boolean usePortForNodeName;
 
+  // Use ConcurrentSkipListMap because applications need to be ordered
+  @VisibleForTesting
+  protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+      = new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
+  
   private ActiveUsersManager activeUsersManager;
 
   private static final String DEFAULT_QUEUE_NAME = "default";
@@ -239,9 +235,6 @@ public class FifoScheduler extends Abstr
     if (!this.initialized) {
       validateConf(conf);
       this.rmContext = rmContext;
-      //Use ConcurrentSkipListMap because applications need to be ordered
-      this.applications =
-          new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
       this.minimumAllocation = 
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
@@ -266,7 +259,7 @@ public class FifoScheduler extends Abstr
   public Allocation allocate(
       ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
       List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
-    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -332,114 +325,74 @@ public class FifoScheduler extends Abstr
   }
 
   @VisibleForTesting
-  FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApplication app =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
+  FiCaSchedulerApp getApplication(
+      ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId);
   }
 
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+    FiCaSchedulerApp app = getApplication(applicationAttemptId);
     return app == null ? null : new SchedulerAppReport(app);
   }
   
   @Override
   public ApplicationResourceUsageReport getAppResourceUsageReport(
       ApplicationAttemptId applicationAttemptId) {
-    FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+    FiCaSchedulerApp app = getApplication(applicationAttemptId);
     return app == null ? null : app.getResourceUsageReport();
   }
   
   private FiCaSchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
-
-  private synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
-    SchedulerApplication application =
-        new SchedulerApplication(DEFAULT_QUEUE, user);
-    applications.put(applicationId, application);
-    metrics.submitApp(user);
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
-  }
-
-  private synchronized void
-      addApplicationAttempt(ApplicationAttemptId appAttemptId,
-          boolean transferStateFromPreviousAttempt) {
-    SchedulerApplication application =
-        applications.get(appAttemptId.getApplicationId());
-    String user = application.getUser();
+  
+  private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId,
+      String user) {
     // TODO: Fix store
-    FiCaSchedulerApp schedulerApp =
-        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
-          activeUsersManager, this.rmContext);
-
-    if (transferStateFromPreviousAttempt) {
-      schedulerApp.transferStateFromPreviousAttempt(application
-        .getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(schedulerApp);
-
-    metrics.submitAppAttempt(user);
-    LOG.info("Added Application Attempt " + appAttemptId
-        + " to scheduler from user " + application.getUser());
+    FiCaSchedulerApp schedulerApp = 
+        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
+            this.rmContext);
+    applications.put(appAttemptId, schedulerApp);
+    metrics.submitApp(user, appAttemptId.getAttemptId());
+    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
+        " from " + user + ", currently active: " + applications.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
-  }
-
-  private synchronized void doneApplication(ApplicationId applicationId,
-      RMAppState finalState) {
-    SchedulerApplication application = applications.get(applicationId);
-    if (application == null){
-      LOG.warn("Couldn't find application " + applicationId);
-      return;
-    }
-
-    // Inform the activeUsersManager
-    activeUsersManager.deactivateApplication(application.getUser(),
-      applicationId);
-    application.stop(finalState);
-    applications.remove(applicationId);
+            RMAppAttemptEventType.APP_ACCEPTED));
   }
 
   private synchronized void doneApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
-      RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
+      RMAppAttemptState rmAppAttemptFinalState)
       throws IOException {
-    FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
-    SchedulerApplication application =
-        applications.get(applicationAttemptId.getApplicationId());
-    if (application == null || attempt == null) {
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
       throw new IOException("Unknown application " + applicationAttemptId + 
       " has completed!");
     }
 
     // Kill all 'live' containers
-    for (RMContainer container : attempt.getLiveContainers()) {
-      if (keepContainers
-          && container.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + container.getContainerId());
-        continue;
-      }
-      containerCompleted(container,
-        SchedulerUtils.createAbnormalContainerStatus(
-          container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
-        RMContainerEventType.KILL);
+    for (RMContainer container : application.getLiveContainers()) {
+      containerCompleted(container, 
+          SchedulerUtils.createAbnormalContainerStatus(
+              container.getContainerId(), 
+              SchedulerUtils.COMPLETED_APPLICATION),
+          RMContainerEventType.KILL);
+    }
+
+    // Inform the activeUsersManager
+    synchronized (application) {
+      activeUsersManager.deactivateApplication(
+          application.getUser(), application.getApplicationId());
     }
 
     // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
+    application.stop(rmAppAttemptFinalState);
+
+    // Remove the application
+    applications.remove(applicationAttemptId);
   }
   
   /**
@@ -453,10 +406,9 @@ public class FifoScheduler extends Abstr
         " #applications=" + applications.size());
 
     // Try to assign containers to applications in fifo order
-    for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
+    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
         .entrySet()) {
-      FiCaSchedulerApp application =
-          (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+      FiCaSchedulerApp application = e.getValue();
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -493,10 +445,8 @@ public class FifoScheduler extends Abstr
 
     // Update the applications' headroom to correctly take into
     // account the containers assigned in this update.
-    for (SchedulerApplication application : applications.values()) {
-      FiCaSchedulerApp attempt =
-          (FiCaSchedulerApp) application.getCurrentAppAttempt();
-      attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+    for (FiCaSchedulerApp application : applications.values()) {
+      application.setHeadroom(Resources.subtract(clusterResource, usedResource));
     }
   }
 
@@ -747,26 +697,12 @@ public class FifoScheduler extends Abstr
       nodeUpdate(nodeUpdatedEvent.getRMNode());
     }
     break;
-    case APP_ADDED:
-    {
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
-    }
-    break;
-    case APP_REMOVED:
-    {
-      AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      doneApplication(appRemovedEvent.getApplicationID(),
-        appRemovedEvent.getFinalState());
-    }
-    break;
     case APP_ATTEMPT_ADDED:
     {
       AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
-        appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+        appAttemptAddedEvent.getUser());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -776,8 +712,7 @@ public class FifoScheduler extends Abstr
       try {
         doneApplicationAttempt(
           appAttemptRemovedEvent.getApplicationAttemptID(),
-          appAttemptRemovedEvent.getFinalAttemptState(),
-          appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+          appAttemptRemovedEvent.getFinalAttemptState());
       } catch(IOException ie) {
         LOG.error("Unable to remove application "
             + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
@@ -803,11 +738,12 @@ public class FifoScheduler extends Abstr
 
   private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
     // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
+    ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " launched container " + containerId +
+          " on node: " + node);
       // Some unknown container sneaked into the system. Kill it.
       this.rmContext.getDispatcher().getEventHandler()
         .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
@@ -828,16 +764,14 @@ public class FifoScheduler extends Abstr
 
     // Get the application for the finished container
     Container container = rmContainer.getContainer();
-    FiCaSchedulerApp application =
-        getCurrentAttemptForContainer(container.getId());
-    ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
+    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+    FiCaSchedulerApp application = getApplication(applicationAttemptId);
     
     // Get the node on which the container was allocated
     FiCaSchedulerNode node = getNode(container.getNodeId());
     
     if (application == null) {
-      LOG.info("Unknown application: " + appId + 
+      LOG.info("Unknown application: " + applicationAttemptId + 
           " released container " + container.getId() +
           " on node: " + node + 
           " with event: " + event);
@@ -853,7 +787,7 @@ public class FifoScheduler extends Abstr
     // Update total usage
     Resources.subtractFrom(usedResource, container.getResource());
 
-    LOG.info("Application attempt " + application.getApplicationAttemptId() + 
+    LOG.info("Application " + applicationAttemptId + 
         " released container " + container.getId() +
         " on node: " + node + 
         " with event: " + event);
@@ -911,22 +845,11 @@ public class FifoScheduler extends Abstr
     FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
-
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
-    return (attempt == null) ? null : attempt.getRMContainer(containerId);
-  }
-
-  private FiCaSchedulerApp getCurrentAttemptForContainer(
-      ContainerId containerId) {
-    SchedulerApplication app =
-        applications.get(containerId.getApplicationAttemptId()
-          .getApplicationId());
-    if (app != null) {
-      return (FiCaSchedulerApp) app.getCurrentAppAttempt();
-    }
-    return null;
+  
+  private RMContainer getRMContainer(ContainerId containerId) {
+    FiCaSchedulerApp application = 
+        getApplication(containerId.getApplicationAttemptId());
+    return (application == null) ? null : application.getRMContainer(containerId);
   }
 
   @Override
@@ -943,12 +866,12 @@ public class FifoScheduler extends Abstr
   @Override
   public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
     if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
-      List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
+      List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
           applications.size());
-      for (SchedulerApplication app : applications.values()) {
-        attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
+      for (FiCaSchedulerApp app : applications.values()) {
+        apps.add(app.getApplicationAttemptId());
       }
-      return attempts;
+      return apps;
     } else {
       return null;
     }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Fri Feb  7 20:33:01 2014
@@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -165,14 +164,11 @@ public class Application {
     final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
     
     resourceManager.getClientRMService().submitApplication(request);
-
+    
     // Notify scheduler
-    AppAddedSchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
-    scheduler.handle(addAppEvent);
-    AppAttemptAddedSchedulerEvent addAttemptEvent =
-        new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
-    scheduler.handle(addAttemptEvent);
+    AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent(
+            this.applicationAttemptId, this.queue, this.user);
+    scheduler.handle(appAddedEvent1);
   }
   
   public synchronized void addResourceRequestSpec(

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Fri Feb  7 20:33:01 2014
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
-import org.mortbay.log.Log;
 
 public class MockNM {
 
@@ -131,13 +130,12 @@ public class MockNM {
       int containerId, ContainerState containerState) throws Exception {
     HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
         new HashMap<ApplicationId, List<ContainerStatus>>(1);
-    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
-        BuilderUtils.newContainerId(attemptId, containerId), containerState,
-        "Success", 0);
+    ContainerStatus amContainerStatus = BuilderUtils.newContainerStatus(
+        BuilderUtils.newContainerId(attemptId, 1),
+        ContainerState.COMPLETE, "Success", 0);
     ArrayList<ContainerStatus> containerStatusList =
         new ArrayList<ContainerStatus>(1);
-    containerStatusList.add(containerStatus);
-    Log.info("ContainerStatus: " + containerStatus);
+    containerStatusList.add(amContainerStatus);
     nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
     return nodeHeartbeat(nodeUpdate, true);
   }
@@ -154,7 +152,6 @@ public class MockNM {
     status.setResponseId(resId);
     status.setNodeId(nodeId);
     for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
-      Log.info("entry.getValue() " + entry.getValue());
       status.setContainersStatuses(entry.getValue());
     }
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Fri Feb  7 20:33:01 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -41,10 +40,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -60,8 +56,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -128,33 +122,6 @@ public class MockRM extends ResourceMana
         attempt.getAppAttemptState());
   }
 
-  public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
-      throws Exception {
-    int timeoutSecs = 0;
-    while (getResourceScheduler().getRMContainer(containerId) == null
-        && timeoutSecs++ < 40) {
-      System.out.println("Waiting for" + containerId + " to be allocated.");
-      nm.nodeHeartbeat(true);
-      Thread.sleep(200);
-    }
-  }
-
-  public void waitForState(MockNM nm, ContainerId containerId,
-      RMContainerState containerState) throws Exception {
-    RMContainer container = getResourceScheduler().getRMContainer(containerId);
-    Assert.assertNotNull("Container shouldn't be null", container);
-    int timeoutSecs = 0;
-    while (!containerState.equals(container.getState()) && timeoutSecs++ < 40) {
-      System.out.println("Container : " + containerId + " State is : "
-          + container.getState() + " Waiting for state : " + containerState);
-      nm.nodeHeartbeat(true);
-      Thread.sleep(300);
-    }
-    System.out.println("Container State is : " + container.getState());
-    Assert.assertEquals("Container state is not correct (timedout)",
-      containerState, container.getState());
-  }
-
   // get new application id
   public GetNewApplicationResponse getNewAppId() throws Exception {
     ApplicationClientProtocol client = getClientRMService();
@@ -205,17 +172,7 @@ public class MockRM extends ResourceMana
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType,
-      boolean waitForAccepted)
-      throws Exception {
-    return submitApp(masterMemory, name, user, acls, unmanaged, queue,
-      maxAppAttempts, ts, appType, waitForAccepted, false);
-  }
-
-  public RMApp submitApp(int masterMemory, String name, String user,
-      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
-      int maxAppAttempts, Credentials ts, String appType,
-      boolean waitForAccepted, boolean keepContainers)
-      throws Exception {
+      boolean waitForAccepted) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -225,7 +182,6 @@ public class MockRM extends ResourceMana
         .newRecord(SubmitApplicationRequest.class);
     ApplicationSubmissionContext sub = Records
         .newRecord(ApplicationSubmissionContext.class);
-    sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
     sub.setApplicationId(appId);
     sub.setApplicationName(name);
     sub.setMaxAppAttempts(maxAppAttempts);
@@ -465,33 +421,4 @@ public class MockRM extends ResourceMana
     // override to disable webapp
   }
 
-  public static void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
-      MockAM am) throws Exception {
-    FinishApplicationMasterRequest req =
-        FinishApplicationMasterRequest.newInstance(
-          FinalApplicationStatus.SUCCEEDED, "", "");
-    am.unregisterAppAttempt(req);
-    am.waitForState(RMAppAttemptState.FINISHING);
-    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
-    am.waitForState(RMAppAttemptState.FINISHED);
-    rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
-  }
-
-  public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
-      throws Exception {
-    RMAppAttempt attempt = app.getCurrentAppAttempt();
-    nm.nodeHeartbeat(true);
-    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
-    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
-    return am;
-  }
-
-  public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
-      throws Exception {
-    MockAM am = launchAM(app, rm, nm);
-    am.registerAppAttempt();
-    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
-    return am;
-  }
-
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Fri Feb  7 20:33:01 2014
@@ -649,7 +649,7 @@ public class TestClientRMService {
             .currentTimeMillis(), "YARN"));
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1);
     RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
-        rmContext, yarnScheduler, null, asContext, config, false);
+        rmContext, yarnScheduler, null, asContext, config, null);
     when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
     return app;
   }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Fri Feb  7 20:33:01 2014
@@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -298,12 +297,9 @@ public class TestFifoScheduler {
     ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
     ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
         appId1, 1);
-    SchedulerEvent appEvent =
-        new AppAddedSchedulerEvent(appId1, "queue", "user");
-    fs.handle(appEvent);
-    SchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
-    fs.handle(attemptEvent);
+    SchedulerEvent event1 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
+    fs.handle(event1);
 
     List<ContainerId> emptyId = new ArrayList<ContainerId>();
     List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
@@ -392,22 +388,16 @@ public class TestFifoScheduler {
     ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
     ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
         appId1, 1);
-    SchedulerEvent appEvent =
-        new AppAddedSchedulerEvent(appId1, "queue", "user");
-    fs.handle(appEvent);
-    SchedulerEvent attemptEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
-    fs.handle(attemptEvent);
+    SchedulerEvent event1 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user");
+    fs.handle(event1);
 
     ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
     ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
         appId2, 1);
-    SchedulerEvent appEvent2 =
-        new AppAddedSchedulerEvent(appId2, "queue", "user");
-    fs.handle(appEvent2);
-    SchedulerEvent attemptEvent2 =
-        new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
-    fs.handle(attemptEvent2);
+    SchedulerEvent event2 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user");
+    fs.handle(event2);
 
     List<ContainerId> emptyId = new ArrayList<ContainerId>();
     List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Fri Feb  7 20:33:01 2014
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.spy;
-
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -33,33 +29,26 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -67,9 +56,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
 
-@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRM {
 
   private static final Log LOG = LogFactory.getLog(TestRM.class);
@@ -314,8 +301,6 @@ public class TestRM {
         nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
             ContainerState.COMPLETE);
       }
-      nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1,
-        ContainerState.COMPLETE);
       am.waitForState(RMAppAttemptState.FINISHED);
       Assert.assertFalse(nmTokenSecretManager
           .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@@ -412,19 +397,19 @@ public class TestRM {
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
     nm1.registerNode();
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
+    MockAM am1 = launchAM(app1, rm1, nm1);
+    finishApplicationMaster(app1, rm1, nm1, am1);
 
     // a failed app
     RMApp app2 = rm1.submitApp(200);
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    MockAM am2 = launchAM(app2, rm1, nm1);
     nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FAILED);
     rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
 
     // a killed app
     RMApp app3 = rm1.submitApp(200);
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+    MockAM am3 = launchAM(app3, rm1, nm1);
     rm1.killApp(app3.getApplicationId());
     rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
     rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
@@ -464,7 +449,7 @@ public class TestRM {
 
     // a failed app
     RMApp app2 = rm1.submitApp(200);
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+    MockAM am2 = launchAM(app2, rm1, nm1);
     nm1
       .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FAILED);
@@ -481,77 +466,26 @@ public class TestRM {
     Assert.assertEquals(-1, report1.getRpcPort());
   }
 
-  /**
-   * Validate killing an application when it is at accepted state.
-   * @throws Exception exception
-   */
-  @Test (timeout = 60000)
-  public void testApplicationKillAtAcceptedState() throws Exception {
-
-    YarnConfiguration conf = new YarnConfiguration();
-    final Dispatcher dispatcher = new AsyncDispatcher() {
-      @Override
-      public EventHandler getEventHandler() {
-
-        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
-          @Override
-          public boolean matches(Object argument) {
-            if (argument instanceof RMAppAttemptEvent) {
-              if (((RMAppAttemptEvent) argument).getType().equals(
-                RMAppAttemptEventType.KILL)) {
-                return true;
-              }
-            }
-            return false;
-          }
-        }
-
-        EventHandler handler = spy(super.getEventHandler());
-        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
-        return handler;
-      }
-    };
-
-    MockRM rm = new MockRM(conf) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
-
-    rm.start();
-    MockNM nm1 =
-        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
-    nm1.registerNode();
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return am;
+  }
 
-    // a failed app
-    RMApp application = rm.submitApp(200);
-    MockAM am = MockRM.launchAM(application, rm, nm1);
-    am.waitForState(RMAppAttemptState.LAUNCHED);
-    nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING);
-    rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED);
-
-    // Now kill the application before new attempt is launched, the app report
-    // returns the invalid AM host and port.
-    KillApplicationRequest request =
-        KillApplicationRequest.newInstance(application.getApplicationId());
-    rm.getClientRMService().forceKillApplication(request);
-
-    // Specific test for YARN-1689 follows
-    // Now let's say a race causes AM to register now. This should not crash RM.
-    am.registerAppAttempt(false);
-
-    // We explicitly intercepted the kill-event to RMAppAttempt, so app should
-    // still be in KILLING state.
-    rm.waitForState(application.getApplicationId(), RMAppState.KILLING);
-    // AM should now be in running
-    rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
-
-    // Simulate that appAttempt is killed.
-    rm.getRMContext().getDispatcher().getEventHandler().handle(
-        new RMAppEvent(application.getApplicationId(),
-          RMAppEventType.ATTEMPT_KILLED));
-    rm.waitForState(application.getApplicationId(), RMAppState.KILLED);
+  private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
+      MockAM am) throws Exception {
+    FinishApplicationMasterRequest req =
+        FinishApplicationMasterRequest.newInstance(
+          FinalApplicationStatus.SUCCEEDED, "", "");
+    am.unregisterAppAttempt(req);
+    am.waitForState(RMAppAttemptState.FINISHING);
+    nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+    rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
   }
 
   public static void main(String[] args) throws Exception {
@@ -559,10 +493,5 @@ public class TestRM {
     t.testGetNewAppId();
     t.testAppWithNoContainers();
     t.testAppOnMultiNode();
-    t.testNMToken();
-    t.testActivatingApplicationAfterAddingNM();
-    t.testInvalidateAMHostPortWhenAMFailedOrKilled();
-    t.testInvalidatedAMHostPortOnAMRestart();
-    t.testApplicationKillAtAcceptedState();
   }
 }

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Fri Feb  7 20:33:01 2014
@@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
@@ -180,7 +179,7 @@ public class TestRMRestart {
     am1.registerAppAttempt();
 
     // AM request for containers
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());   
+    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>());    
     // kick the scheduler
     nm1.nodeHeartbeat(true);
     List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
@@ -249,7 +248,7 @@ public class TestRMRestart {
     // verify correct number of attempts and other data
     RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
     Assert.assertNotNull(loadedApp1);
-    Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
+    //Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
     Assert.assertEquals(app1.getApplicationSubmissionContext()
         .getApplicationId(), loadedApp1.getApplicationSubmissionContext()
         .getApplicationId());
@@ -262,7 +261,7 @@ public class TestRMRestart {
         .getApplicationId());
     
     // verify state machine kicked into expected states
-    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
     rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
     
     // verify attempts for apps
@@ -300,11 +299,7 @@ public class TestRMRestart {
     nm2.registerNode();
     
     rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
-    // wait for the 2nd attempt to be started.
-    int timeoutSecs = 0;
-    while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {;
-      Thread.sleep(200);
-    }
+    Assert.assertEquals(2, loadedApp1.getAppAttempts().size());    
 
     // verify no more reboot response sent
     hbResponse = nm1.nodeHeartbeat(true);
@@ -481,10 +476,10 @@ public class TestRMRestart {
     Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
     
     RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
-    // application should be in ACCEPTED state
-    rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+    // application should be in running state
+    rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
     
-    Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
     // new attempt should not be started
     Assert.assertEquals(2, rmApp.getAppAttempts().size());
     // am1 attempt should be in FAILED state where as am2 attempt should be in
@@ -521,9 +516,9 @@ public class TestRMRestart {
     nm1.setResourceTrackerService(rm3.getResourceTrackerService());
     
     rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
-    // application should be in ACCEPTED state
-    rm3.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
-    Assert.assertEquals(rmApp.getState(), RMAppState.ACCEPTED);
+    // application should be in running state
+    rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
     // new attempt should not be started
     Assert.assertEquals(3, rmApp.getAppAttempts().size());
     // am1 and am2 attempts should be in FAILED state where as am3 should be
@@ -567,11 +562,6 @@ public class TestRMRestart {
     
     rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
     rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
-    // wait for the attempt to be created.
-    int timeoutSecs = 0;
-    while (rmApp.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
-      Thread.sleep(200);
-    }
     Assert.assertEquals(4, rmApp.getAppAttempts().size());
     Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
     rm4.waitForState(latestAppAttemptId, RMAppAttemptState.SCHEDULED);
@@ -1544,128 +1534,6 @@ public class TestRMRestart {
     Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
   }
 
-  @SuppressWarnings("resource")
-  @Test
-  public void testQueueMetricsOnRMRestart() throws Exception {
-    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
-        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
-    MemoryRMStateStore memStore = new MemoryRMStateStore();
-    memStore.init(conf);
-
-    // PHASE 1: create state in an RM
-    // start RM
-    MockRM rm1 = new MockRM(conf, memStore);
-    rm1.start();
-    MockNM nm1 =
-        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
-    nm1.registerNode();
-    QueueMetrics qm1 = rm1.getResourceScheduler().getRootQueueMetrics();
-    resetQueueMetrics(qm1);
-    assertQueueMetrics(qm1, 0, 0, 0, 0);
-
-    // create app that gets launched and does allocate before RM restart
-    RMApp app1 = rm1.submitApp(200);
-    assertQueueMetrics(qm1, 1, 1, 0, 0);
-    nm1.nodeHeartbeat(true);
-    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
-    ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
-    rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
-    MockAM am1 = rm1.sendAMLaunched(attempt1.getAppAttemptId());
-    am1.registerAppAttempt();
-    am1.allocate("127.0.0.1" , 1000, 1, new ArrayList<ContainerId>()); 
-    nm1.nodeHeartbeat(true);
-    List<Container> conts = am1.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers();
-    while (conts.size() == 0) {
-      nm1.nodeHeartbeat(true);
-      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
-      Thread.sleep(500);
-    }
-    assertQueueMetrics(qm1, 1, 0, 1, 0);
-
-    // PHASE 2: create new RM and start from old state
-    // create new RM to represent restart and recover state
-    MockRM rm2 = new MockRM(conf, memStore);
-    rm2.start();
-    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
-    QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
-    resetQueueMetrics(qm2);
-    assertQueueMetrics(qm2, 0, 0, 0, 0);
-    // recover app
-    RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
-    am1.setAMRMProtocol(rm2.getApplicationMasterService());
-    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
-    nm1.nodeHeartbeat(true);
-    nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
-    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
-    ContainerStatus containerStatus =
-        BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
-            .getCurrentAppAttempt().getAppAttemptId(), 1),
-            ContainerState.COMPLETE, "Killed AM container", 143);
-    containerStatuses.add(containerStatus);
-    nm1.registerNode(containerStatuses);
-    int timeoutSecs = 0;
-    while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {;
-      Thread.sleep(200);
-    }
-
-    assertQueueMetrics(qm2, 1, 1, 0, 0);
-    nm1.nodeHeartbeat(true);
-    attempt1 = loadedApp1.getCurrentAppAttempt();
-    attemptId1 = attempt1.getAppAttemptId();
-    rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
-    assertQueueMetrics(qm2, 1, 0, 1, 0);
-    am1 = rm2.sendAMLaunched(attempt1.getAppAttemptId());
-    am1.registerAppAttempt();
-    am1.allocate("127.0.0.1" , 1000, 3, new ArrayList<ContainerId>());
-    nm1.nodeHeartbeat(true);
-    conts = am1.allocate(new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>()).getAllocatedContainers();
-    while (conts.size() == 0) {
-      nm1.nodeHeartbeat(true);
-      conts.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers());
-      Thread.sleep(500);
-    }
-
-    // finish the AMs
-    finishApplicationMaster(loadedApp1, rm2, nm1, am1);
-    assertQueueMetrics(qm2, 1, 0, 0, 1);
-
-    // stop RM's
-    rm2.stop();
-    rm1.stop();
-  }
-
-
-  // The metrics has some carry-on value from the previous RM, because the
-  // test case is in-memory, for the same queue name (e.g. root), there's
-  // always a singleton QueueMetrics object.
-  private int appsSubmittedCarryOn = 0;
-  private int appsPendingCarryOn = 0;
-  private int appsRunningCarryOn = 0;
-  private int appsCompletedCarryOn = 0;
-
-  private void resetQueueMetrics(QueueMetrics qm) {
-    appsSubmittedCarryOn = qm.getAppsSubmitted();
-    appsPendingCarryOn = qm.getAppsPending();
-    appsRunningCarryOn = qm.getAppsRunning();
-    appsCompletedCarryOn = qm.getAppsCompleted();
-  }
-
-  private void assertQueueMetrics(QueueMetrics qm, int appsSubmitted,
-      int appsPending, int appsRunning, int appsCompleted) {
-    Assert.assertEquals(qm.getAppsSubmitted(),
-        appsSubmitted + appsSubmittedCarryOn);
-    Assert.assertEquals(qm.getAppsPending(),
-        appsPending + appsPendingCarryOn);
-    Assert.assertEquals(qm.getAppsRunning(),
-        appsRunning + appsRunningCarryOn);
-    Assert.assertEquals(qm.getAppsCompleted(),
-        appsCompleted + appsCompletedCarryOn);
-  }
-
   public class TestMemoryRMStateStore extends MemoryRMStateStore {
     int count = 0;
     public int updateApp = 0;

Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java Fri Feb  7 20:33:01 2014
@@ -164,7 +164,7 @@ public class TestResourceManager {
     // Notify scheduler application is finished.
     AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
         new AppAttemptRemovedSchedulerEvent(
-          application.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+          application.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
     resourceManager.getResourceScheduler().handle(appRemovedEvent1);
     
     checkResourceUsage(nm1, nm2);



Mime
View raw message