hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1160521 - in /hadoop/common/trunk/hadoop-mapreduce: ./ hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/ hadoop-yarn/hado...
Date Tue, 23 Aug 2011 01:32:33 GMT
Author: acmurthy
Date: Tue Aug 23 01:32:32 2011
New Revision: 1160521

URL: http://svn.apache.org/viewvc?rev=1160521&view=rev
Log:
MAPREDUCE-2649. Handling of finished applications in RM. Contributed by Thomas Graves.

Added:
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

Modified: hadoop/common/trunk/hadoop-mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/CHANGES.txt?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce/CHANGES.txt Tue Aug 23 01:32:32 2011
@@ -1127,6 +1127,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-2868. ant build broken in hadoop-mapreduce dir (mahadev, giri and arun via mahadev)
 
+    MAPREDUCE-2649. Handling of finished applications in RM. (Thomas Graves
+    via acmurthy) 
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue Aug 23 01:32:32 2011
@@ -244,8 +244,8 @@ public class YARNRunner implements Clien
     
     ApplicationReport appMaster = resMgrDelegate
         .getApplicationReport(applicationId);
-    if (appMaster.getState() == ApplicationState.FAILED || appMaster.getState() ==
-      ApplicationState.KILLED) {
+    if (appMaster == null || appMaster.getState() == ApplicationState.FAILED 
+        || appMaster.getState() == ApplicationState.KILLED) {
       throw RPCUtil.getRemoteException("failed to run job");
     }
     return clientServiceDelegate.getJobStatus(jobId);

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml Tue Aug 23 01:32:32 2011
@@ -45,6 +45,14 @@
     <value>/etc/krb5.keytab</value>
   </property>
 
+  <property>
+    <name>yarn.server.resourcemanager.expire.applications.completed.max</name>
+    <value>10000</value>
+    <description>the maximum number of completed applications the RM 
+      keeps in memory
+    </description>
+  </property>
+
 <!-- All nodemanager related configuration properties -->
 
   <property>

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Aug 23 01:32:32 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
@@ -70,10 +69,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
-import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -97,8 +93,6 @@ public class ClientRMService extends Abs
   final private AtomicInteger applicationCounter = new AtomicInteger(0);
   final private YarnScheduler scheduler;
   final private RMContext rmContext;
-  private final ApplicationMasterService masterService;
-  private final ClientToAMSecretManager clientToAMSecretManager;
   private final AMLivelinessMonitor amLivelinessMonitor;
 
   private String clientServiceBindAddress;
@@ -109,15 +103,11 @@ public class ClientRMService extends Abs
   private  ApplicationACLsManager aclsManager;
   private Map<ApplicationACL, AccessControlList> applicationACLs;
   
-  public ClientRMService(RMContext rmContext,
-      ClientToAMSecretManager clientToAMSecretManager,
-      YarnScheduler scheduler, ApplicationMasterService masterService) {
+  public ClientRMService(RMContext rmContext, YarnScheduler scheduler) {
     super(ClientRMService.class.getName());
     this.scheduler = scheduler;
     this.rmContext = rmContext;
-    this.masterService = masterService;
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
-    this.clientToAMSecretManager = clientToAMSecretManager;
   }
   
   @Override
@@ -206,42 +196,17 @@ public class ClientRMService extends Abs
     ApplicationSubmissionContext submissionContext = request
         .getApplicationSubmissionContext();
     try {
-
-      ApplicationId applicationId = submissionContext.getApplicationId();
-      String clientTokenStr = null;
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      if (UserGroupInformation.isSecurityEnabled()) {
-        Token<ApplicationTokenIdentifier> clientToken = new Token<ApplicationTokenIdentifier>(
-            new ApplicationTokenIdentifier(applicationId),
-            this.clientToAMSecretManager);
-        clientTokenStr = clientToken.encodeToUrlString();
-        LOG.debug("Sending client token as " + clientTokenStr);
-      }
-
-      submissionContext.setQueue(submissionContext.getQueue() == null
-          ? "default" : submissionContext.getQueue());
-      submissionContext.setApplicationName(submissionContext
-          .getApplicationName() == null ? "N/A" : submissionContext
-          .getApplicationName());
-
-      ApplicationStore appStore = rmContext.getApplicationsStore()
-          .createApplicationStore(submissionContext.getApplicationId(),
-              submissionContext);
-      RMApp application = new RMAppImpl(applicationId, rmContext,
-          getConfig(), submissionContext.getApplicationName(), user,
-          submissionContext.getQueue(), submissionContext, clientTokenStr,
-          appStore, this.amLivelinessMonitor, this.scheduler,
-          this.masterService);
-      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+      ApplicationId applicationId = submissionContext.getApplicationId();
+      if (rmContext.getRMApps().get(applicationId) != null) {
         throw new IOException("Application with id " + applicationId
             + " is already present! Cannot add a duplicate!");
       }
-
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppEvent(applicationId, RMAppEventType.START));
+          new RMAppManagerSubmitEvent(submissionContext));
 
-      LOG.info("Application with id " + applicationId.getId()
-          + " submitted by user " + user + " with " + submissionContext);
+      LOG.info("Application with id " + applicationId.getId() + 
+          " submitted by user " + user + " with " + submissionContext);
     } catch (IOException ie) {
       LOG.info("Exception in submitting application", ie);
       throw RPCUtil.getRemoteException(ie);

Added: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1160521&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Aug 23 01:32:32 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+
+/**
+ * This class manages the list of applications for the resource manager. 
+ */
+public class RMAppManager implements EventHandler<RMAppManagerEvent> {
+
+  private static final Log LOG = LogFactory.getLog(RMAppManager.class);
+
+  private int completedAppsMax = RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX;
+  private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
+
+  private final RMContext rmContext;
+  private final ClientToAMSecretManager clientToAMSecretManager;
+  private final ApplicationMasterService masterService;
+  private final YarnScheduler scheduler;
+  private Configuration conf;
+
+  public RMAppManager(RMContext context, ClientToAMSecretManager 
+      clientToAMSecretManager, YarnScheduler scheduler, 
+      ApplicationMasterService masterService, Configuration conf) {
+    this.rmContext = context;
+    this.scheduler = scheduler;
+    this.clientToAMSecretManager = clientToAMSecretManager;
+    this.masterService = masterService;
+    this.conf = conf;
+    setCompletedAppsMax(conf.getInt(
+        RMConfig.EXPIRE_APPLICATIONS_COMPLETED_MAX,
+        RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX));
+  }
+
+  protected void setCompletedAppsMax(int max) {
+    this.completedAppsMax = max;
+  }
+
+  protected synchronized int getCompletedAppsListSize() {
+    return this.completedApps.size(); 
+  }
+
+  protected synchronized void addCompletedApp(ApplicationId appId) {
+    if (appId == null) {
+      LOG.error("RMAppManager received completed appId of null, skipping");
+    } else {
+      completedApps.add(appId);  
+    }
+  };
+
+  /*
+   * check to see if hit the limit for max # completed apps kept
+   */
+  protected synchronized void checkAppNumCompletedLimit() {
+    while (completedApps.size() > this.completedAppsMax) {
+      ApplicationId removeId = completedApps.remove();  
+      LOG.info("Application should be expired, max # apps"
+          + " met. Removing app: " + removeId); 
+      rmContext.getRMApps().remove(removeId);
+    }
+  }
+
+  protected void submitApplication(ApplicationSubmissionContext submissionContext) {
+    ApplicationId applicationId = submissionContext.getApplicationId();
+    RMApp application = null;
+    try {
+      String clientTokenStr = null;
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      if (UserGroupInformation.isSecurityEnabled()) {
+        Token<ApplicationTokenIdentifier> clientToken = new 
+            Token<ApplicationTokenIdentifier>(
+            new ApplicationTokenIdentifier(applicationId),
+            this.clientToAMSecretManager);
+        clientTokenStr = clientToken.encodeToUrlString();
+        LOG.debug("Sending client token as " + clientTokenStr);
+      }
+      submissionContext.setQueue(submissionContext.getQueue() == null
+          ? "default" : submissionContext.getQueue());
+      submissionContext.setApplicationName(submissionContext
+          .getApplicationName() == null ? "N/A" : submissionContext
+          .getApplicationName());
+      ApplicationStore appStore = rmContext.getApplicationsStore()
+          .createApplicationStore(submissionContext.getApplicationId(),
+          submissionContext);
+      application = new RMAppImpl(applicationId, rmContext,
+          this.conf, submissionContext.getApplicationName(), user,
+          submissionContext.getQueue(), submissionContext, clientTokenStr,
+          appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
+          this.masterService);
+
+      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+        LOG.info("Application with id " + applicationId + 
+            " is already present! Cannot add a duplicate!");
+        // don't send event through dispatcher as it will be handled by app already
+        // present with this id.
+        application.handle(new RMAppRejectedEvent(applicationId,
+            "Application with this id is already present! Cannot add a duplicate!"));
+      } else {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.START));
+      }
+    } catch (IOException ie) {
+        LOG.info("RMAppManager submit application exception", ie);
+        if (application != null) {
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppRejectedEvent(applicationId, ie.getMessage()));
+        }
+    }
+  }
+
+  @Override
+  public void handle(RMAppManagerEvent event) {
+    ApplicationId appID = event.getApplicationId();
+    LOG.debug("RMAppManager processing event for " 
+        + appID + " of type " + event.getType());
+    switch(event.getType()) {
+      case APP_COMPLETED: 
+      {
+        addCompletedApp(appID);
+        checkAppNumCompletedLimit(); 
+      } 
+      break;
+      case APP_SUBMIT:
+      {
+        ApplicationSubmissionContext submissionContext = 
+            ((RMAppManagerSubmitEvent)event).getSubmissionContext();        
+        submitApplication(submissionContext);
+      }
+      break;
+      default:
+        LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
+      }
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java?rev=1160521&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java Tue Aug 23 01:32:32 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMAppManagerEvent extends AbstractEvent<RMAppManagerEventType> {
+
+  private final ApplicationId appId;
+
+  public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) {
+    super(type);
+    this.appId = appId;
+  }
+
+  public ApplicationId getApplicationId() {
+    return this.appId;
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java?rev=1160521&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java Tue Aug 23 01:32:32 2011
@@ -0,0 +1,6 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+public enum RMAppManagerEventType {
+  APP_SUBMIT,
+  APP_COMPLETED
+}

Added: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java?rev=1160521&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java Tue Aug 23 01:32:32 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
+
+  private final ApplicationSubmissionContext submissionContext;
+
+  public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) {
+    super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT);
+    this.submissionContext = submissionContext;
+  }
+
+  public ApplicationSubmissionContext getSubmissionContext() {
+    return this.submissionContext;
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java Tue Aug 23 01:32:32 2011
@@ -85,4 +85,9 @@ public class RMConfig {
   public static final String RM_NODES_EXCLUDE_FILE = 
     YarnConfiguration.RM_PREFIX + "nodes.exclude";
   public static final String DEFAULT_RM_NODES_EXCLUDE_FILE = "";
+
+  // the maximum number of completed applications RM keeps 
+  public static String EXPIRE_APPLICATIONS_COMPLETED_MAX =
+    YarnConfiguration.RM_PREFIX + "expire.applications.completed.max";
+  public static final int DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX = 10000;
 }

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Aug 23 01:32:32 2011
@@ -98,6 +98,7 @@ public class ResourceManager extends Com
   protected NMLivelinessMonitor nmLivelinessMonitor;
   protected NodesListManager nodesListManager;
   private SchedulerEventDispatcher schedulerDispatcher;
+  private RMAppManager rmAppManager;
 
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
   private WebApp webApp;
@@ -176,6 +177,11 @@ public class ResourceManager extends Com
 
     clientRM = createClientRMService();
     addService(clientRM);
+
+    this.rmAppManager = createRMAppManager();
+    // Register event handler for RMAppManagerEvents
+    this.rmDispatcher.register(RMAppManagerEventType.class,
+        this.rmAppManager);
     
     adminService = createAdminService();
     addService(adminService);
@@ -215,6 +221,11 @@ public class ResourceManager extends Com
     return new AMLivelinessMonitor(this.rmDispatcher);
   }
 
