Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4B1CA10172 for ; Thu, 2 Jan 2014 20:21:42 +0000 (UTC) Received: (qmail 68709 invoked by uid 500); 2 Jan 2014 20:21:42 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 68670 invoked by uid 500); 2 Jan 2014 20:21:42 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 68662 invoked by uid 99); 2 Jan 2014 20:21:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jan 2014 20:21:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jan 2014 20:21:29 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 871612388A2C; Thu, 2 Jan 2014 20:21:06 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1554898 [2/3] - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/ hadoop-yarn/hadoop-yarn-server... Date: Thu, 02 Jan 2014 20:21:05 -0000 To: yarn-commits@hadoop.apache.org From: vinodkv@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140102202106.871612388A2C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu Jan 2 20:21:03 2014 @@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou */ @Private @Unstable -public class FSSchedulerApp extends SchedulerApplication { +public class FSSchedulerApp extends SchedulerApplicationAttempt { private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Jan 2 20:21:03 2014 @@ -38,6 +38,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -58,10 +59,13 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -75,8 +79,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -151,10 +157,15 @@ public class FairScheduler implements Re // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // This stores per-application scheduling information, indexed by + // This stores per-application scheduling information, + @VisibleForTesting + protected Map applications = + new ConcurrentHashMap(); + + // This stores per-application-attempt scheduling information, indexed by // attempt ID's for fast lookup. @VisibleForTesting - protected Map applications = + protected Map appAttempts = new ConcurrentHashMap(); // Nodes in the cluster, indexed by NodeId @@ -253,7 +264,7 @@ public class FairScheduler implements Re private RMContainer getRMContainer(ContainerId containerId) { FSSchedulerApp application = - applications.get(containerId.getApplicationAttemptId()); + appAttempts.get(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } @@ -591,44 +602,63 @@ public class FairScheduler implements Re * user. This will accept a new app even if the user or queue is above * configured limits, but the app will not be marked as runnable. */ - protected synchronized void addApplicationAttempt( - ApplicationAttemptId applicationAttemptId, String queueName, String user) { + protected synchronized void addApplication(ApplicationId applicationId, + String queueName, String user) { if (queueName == null || queueName.isEmpty()) { - String message = "Reject application " + applicationAttemptId + + String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; LOG.info(message); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptRejectedEvent(applicationAttemptId, message)); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); return; } - RMApp rmApp = rmContext.getRMApps().get( - applicationAttemptId.getApplicationId()); + RMApp rmApp = rmContext.getRMApps().get(applicationId); FSLeafQueue queue = assignToQueue(rmApp, queueName, user); if (queue == null) { rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptRejectedEvent(applicationAttemptId, + new RMAppRejectedEvent(applicationId, "Application rejected by queue placement policy")); return; } - FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, user, - queue, new ActiveUsersManager(getRootQueueMetrics()), - rmContext); - // Enforce ACLs UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { String msg = "User " + userUgi.getUserName() + - " cannot submit applications to queue " + queue.getName(); + " cannot submit applications to queue " + queue.getName(); LOG.info(msg); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptRejectedEvent(applicationAttemptId, msg)); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, msg)); return; } + + SchedulerApplication application = + new SchedulerApplication(queue, user); + applications.put(applicationId, application); + + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queueName); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } + + /** + * Add a new application attempt to the scheduler. + */ + protected synchronized void addApplicationAttempt( + ApplicationAttemptId applicationAttemptId) { + SchedulerApplication application = + applications.get(applicationAttemptId.getApplicationId()); + String user = application.getUser(); + FSLeafQueue queue = (FSLeafQueue) application.getQueue(); + + FSSchedulerApp schedulerApp = + new FSSchedulerApp(applicationAttemptId, user, + queue, new ActiveUsersManager(getRootQueueMetrics()), + rmContext); boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); queue.addApp(schedulerApp, runnable); @@ -639,16 +669,14 @@ public class FairScheduler implements Re } queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); + appAttempts.put(applicationAttemptId, schedulerApp); - applications.put(applicationAttemptId, schedulerApp); - - LOG.info("Application Submission: " + applicationAttemptId + - ", user: "+ user + - ", currently active: " + applications.size()); - + LOG.info("Added Application Attempt " + applicationAttemptId + + " to scheduler from user: " + user + ", currently active: " + + appAttempts.size()); rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.APP_ACCEPTED)); + RMAppAttemptEventType.ATTEMPT_ADDED)); } @VisibleForTesting @@ -674,13 +702,18 @@ public class FairScheduler implements Re return queue; } + private synchronized void removeApplication(ApplicationId applicationId, + RMAppState finalState) { + applications.remove(applicationId); + } + private synchronized void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - FSSchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = appAttempts.get(applicationAttemptId); if (application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -720,7 +753,7 @@ public class FairScheduler implements Re } // Remove from our data-structure - applications.remove(applicationAttemptId); + appAttempts.remove(applicationAttemptId); } /** @@ -737,7 +770,7 @@ public class FairScheduler implements Re // Get the application for the finished container ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - FSSchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = appAttempts.get(applicationAttemptId); if (application == null) { LOG.info("Container " + container + " of" + " unknown application " + applicationAttemptId + @@ -811,7 +844,7 @@ public class FairScheduler implements Re List ask, List release, List blacklistAdditions, List blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = applications.get(appAttemptId); + FSSchedulerApp application = appAttempts.get(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -882,7 +915,7 @@ public class FairScheduler implements Re private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - FSSchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = appAttempts.get(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -1025,23 +1058,23 @@ public class FairScheduler implements Re } public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - return applications.get(appAttemptId); + return appAttempts.get(appAttemptId); } @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { - if (!applications.containsKey(appAttemptId)) { + if (!appAttempts.containsKey(appAttemptId)) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; } - return new SchedulerAppReport(applications.get(appAttemptId)); + return new SchedulerAppReport(appAttempts.get(appAttemptId)); } @Override public ApplicationResourceUsageReport getAppResourceUsageReport( ApplicationAttemptId appAttemptId) { - FSSchedulerApp app = applications.get(appAttemptId); + FSSchedulerApp app = appAttempts.get(appAttemptId); if (app == null) { LOG.error("Request for appInfo of unknown attempt" + appAttemptId); return null; @@ -1090,15 +1123,29 @@ public class FairScheduler implements Re NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode()); break; + case APP_ADDED: + if (!(event instanceof AppAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; + addApplication(appAddedEvent.getApplicationId(), + appAddedEvent.getQueue(), appAddedEvent.getUser()); + break; + case APP_REMOVED: + if (!(event instanceof AppRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; + removeApplication(appRemovedEvent.getApplicationID(), + appRemovedEvent.getFinalState()); + break; case APP_ATTEMPT_ADDED: if (!(event instanceof AppAttemptAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; - String queue = appAttemptAddedEvent.getQueue(); - addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - queue, appAttemptAddedEvent.getUser()); + addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Thu Jan 2 20:21:03 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -58,6 +59,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -74,12 +78,15 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -116,11 +123,15 @@ public class FifoScheduler implements Re private Resource maximumAllocation; private boolean usePortForNodeName; + @VisibleForTesting + protected Map applications = + new ConcurrentSkipListMap(); + // Use ConcurrentSkipListMap because applications need to be ordered @VisibleForTesting - protected Map applications + protected Map appAttempts = new ConcurrentSkipListMap(); - + private ActiveUsersManager activeUsersManager; private static final String DEFAULT_QUEUE_NAME = "default"; @@ -327,7 +338,7 @@ public class FifoScheduler implements Re @VisibleForTesting FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { - return applications.get(applicationAttemptId); + return appAttempts.get(applicationAttemptId); } @Override @@ -347,20 +358,44 @@ public class FifoScheduler implements Re private FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } - - private synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - String user) { + + private synchronized void addApplication(ApplicationId applicationId, + String queue, String user) { + SchedulerApplication application = + new SchedulerApplication(null, user); + applications.put(applicationId, application); + LOG.info("Accepted application " + applicationId + " from user: " + user); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } + + private synchronized void addApplicationAttempt( + ApplicationAttemptId appAttemptId) { + SchedulerApplication application = + applications.get(appAttemptId.getApplicationId()); + String user = application.getUser(); // TODO: Fix store - FiCaSchedulerApp schedulerApp = - new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, - this.rmContext); - applications.put(appAttemptId, schedulerApp); + FiCaSchedulerApp schedulerApp = + new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, + activeUsersManager, this.rmContext); + appAttempts.put(appAttemptId, schedulerApp); metrics.submitApp(user, appAttemptId.getAttemptId()); - LOG.info("Application Submission: " + appAttemptId.getApplicationId() + - " from " + user + ", currently active: " + applications.size()); + LOG.info("Added Application Attempt " + appAttemptId + + " to scheduler from user " + application.getUser() + + ", currently active: " + appAttempts.size()); rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.APP_ACCEPTED)); + RMAppAttemptEventType.ATTEMPT_ADDED)); + } + + private synchronized void doneApplication(ApplicationId applicationId, + RMAppState finalState) { + SchedulerApplication application = applications.get(applicationId); + + // Inform the activeUsersManager + activeUsersManager.deactivateApplication(application.getUser(), + applicationId); + applications.remove(applicationId); } private synchronized void doneApplicationAttempt( @@ -382,17 +417,11 @@ public class FifoScheduler implements Re RMContainerEventType.KILL); } - // Inform the activeUsersManager - synchronized (application) { - activeUsersManager.deactivateApplication( - application.getUser(), application.getApplicationId()); - } - // Clean up pending requests, metrics etc. application.stop(rmAppAttemptFinalState); // Remove the application - applications.remove(applicationAttemptId); + appAttempts.remove(applicationAttemptId); } /** @@ -403,10 +432,10 @@ public class FifoScheduler implements Re private void assignContainers(FiCaSchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + - " #applications=" + applications.size()); + " #applications=" + appAttempts.size()); // Try to assign containers to applications in fifo order - for (Map.Entry e : applications + for (Map.Entry e : appAttempts .entrySet()) { FiCaSchedulerApp application = e.getValue(); LOG.debug("pre-assignContainers"); @@ -445,7 +474,7 @@ public class FifoScheduler implements Re // Update the applications' headroom to correctly take into // account the containers assigned in this update. - for (FiCaSchedulerApp application : applications.values()) { + for (FiCaSchedulerApp application : appAttempts.values()) { application.setHeadroom(Resources.subtract(clusterResource, usedResource)); } } @@ -697,12 +726,25 @@ public class FifoScheduler implements Re nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; + case APP_ADDED: + { + AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; + addApplication(appAddedEvent.getApplicationId(), + appAddedEvent.getQueue(), appAddedEvent.getUser()); + } + break; + case APP_REMOVED: + { + AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; + doneApplication(appRemovedEvent.getApplicationID(), + appRemovedEvent.getFinalState()); + } + break; case APP_ATTEMPT_ADDED: { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; - addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getUser()); + addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId()); } break; case APP_ATTEMPT_REMOVED: @@ -867,8 +909,8 @@ public class FifoScheduler implements Re public synchronized List getAppsInQueue(String queueName) { if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { List apps = new ArrayList( - applications.size()); - for (FiCaSchedulerApp app : applications.values()) { + appAttempts.size()); + for (FiCaSchedulerApp app : appAttempts.values()) { apps.add(app.getApplicationAttemptId()); } return apps; Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Thu Jan 2 20:21:03 2014 @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; @@ -164,11 +165,14 @@ public class Application { final ResourceScheduler scheduler = resourceManager.getResourceScheduler(); resourceManager.getClientRMService().submitApplication(request); - + // Notify scheduler - AppAttemptAddedSchedulerEvent appAddedEvent1 = new AppAttemptAddedSchedulerEvent( - this.applicationAttemptId, this.queue, this.user); - scheduler.handle(appAddedEvent1); + AppAddedSchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(this.applicationId, this.queue, "user"); + scheduler.handle(addAppEvent); + AppAttemptAddedSchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(this.applicationAttemptId); + scheduler.handle(addAttemptEvent); } public synchronized void addResourceRequestSpec( Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Thu Jan 2 20:21:03 2014 @@ -649,7 +649,7 @@ public class TestClientRMService { .currentTimeMillis(), "YARN")); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId3, 1); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config, null); + rmContext, yarnScheduler, null, asContext, config); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); return app; } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Thu Jan 2 20:21:03 2014 @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -297,9 +298,12 @@ public class TestFifoScheduler { ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( appId1, 1); - SchedulerEvent event1 = - new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user"); - fs.handle(event1); + SchedulerEvent appEvent = + new AppAddedSchedulerEvent(appId1, "queue", "user"); + fs.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId1); + fs.handle(attemptEvent); List emptyId = new ArrayList(); List emptyAsk = new ArrayList(); @@ -388,16 +392,22 @@ public class TestFifoScheduler { ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( appId1, 1); - SchedulerEvent event1 = - new AppAttemptAddedSchedulerEvent(appAttemptId1, "queue", "user"); - fs.handle(event1); + SchedulerEvent appEvent = + new AppAddedSchedulerEvent(appId1, "queue", "user"); + fs.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId1); + fs.handle(attemptEvent); ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2); ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( appId2, 1); - SchedulerEvent event2 = - new AppAttemptAddedSchedulerEvent(appAttemptId2, "queue", "user"); - fs.handle(event2); + SchedulerEvent appEvent2 = + new AppAddedSchedulerEvent(appId2, "queue", "user"); + fs.handle(appEvent2); + SchedulerEvent attemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId2); + fs.handle(attemptEvent2); List emptyId = new ArrayList(); List emptyAsk = new ArrayList(); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Jan 2 20:21:03 2014 @@ -248,7 +248,7 @@ public class TestRMRestart { // verify correct number of attempts and other data RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); Assert.assertNotNull(loadedApp1); - //Assert.assertEquals(1, loadedApp1.getAppAttempts().size()); + Assert.assertEquals(1, loadedApp1.getAppAttempts().size()); Assert.assertEquals(app1.getApplicationSubmissionContext() .getApplicationId(), loadedApp1.getApplicationSubmissionContext() .getApplicationId()); @@ -261,7 +261,7 @@ public class TestRMRestart { .getApplicationId()); // verify state machine kicked into expected states - rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED); // verify attempts for apps @@ -299,7 +299,11 @@ public class TestRMRestart { nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); - Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); + // wait for the 2nd attempt to be started. + int timeoutSecs = 0; + while (loadedApp1.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {; + Thread.sleep(200); + } // verify no more reboot response sent hbResponse = nm1.nodeHeartbeat(true); @@ -476,10 +480,10 @@ public class TestRMRestart { Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction()); RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - // application should be in running state - rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + // application should be in ACCEPTED state + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState()); // new attempt should not be started Assert.assertEquals(2, rmApp.getAppAttempts().size()); // am1 attempt should be in FAILED state where as am2 attempt should be in @@ -516,9 +520,9 @@ public class TestRMRestart { nm1.setResourceTrackerService(rm3.getResourceTrackerService()); rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId()); - // application should be in running state - rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING); - Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING); + // application should be in ACCEPTED state + rm3.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(rmApp.getState(), RMAppState.ACCEPTED); // new attempt should not be started Assert.assertEquals(3, rmApp.getAppAttempts().size()); // am1 and am2 attempts should be in FAILED state where as am3 should be @@ -562,6 +566,11 @@ public class TestRMRestart { rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId()); rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED); + // wait for the attempt to be created. + int timeoutSecs = 0; + while (rmApp.getAppAttempts().size() != 2 && timeoutSecs++ < 40) { + Thread.sleep(200); + } Assert.assertEquals(4, rmApp.getAppAttempts().size()); Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState()); rm4.waitForState(latestAppAttemptId, RMAppAttemptState.SCHEDULED); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java Thu Jan 2 20:21:03 2014 @@ -567,7 +567,9 @@ public class TestRMAppTransitions { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertAppAndAttemptKilled(application); + sendAppUpdateSavedEvent(application); + assertKilled(application); + assertAppFinalStateSaved(application); } @Test @@ -582,7 +584,7 @@ public class TestRMAppTransitions { new RMAppFailedAttemptEvent(application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, ""); application.handle(event); - assertAppState(RMAppState.SUBMITTED, application); + assertAppState(RMAppState.ACCEPTED, application); event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); @@ -612,7 +614,9 @@ public class TestRMAppTransitions { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); - assertAppAndAttemptKilled(application); + sendAppUpdateSavedEvent(application); + assertKilled(application); + assertAppFinalStateSaved(application); } @Test @@ -654,7 +658,7 @@ public class TestRMAppTransitions { RMAppEventType.ATTEMPT_FAILED, ""); application.handle(event); rmDispatcher.await(); - assertAppState(RMAppState.SUBMITTED, application); + assertAppState(RMAppState.ACCEPTED, application); appAttempt = application.getCurrentAppAttempt(); Assert.assertEquals(++expectedAttemptId, appAttempt.getAppAttemptId().getAttemptId()); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Thu Jan 2 20:21:03 2014 @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -258,7 +257,7 @@ public class TestRMAppAttemptTransitions application = mock(RMApp.class); applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, rmContext, scheduler, - masterService, submissionContext, new Configuration(), user); + masterService, submissionContext, new Configuration()); when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); when(application.getApplicationId()).thenReturn(applicationId); @@ -408,9 +407,6 @@ public class TestRMAppAttemptTransitions assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); - - // Check events - verify(application).handle(any(RMAppEvent.class)); } /** @@ -446,7 +442,7 @@ public class TestRMAppAttemptTransitions assertEquals(0, applicationAttempt.getRanNodes().size()); // Check events - verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class)); + verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAttemptFinalStateSaved(); } @@ -544,7 +540,7 @@ public class TestRMAppAttemptTransitions applicationAttempt.handle( new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), - RMAppAttemptEventType.APP_ACCEPTED)); + RMAppAttemptEventType.ATTEMPT_ADDED)); if(unmanagedAM){ assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, @@ -703,16 +699,6 @@ public class TestRMAppAttemptTransitions RMAppAttemptEventType.RECOVER)); testAppAttemptRecoveredState(); } - - @Test - public void testSubmittedToFailed() { - submitApplicationAttempt(); - String message = "Rejected"; - applicationAttempt.handle( - new RMAppAttemptRejectedEvent( - applicationAttempt.getAppAttemptId(), message)); - testAppAttemptSubmittedToFailedState(message); - } @Test public void testSubmittedToKilled() { Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Thu Jan 2 20:21:03 2014 @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -58,8 +59,12 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -378,4 +383,24 @@ public class TestSchedulerUtils { ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), 1), "x"); Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus()); } + + public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler( + final Map applications, + EventHandler handler, String queueName) throws Exception { + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + AppAddedSchedulerEvent appAddedEvent = + new AppAddedSchedulerEvent(appId, queueName, "user"); + handler.handle(appAddedEvent); + SchedulerApplication app = applications.get(appId); + // verify application is added. + Assert.assertNotNull(app); + Assert.assertEquals("user", app.getUser()); + + AppRemovedSchedulerEvent appRemoveEvent = + new AppRemovedSchedulerEvent(appId, RMAppState.FINISHED); + handler.handle(appRemoveEvent); + Assert.assertNull(applications.get(appId)); + return app; + } } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Thu Jan 2 20:21:03 2014 @@ -304,7 +304,7 @@ public class TestApplicationLimits { int APPLICATION_ID = 0; // Submit first application FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_0, user_0, A); + queue.submitApplicationAttempt(app_0, user_0); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); @@ -312,7 +312,7 @@ public class TestApplicationLimits { // Submit second application FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_1, user_0, A); + queue.submitApplicationAttempt(app_1, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -320,14 +320,14 @@ public class TestApplicationLimits { // Submit third application, should remain pending FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_2, user_0, A); + queue.submitApplicationAttempt(app_2, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); // Finish one application, app_2 should be activated - queue.finishApplication(app_0, A); + queue.finishApplicationAttempt(app_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -335,7 +335,7 @@ public class TestApplicationLimits { // Submit another one for user_0 FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_3, user_0, A); + queue.submitApplicationAttempt(app_3, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -346,7 +346,7 @@ public class TestApplicationLimits { // Submit first app for user_1 FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); - queue.submitApplication(app_4, user_1, A); + queue.submitApplicationAttempt(app_4, user_1); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -356,7 +356,7 @@ public class TestApplicationLimits { // Submit second app for user_1, should block due to queue-limit FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); - queue.submitApplication(app_5, user_1, A); + queue.submitApplicationAttempt(app_5, user_1); assertEquals(3, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -365,7 +365,7 @@ public class TestApplicationLimits { assertEquals(1, queue.getNumPendingApplications(user_1)); // Now finish one app of user_1 so app_5 should be activated - queue.finishApplication(app_4, A); + queue.finishApplicationAttempt(app_4, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -385,7 +385,7 @@ public class TestApplicationLimits { // Submit first application FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_0, user_0, A); + queue.submitApplicationAttempt(app_0, user_0); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); @@ -394,7 +394,7 @@ public class TestApplicationLimits { // Submit second application FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_1, user_0, A); + queue.submitApplicationAttempt(app_1, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -403,7 +403,7 @@ public class TestApplicationLimits { // Submit third application, should remain pending FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_2, user_0, A); + queue.submitApplicationAttempt(app_2, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -412,7 +412,7 @@ public class TestApplicationLimits { // Submit fourth application, should remain pending FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); - queue.submitApplication(app_3, user_0, A); + queue.submitApplicationAttempt(app_3, user_0); assertEquals(2, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -420,7 +420,7 @@ public class TestApplicationLimits { assertTrue(queue.pendingApplications.contains(app_3)); // Kill 3rd pending application - queue.finishApplication(app_2, A); + queue.finishApplicationAttempt(app_2, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -429,7 +429,7 @@ public class TestApplicationLimits { assertFalse(queue.activeApplications.contains(app_2)); // Finish 1st application, app_3 should become active - queue.finishApplication(app_0, A); + queue.finishApplicationAttempt(app_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); @@ -439,7 +439,7 @@ public class TestApplicationLimits { assertFalse(queue.activeApplications.contains(app_0)); // Finish 2nd application - queue.finishApplication(app_1, A); + queue.finishApplicationAttempt(app_1, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(1, queue.getNumActiveApplications(user_0)); @@ -447,7 +447,7 @@ public class TestApplicationLimits { assertFalse(queue.activeApplications.contains(app_1)); // Finish 4th application - queue.finishApplication(app_3, A); + queue.finishApplicationAttempt(app_3, A); assertEquals(0, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); assertEquals(0, queue.getNumActiveApplications(user_0)); @@ -507,7 +507,7 @@ public class TestApplicationLimits { FiCaSchedulerApp app_0_0 = spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, queue.getActiveUsersManager(), rmContext)); - queue.submitApplication(app_0_0, user_0, A); + queue.submitApplicationAttempt(app_0_0, user_0); List app_0_0_requests = new ArrayList(); app_0_0_requests.add( @@ -526,7 +526,7 @@ public class TestApplicationLimits { FiCaSchedulerApp app_0_1 = spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, queue.getActiveUsersManager(), rmContext)); - queue.submitApplication(app_0_1, user_0, A); + queue.submitApplicationAttempt(app_0_1, user_0); List app_0_1_requests = new ArrayList(); app_0_1_requests.add( @@ -545,7 +545,7 @@ public class TestApplicationLimits { FiCaSchedulerApp app_1_0 = spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, queue.getActiveUsersManager(), rmContext)); - queue.submitApplication(app_1_0, user_1, A); + queue.submitApplicationAttempt(app_1_0, user_1); List app_1_0_requests = new ArrayList(); app_1_0_requests.add( Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Thu Jan 2 20:21:03 2014 @@ -64,7 +64,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -555,9 +558,12 @@ public class TestCapacityScheduler { ApplicationId appId = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); - SchedulerEvent event = - new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user"); - cs.handle(event); + SchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, "default", "user"); + cs.handle(addAppEvent); + SchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId); + cs.handle(addAttemptEvent); // Verify the blacklist can be updated independent of requesting containers cs.allocate(appAttemptId, Collections.emptyList(), @@ -596,10 +602,10 @@ public class TestCapacityScheduler { public void testConcurrentAccessOnApplications() throws Exception { CapacityScheduler cs = new CapacityScheduler(); verifyConcurrentAccessOnApplications( - cs.applications, FiCaSchedulerApp.class, Queue.class); + cs.appAttempts, FiCaSchedulerApp.class, Queue.class); } - public static + public static void verifyConcurrentAccessOnApplications( final Map applications, Class appClazz, final Class queueClazz) @@ -682,4 +688,21 @@ public class TestCapacityScheduler { Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue")); } -} + @Test + public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { + + AsyncDispatcher rmDispatcher = new AsyncDispatcher(); + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, + null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM())); + + SchedulerApplication app = + TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( + cs.applications, cs, "a1"); + Assert.assertEquals("a1", app.getQueue().getQueueName()); + } + } Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1554898&r1=1554897&r2=1554898&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Thu Jan 2 20:21:03 2014 @@ -271,14 +271,14 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_0, user_0, B); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_1, user_0, B); // same user + a.submitApplicationAttempt(app_1, user_0); // same user // Setup some nodes @@ -320,14 +320,14 @@ public class TestLeafQueue { .getMockApplicationAttemptId(0, 1); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null, rmContext); - d.submitApplication(app_0, user_d, D); + d.submitApplicationAttempt(app_0, user_d); // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null, rmContext); - d.submitApplication(app_1, user_d, D); // same user + d.submitApplicationAttempt(app_1, user_d); // same user } @@ -345,7 +345,7 @@ public class TestLeafQueue { .getMockApplicationAttemptId(0, 1); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, rmContext); - a.submitApplication(app_0, user_0, B); + a.submitApplicationAttempt(app_0, user_0); when(cs.getApplication(appAttemptId_0)).thenReturn(app_0); AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent( @@ -360,7 +360,7 @@ public class TestLeafQueue { .getMockApplicationAttemptId(0, 2); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null, rmContext); - a.submitApplication(app_1, user_0, B); // same user + a.submitApplicationAttempt(app_1, user_0); // same user assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(1, a.getMetrics().getAppsPending()); @@ -396,14 +396,14 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_1, user_0, A); // same user + a.submitApplicationAttempt(app_1, user_0); // same user // Setup some nodes @@ -524,21 +524,21 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_1, user_0, A); // same user + a.submitApplicationAttempt(app_1, user_0); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_2, user_1, A); + a.submitApplicationAttempt(app_2, user_1); // Setup some nodes String host_0 = "127.0.0.1"; @@ -618,21 +618,21 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_1, user_0, A); // same user + a.submitApplicationAttempt(app_1, user_0); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_2, user_1, A); + a.submitApplicationAttempt(app_2, user_1); // Setup some nodes String host_0 = "127.0.0.1"; @@ -729,28 +729,28 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_1, user_0, A); // same user + a.submitApplicationAttempt(app_1, user_0); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_2, user_1, A); + a.submitApplicationAttempt(app_2, user_1); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_2, a, a.getActiveUsersManager(), rmContext); - a.submitApplication(app_3, user_2, A); + a.submitApplicationAttempt(app_3, user_2); // Setup some nodes String host_0 = "127.0.0.1"; @@ -905,14 +905,14 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_1, user_1, A); + a.submitApplicationAttempt(app_1, user_1); // Setup some nodes String host_0 = "127.0.0.1"; @@ -1007,14 +1007,14 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_1, user_1, A); + a.submitApplicationAttempt(app_1, user_1); // Setup some nodes String host_0 = "127.0.0.1"; @@ -1111,14 +1111,14 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext); - a.submitApplication(app_1, user_1, A); + a.submitApplicationAttempt(app_1, user_1); // Setup some nodes String host_0 = "127.0.0.1"; @@ -1232,7 +1232,7 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext)); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); // Setup some nodes and racks String host_0 = "127.0.0.1"; @@ -1373,7 +1373,7 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext)); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); // Setup some nodes and racks String host_0 = "127.0.0.1"; @@ -1504,7 +1504,7 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext)); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); // Setup some nodes and racks String host_0_0 = "127.0.0.1"; @@ -1607,21 +1607,21 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_e, e, mock(ActiveUsersManager.class), rmContext); - e.submitApplication(app_0, user_e, E); + e.submitApplicationAttempt(app_0, user_e); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_e, e, mock(ActiveUsersManager.class), rmContext); - e.submitApplication(app_1, user_e, E); // same user + e.submitApplicationAttempt(app_1, user_e); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_e, e, mock(ActiveUsersManager.class), rmContext); - e.submitApplication(app_2, user_e, E); // same user + e.submitApplicationAttempt(app_2, user_e); // same user // before reinitialization assertEquals(2, e.activeApplications.size()); @@ -1685,21 +1685,21 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_e, e, mock(ActiveUsersManager.class), rmContext); - e.submitApplication(app_0, user_e, E); + e.submitApplicationAttempt(app_0, user_e); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_e, e, mock(ActiveUsersManager.class), rmContext); - e.submitApplication(app_1, user_e, E); // same user + e.submitApplicationAttempt(app_1, user_e); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_e, e, mock(ActiveUsersManager.class), rmContext); - e.submitApplication(app_2, user_e, E); // same user + e.submitApplicationAttempt(app_2, user_e); // same user // before updating cluster resource assertEquals(2, e.activeApplications.size()); @@ -1762,14 +1762,14 @@ public class TestLeafQueue { FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext)); - a.submitApplication(app_0, user_0, A); + a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext)); - a.submitApplication(app_1, user_0, A); + a.submitApplicationAttempt(app_1, user_0); // Setup some nodes and racks String host_0_0 = "127.0.0.1";