+  protected RMAppManager createRMAppManager() {
+    return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
+        this.scheduler, this.masterService, this.conf);
+  }
+
   @Private
   public static final class SchedulerEventDispatcher extends AbstractService
       implements EventHandler<SchedulerEvent> {
@@ -429,8 +440,7 @@ public class ResourceManager extends Com
   }
 
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(this.rmContext, this.clientToAMSecretManager,
-        scheduler, masterService);
+    return new ClientRMService(this.rmContext, scheduler);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Aug 23 01:32:32 2011
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -89,6 +91,8 @@ public class RMAppImpl implements RMApp 
         RMAppEventType.START, new StartAppAttemptTransition())
     .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
         new AppKilledTransition())
+    .addTransition(RMAppState.NEW, RMAppState.FAILED,
+        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
 
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
@@ -429,6 +433,9 @@ public class RMAppImpl implements RMApp 
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
       app.finishTime = System.currentTimeMillis();
+      app.dispatcher.getEventHandler().handle(
+          new RMAppManagerEvent(app.applicationId, 
+          RMAppManagerEventType.APP_COMPLETED));
     };
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Aug 23 01:32:32 2011
@@ -42,8 +42,13 @@ public class MockRM extends ResourceMana
 
   public void waitForState(ApplicationId appId, RMAppState finalState) 
       throws Exception {
-    RMApp app = getRMContext().getRMApps().get(appId);
     int timeoutSecs = 0;
+    RMApp app = null;
+    while ((app == null) && timeoutSecs++ < 20) {
+      app = getRMContext().getRMApps().get(appId);
+      Thread.sleep(500);
+    }
+    timeoutSecs = 0;
     while (!finalState.equals(app.getState()) &&
         timeoutSecs++ < 20) {
       System.out.println("App State is : " + app.getState() +
@@ -108,8 +113,7 @@ public class MockRM extends ResourceMana
 
   @Override
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(getRMContext(),
-        clientToAMSecretManager, getResourceScheduler(), masterService) {
+    return new ClientRMService(getRMContext(), getResourceScheduler()) {
       @Override
       public void start() {
         //override to not start rpc handler

Added: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java?rev=1160521&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java Tue Aug 23 01:32:32 2011
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.service.Service;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
+
+/**
+ * Testing applications being retired from RM.
+ *
+ */
+
+public class TestAppManager{
+  private static final Log LOG = LogFactory.getLog(TestAppManager.class);
+  private static RMAppEventType appEventType = RMAppEventType.KILL; 
+
+  public synchronized RMAppEventType getAppEventType() {
+    return appEventType;
+  } 
+  public synchronized void setAppEventType(RMAppEventType newType) {
+    appEventType = newType;
+  } 
+
+
+  public static List<RMApp> newRMApps(int n, long time, RMAppState state) {
+    List<RMApp> list = Lists.newArrayList();
+    for (int i = 0; i < n; ++i) {
+      list.add(new MockRMApp(i, time, state));
+    }
+    return list;
+  }
+
+  public static RMContext mockRMContext(int n, long time) {
+    final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
+    final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
+    for (RMApp app : apps) {
+      map.put(app.getApplicationId(), app);
+    }
+    Dispatcher rmDispatcher = new AsyncDispatcher();
+    ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
+        rmDispatcher);
+    AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
+        rmDispatcher);
+    return new RMContextImpl(new MemStore(), rmDispatcher,
+        containerAllocationExpirer, amLivelinessMonitor) {
+      @Override
+      public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
+        return map;
+      }
+    };
+  }
+
+  public class TestAppManagerDispatcher implements
+      EventHandler<RMAppManagerEvent> {
+
+    private final RMContext rmContext;
+
+    public TestAppManagerDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(RMAppManagerEvent event) {
+       // do nothing
+    }   
+  }   
+
+  public class TestDispatcher implements
+      EventHandler<RMAppEvent> {
+
+    private final RMContext rmContext;
+
+    public TestDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(RMAppEvent event) {
+      ApplicationId appID = event.getApplicationId();
+      //RMApp rmApp = this.rmContext.getRMApps().get(appID);
+      setAppEventType(event.getType());
+      System.out.println("in handle routine " + getAppEventType().toString());
+    }   
+  }   
+  
+
+  // Extend and make the functions we want to test public
+  public class TestRMAppManager extends RMAppManager {
+
+    public TestRMAppManager(RMContext context, Configuration conf) {
+      super(context, null, null, null, conf);
+      setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX);
+    }
+
+    public TestRMAppManager(RMContext context, ClientToAMSecretManager
+        clientToAMSecretManager, YarnScheduler scheduler,
+        ApplicationMasterService masterService, Configuration conf) {
+      super(context, clientToAMSecretManager, scheduler, masterService, conf);
+      setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX);
+    }
+
+    public void checkAppNumCompletedLimit() {
+      super.checkAppNumCompletedLimit();
+    }
+
+    public void addCompletedApp(ApplicationId appId) {
+      super.addCompletedApp(appId);
+    }
+
+    public int getCompletedAppsListSize() {
+      return super.getCompletedAppsListSize();
+    }
+
+    public void setCompletedAppsMax(int max) {
+      super.setCompletedAppsMax(max);
+    }
+    public void submitApplication(ApplicationSubmissionContext submissionContext) {
+      super.submitApplication(submissionContext);
+    }
+  }
+
+  protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
+    for (RMApp app : rmContext.getRMApps().values()) {
+      if (app.getState() == RMAppState.FINISHED
+          || app.getState() == RMAppState.KILLED 
+          || app.getState() == RMAppState.FAILED) {
+        appMonitor.addCompletedApp(app.getApplicationId());
+      }
+    }
+  }
+
+  @Test
+  public void testRMAppRetireNone() throws Exception {
+    long now = System.currentTimeMillis();
+
+    // Create such that none of the applications will retire since
+    // haven't hit max #
+    RMContext rmContext = mockRMContext(10, now - 10);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+    appMonitor.setCompletedAppsMax(10);
+
+    Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
+        10, rmContext.getRMApps().size());
+
+    // add them to completed apps list
+    addToCompletedApps(appMonitor, rmContext);
+
+    // shouldn't  have to many apps
+    appMonitor.checkAppNumCompletedLimit();
+    Assert.assertEquals("Number of apps incorrect after # completed check", 10,
+        rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check", 10,
+        appMonitor.getCompletedAppsListSize());
+  }
+
+  @Test
+  public void testRMAppRetireSome() throws Exception {
+    long now = System.currentTimeMillis();
+
+    RMContext rmContext = mockRMContext(10, now - 20000);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+    appMonitor.setCompletedAppsMax(3);
+
+    Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+        .getRMApps().size());
+
+    // add them to completed apps list
+    addToCompletedApps(appMonitor, rmContext);
+
+    // shouldn't  have to many apps
+    appMonitor.checkAppNumCompletedLimit();
+    Assert.assertEquals("Number of apps incorrect after # completed check", 3,
+        rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check", 3,
+        appMonitor.getCompletedAppsListSize());
+  }
+
+  @Test
+  public void testRMAppRetireSomeDifferentStates() throws Exception {
+    long now = System.currentTimeMillis();
+
+    // these parameters don't matter, override applications below
+    RMContext rmContext = mockRMContext(10, now - 20000);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+    appMonitor.setCompletedAppsMax(2);
+
+    // clear out applications map
+    rmContext.getRMApps().clear();
+    Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
+
+    // / set with various finished states
+    RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(1, now - 200000, RMAppState.FAILED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(2, now - 30000, RMAppState.FINISHED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(3, now - 20000, RMAppState.RUNNING);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(4, now - 20000, RMAppState.NEW);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+
+    // make sure it doesn't expire these since still running
+    app = new MockRMApp(5, now - 10001, RMAppState.KILLED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(6, now - 30000, RMAppState.ACCEPTED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(7, now - 20000, RMAppState.SUBMITTED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(8, now - 10001, RMAppState.FAILED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+    app = new MockRMApp(9, now - 20000, RMAppState.FAILED);
+    rmContext.getRMApps().put(app.getApplicationId(), app);
+
+    Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+        .getRMApps().size());
+
+    // add them to completed apps list
+    addToCompletedApps(appMonitor, rmContext);
+
+    // shouldn't  have to many apps
+    appMonitor.checkAppNumCompletedLimit();
+    Assert.assertEquals("Number of apps incorrect after # completed check", 6,
+        rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check", 2,
+        appMonitor.getCompletedAppsListSize());
+
+  }
+
+  @Test
+  public void testRMAppRetireNullApp() throws Exception {
+    long now = System.currentTimeMillis();
+
+    RMContext rmContext = mockRMContext(10, now - 20000);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+    Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+        .getRMApps().size());
+
+    appMonitor.addCompletedApp(null);
+
+    Assert.assertEquals("Number of completed apps incorrect after check", 0,
+        appMonitor.getCompletedAppsListSize());
+  }
+
+  @Test
+  public void testRMAppRetireZeroSetting() throws Exception {
+    long now = System.currentTimeMillis();
+
+    RMContext rmContext = mockRMContext(10, now - 20000);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+
+    Assert.assertEquals("Number of apps incorrect before", 10, rmContext
+        .getRMApps().size());
+
+    // test with 0
+    appMonitor.setCompletedAppsMax(0);
+
+    addToCompletedApps(appMonitor, rmContext);
+    Assert.assertEquals("Number of completed apps incorrect", 10,
+        appMonitor.getCompletedAppsListSize());
+
+    appMonitor.checkAppNumCompletedLimit();
+
+    Assert.assertEquals("Number of apps incorrect after # completed check", 0,
+        rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check", 0,
+        appMonitor.getCompletedAppsListSize());
+  }
+
+  protected void setupDispatcher(RMContext rmContext, Configuration conf) {
+    TestDispatcher testDispatcher = new TestDispatcher(rmContext);
+    TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext);
+    rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher);
+    rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
+    ((Service)rmContext.getDispatcher()).init(conf);
+    ((Service)rmContext.getDispatcher()).start();
+    Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType);
+  }
+
+  @Test
+  public void testRMAppSubmit() throws Exception {
+    long now = System.currentTimeMillis();
+
+    RMContext rmContext = mockRMContext(0, now - 10);
+    ResourceScheduler scheduler = new CapacityScheduler();
+    ApplicationMasterService masterService =  new ApplicationMasterService(rmContext,
+        new ApplicationTokenSecretManager(), scheduler);
+    Configuration conf = new Configuration();
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, 
+        new ClientToAMSecretManager(), scheduler, masterService, conf);
+
+    ApplicationId appID = MockApps.newAppID(1);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    context.setApplicationId(appID);
+    setupDispatcher(rmContext, conf);
+
+    appMonitor.submitApplication(context);
+    RMApp app = rmContext.getRMApps().get(appID);
+    Assert.assertNotNull("app is null", app);
+    Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
+    Assert.assertEquals("app name doesn't match", "N/A", app.getName());
+    Assert.assertEquals("app queue doesn't match", "default", app.getQueue());
+    Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
+    Assert.assertNotNull("app store is null", app.getApplicationStore());
+
+    // wait for event to be processed
+    int timeoutSecs = 0;
+    while ((getAppEventType() == RMAppEventType.KILL) && 
+        timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
+    setAppEventType(RMAppEventType.KILL); 
+    ((Service)rmContext.getDispatcher()).stop();
+  }
+
+  @Test
+  public void testRMAppSubmitWithQueueAndName() throws Exception {
+    long now = System.currentTimeMillis();
+
+    RMContext rmContext = mockRMContext(1, now - 10);
+    ResourceScheduler scheduler = new CapacityScheduler();
+    ApplicationMasterService masterService =  new ApplicationMasterService(rmContext,
+        new ApplicationTokenSecretManager(), scheduler);
+    Configuration conf = new Configuration();
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, 
+        new ClientToAMSecretManager(), scheduler, masterService, conf);
+
+    ApplicationId appID = MockApps.newAppID(10);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    context.setApplicationId(appID);
+    context.setApplicationName("testApp1");
+    context.setQueue("testQueue");
+
+    setupDispatcher(rmContext, conf);
+
+    appMonitor.submitApplication(context);
+    RMApp app = rmContext.getRMApps().get(appID);
+    Assert.assertNotNull("app is null", app);
+    Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
+    Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
+    Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
+    Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
+    Assert.assertNotNull("app store is null", app.getApplicationStore());
+
+    // wait for event to be processed
+    int timeoutSecs = 0;
+    while ((getAppEventType() == RMAppEventType.KILL) && 
+        timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
+    setAppEventType(RMAppEventType.KILL); 
+    ((Service)rmContext.getDispatcher()).stop();
+  }
+
+  @Test
+  public void testRMAppSubmitError() throws Exception {
+    long now = System.currentTimeMillis();
+
+    // specify 1 here and use same appId below so it gets duplicate entry
+    RMContext rmContext = mockRMContext(1, now - 10);
+    ResourceScheduler scheduler = new CapacityScheduler();
+    ApplicationMasterService masterService =  new ApplicationMasterService(rmContext,
+        new ApplicationTokenSecretManager(), scheduler);
+    Configuration conf = new Configuration();
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, 
+        new ClientToAMSecretManager(), scheduler, masterService, conf);
+
+    ApplicationId appID = MockApps.newAppID(0);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    context.setApplicationId(appID);
+    context.setApplicationName("testApp1");
+    context.setQueue("testQueue");
+
+    setupDispatcher(rmContext, conf);
+
+    RMApp appOrig = rmContext.getRMApps().get(appID);
+    Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
+
+    // our testApp1 should be rejected and original app with same id should be left in place
+    appMonitor.submitApplication(context);
+
+    // make sure original app didn't get removed
+    RMApp app = rmContext.getRMApps().get(appID);
+    Assert.assertNotNull("app is null", app);
+    Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
+    Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
+    ((Service)rmContext.getDispatcher()).stop();
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1160521&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Tue Aug 23 01:32:32 2011
@@ -0,0 +1,106 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+
+public class MockRMApp implements RMApp {
+  static final int DT = 1000000; // ms
+
+  String user = MockApps.newUserName();
+  String name = MockApps.newAppName();
+  String queue = MockApps.newQueue();
+  long start = System.currentTimeMillis() - (int) (Math.random() * DT);
+  long finish = 0;
+  RMAppState state = RMAppState.NEW;
+  int failCount = 0;
+  ApplicationId id;
+
+  public MockRMApp(int newid, long time, RMAppState newState) {
+    finish = time;
+    id = MockApps.newAppID(newid);
+    state = newState;
+  }
+
+  public MockRMApp(int newid, long time, RMAppState newState, String userName) {
+    this(newid, time, newState);
+    user = userName;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return id;
+  }
+
+  @Override
+  public RMAppState getState() {
+    return state;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public float getProgress() {
+    return (float) 0.0;
+  }
+
+  @Override
+  public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public String getQueue() {
+    return queue;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public RMAppAttempt getCurrentAppAttempt() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public ApplicationReport createAndGetApplicationReport() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public ApplicationStore getApplicationStore() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public long getFinishTime() {
+    return finish;
+  }
+
+  @Override
+  public long getStartTime() {
+    return start;
+  }
+
+  @Override
+  public String getTrackingUrl() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public StringBuilder getDiagnostics() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  public void handle(RMAppEvent event) {
+  };
+
+}

Modified: hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1160521&r1=1160520&r2=1160521&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Tue Aug 23 01:32:32 2011
@@ -263,6 +263,18 @@ public class TestRMAppTransitions {
   }
 
   @Test
+  public void testAppNewReject() throws IOException {
+    LOG.info("--- START: testAppNewReject ---");
+
+    RMApp application = createNewTestApp();
+    // NEW => FAILED event RMAppEventType.APP_REJECTED
+    String rejectedText = "Test Application Rejected";
+    RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
+    application.handle(event);
+    assertFailed(application, rejectedText);
+  }
+
+  @Test
   public void testAppSubmittedRejected() throws IOException {
     LOG.info("--- START: testAppSubmittedRejected ---");
 



Mime
View raw message