Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B7590613C for ; Fri, 24 Jun 2011 21:31:27 +0000 (UTC) Received: (qmail 52593 invoked by uid 500); 24 Jun 2011 21:31:27 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 52550 invoked by uid 500); 24 Jun 2011 21:31:27 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 52542 invoked by uid 99); 24 Jun 2011 21:31:27 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Jun 2011 21:31:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Jun 2011 21:31:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3CEC223889E0; Fri, 24 Jun 2011 21:30:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1139443 [1/2] - in /hadoop/common/branches/MR-279/mapreduce: ./ .eclipse.templates/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ yarn/yarn-api/src/m... Date: Fri, 24 Jun 2011 21:30:53 -0000 To: mapreduce-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110624213054.3CEC223889E0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Fri Jun 24 21:30:53 2011 New Revision: 1139443 URL: http://svn.apache.org/viewvc?rev=1139443&view=rev Log: Fix some invalid transitions in the RM. Contributed by Vinod KV Modified: hadoop/common/branches/MR-279/mapreduce/.eclipse.templates/.classpath hadoop/common/branches/MR-279/mapreduce/CHANGES.txt hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterExpiry.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java Modified: hadoop/common/branches/MR-279/mapreduce/.eclipse.templates/.classpath URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/.eclipse.templates/.classpath?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/.eclipse.templates/.classpath (original) +++ hadoop/common/branches/MR-279/mapreduce/.eclipse.templates/.classpath Fri Jun 24 21:30:53 2011 @@ -57,11 +57,12 @@ - + + Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original) +++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Fri Jun 24 21:30:53 2011 @@ -4,6 +4,8 @@ Trunk (unreleased changes) MAPREDUCE-279 + + Fix some invalid transitions in the RM. (vinodkv via ddas) Fix diagnostics display for more than 100 apps in RM. (llu) Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStatus.java Fri Jun 24 21:30:53 2011 @@ -22,10 +22,8 @@ public interface ApplicationStatus { int getResponseId(); ApplicationId getApplicationId(); float getProgress(); - long getLastSeen(); void setResponseId(int id); void setApplicationId(ApplicationId applicationID); void setProgress(float progress); - void setLastSeen(long lastSeen); } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStatusPBImpl.java Fri Jun 24 21:30:53 2011 @@ -104,17 +104,6 @@ public class ApplicationStatusPBImpl ext maybeInitBuilder(); builder.setProgress((progress)); } - @Override - public long getLastSeen() { - ApplicationStatusProtoOrBuilder p = viaProto ? proto : builder; - return (p.getLastSeen()); - } - - @Override - public void setLastSeen(long lastSeen) { - maybeInitBuilder(); - builder.setLastSeen((lastSeen)); - } private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-api/src/main/proto/yarn_protos.proto Fri Jun 24 21:30:53 2011 @@ -66,7 +66,6 @@ message ApplicationStatusProto { optional int32 response_id = 1; optional ApplicationIdProto application_id = 2; optional float progress = 3; - optional int64 last_seen = 4; } message ApplicationMasterProto { Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java Fri Jun 24 21:30:53 2011 @@ -155,7 +155,6 @@ public class MockApps { public static ApplicationStatus newAppStatus() { ApplicationStatus status = Records.newRecord(ApplicationStatus.class); status.setProgress((float)Math.random()); - status.setLastSeen(System.currentTimeMillis()); return status; } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Fri Jun 24 21:30:53 2011 @@ -117,6 +117,7 @@ AMRMProtocol, EventHandler(eventType, master)); + handler.handle(new ApplicationMasterInfoEvent(eventType, master + .getApplicationID())); break; case CLEANUP: try { @@ -273,7 +273,7 @@ public class AMLauncher implements Runna } catch(IOException ie) { LOG.info("Error cleaning master ", ie); } - handler.handle(new ApplicationFinishEvent(master, + handler.handle(new ApplicationFinishEvent(master.getApplicationID(), ApplicationState.COMPLETED)); // Doesn't matter what state you send :) :( break; default: Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Fri Jun 24 21:30:53 2011 @@ -20,13 +20,10 @@ package org.apache.hadoop.yarn.server.re import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -34,26 +31,19 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationMaster; -import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.api.records.ApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMConfig; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent; -import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState; -import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.service.CompositeService; /** * This class tracks the application masters that are running. It tracks @@ -62,15 +52,12 @@ import org.apache.hadoop.yarn.service.Ab */ @Evolving @Private -public class AMTracker extends AbstractService implements EventHandler>, Recoverable { +public class AMTracker extends CompositeService implements Recoverable { private static final Log LOG = LogFactory.getLog(AMTracker.class); private AMLivelinessMonitor amLivelinessMonitor; - private long amExpiryInterval; @SuppressWarnings("rawtypes") private EventHandler handler; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private int amMaxRetries; private final RMContext rmContext; @@ -78,141 +65,43 @@ public class AMTracker extends AbstractS new ConcurrentHashMap(); private final ApplicationsStore appsStore; - - private TreeSet amExpiryQueue = - new TreeSet( - new Comparator() { - public int compare(ApplicationStatus p1, ApplicationStatus p2) { - if (p1.getLastSeen() < p2.getLastSeen()) { - return -1; - } else if (p1.getLastSeen() > p2.getLastSeen()) { - return 1; - } else { - return (p1.getApplicationId().getId() - - p2.getApplicationId().getId()); - } - } - } - ); public AMTracker(RMContext rmContext) { super(AMTracker.class.getName()); - this.amLivelinessMonitor = new AMLivelinessMonitor(); this.rmContext = rmContext; this.appsStore = rmContext.getApplicationsStore(); + this.handler = rmContext.getDispatcher().getEventHandler(); + this.amLivelinessMonitor = new AMLivelinessMonitor(this.handler); + addService(this.amLivelinessMonitor); } @Override public void init(Configuration conf) { + this.rmContext.getDispatcher().register(ApplicationEventType.class, + new ApplicationEventDispatcher()); super.init(conf); - this.handler = rmContext.getDispatcher().getEventHandler(); - this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL, - RMConfig.DEFAULT_AM_EXPIRY_INTERVAL); - LOG.info("AM expiry interval: " + this.amExpiryInterval); - this.amMaxRetries = conf.getInt(RMConfig.AM_MAX_RETRIES, - RMConfig.DEFAULT_AM_MAX_RETRIES); - LOG.info("AM max retries: " + this.amMaxRetries); - this.amLivelinessMonitor.setMonitoringInterval(conf.getLong( - RMConfig.AMLIVELINESS_MONITORING_INTERVAL, - RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL)); - this.rmContext.getDispatcher().register(ApplicationEventType.class, this); } - @Override - public void start() { - super.start(); - amLivelinessMonitor.start(); - } - - /** - * This class runs continuosly to track the application masters - * that might be dead. - */ - private class AMLivelinessMonitor extends Thread { - private volatile boolean stop = false; - private long monitoringInterval = - RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL; - - public AMLivelinessMonitor() { - super("ApplicationsManager:" + AMLivelinessMonitor.class.getName()); - } + private final class ApplicationEventDispatcher implements + EventHandler { - public void setMonitoringInterval(long interval) { - this.monitoringInterval = interval; + public ApplicationEventDispatcher() { } @Override - public void run() { - - /* the expiry queue does not need to be in sync with applications, - * if an applications in the expiry queue cannot be found in applications - * its alright. We do not want to hold a lock on applications while going - * through the expiry queue. - */ - List expired = new ArrayList(); - while (!stop) { - ApplicationStatus leastRecent; - long now = System.currentTimeMillis(); - expired.clear(); - synchronized(amExpiryQueue) { - while ((amExpiryQueue.size() > 0) && - (leastRecent = amExpiryQueue.first()) != null && - ((now - leastRecent.getLastSeen()) > - amExpiryInterval)) { - amExpiryQueue.remove(leastRecent); - ApplicationMasterInfo info; - synchronized(applications) { - info = applications.get(leastRecent.getApplicationId()); - } - if (info == null) { - continue; - } - ApplicationStatus status = info.getStatus(); - if ((now - status.getLastSeen()) > amExpiryInterval) { - expired.add(status.getApplicationId()); - } else { - amExpiryQueue.add(status); - } - } - } - expireAMs(expired); - try { - Thread.sleep(this.monitoringInterval); - } catch (InterruptedException e) { - LOG.warn(this.getClass().getName() + " interrupted. Returning."); - return; - } - } - } - - public void shutdown() { - stop = true; - } - } - - private void expireAMs(List toExpire) { - for (ApplicationId app: toExpire) { - ApplicationMasterInfo am = null; + public void handle(ApplicationMasterInfoEvent event) { + ApplicationId appID = event.getApplicationId(); + ApplicationMasterInfo masterInfo = null; synchronized (applications) { - am = applications.get(app); + masterInfo = applications.get(appID); + } + try { + masterInfo.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for application " + event.getApplicationId()); } - LOG.info("Expiring the Application " + app); - handler.handle(new ASMEvent - (ApplicationEventType.EXPIRE, am)); - } - } - - @Override - public void stop() { - amLivelinessMonitor.interrupt(); - amLivelinessMonitor.shutdown(); - try { - amLivelinessMonitor.join(); - } catch (InterruptedException ie) { - LOG.info(amLivelinessMonitor.getName() + " interrupted during join ", - ie); } - super.stop(); } public void addMaster(String user, ApplicationSubmissionContext @@ -220,8 +109,9 @@ public class AMTracker extends AbstractS ApplicationStore appStore = appsStore.createApplicationStore(submissionContext.getApplicationId(), submissionContext); - ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(rmContext, - user, submissionContext, clientToken, appStore); + ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo( + rmContext, getConfig(), user, submissionContext, clientToken, + appStore, this.amLivelinessMonitor); synchronized(applications) { applications.put(applicationMaster.getApplicationID(), applicationMaster); } @@ -229,22 +119,15 @@ public class AMTracker extends AbstractS } public void runApplication(ApplicationId applicationId) { - ApplicationMasterInfo masterInfo = null; - synchronized (applications) { - masterInfo = applications.get(applicationId); - } - rmContext.getDispatcher().getSyncHandler().handle(new ASMEvent( - ApplicationEventType.ALLOCATE, masterInfo)); - + rmContext.getDispatcher().getSyncHandler().handle( + new ApplicationMasterInfoEvent(ApplicationEventType.ALLOCATE, + applicationId)); } public void finishNonRunnableApplication(ApplicationId applicationId) { - ApplicationMasterInfo masterInfo = null; - synchronized (applications) { - masterInfo = applications.get(applicationId); - } - rmContext.getDispatcher().getSyncHandler().handle(new ASMEvent( - ApplicationEventType.FAILED, masterInfo)); + rmContext.getDispatcher().getSyncHandler().handle( + new ApplicationMasterInfoEvent(ApplicationEventType.FAILED, + applicationId)); } public void finish(ApplicationMaster remoteApplicationMaster) { @@ -263,7 +146,7 @@ public class AMTracker extends AbstractS remoteApplicationMaster.getDiagnostics()); rmContext.getDispatcher().getEventHandler().handle( - new ApplicationFinishEvent(masterInfo, remoteApplicationMaster + new ApplicationFinishEvent(applicationId, remoteApplicationMaster .getState())); } @@ -280,6 +163,7 @@ public class AMTracker extends AbstractS public void remove(ApplicationId applicationId) { synchronized (applications) { //applications.remove(applicationId); + this.amLivelinessMonitor.unRegister(applicationId); } } @@ -291,155 +175,27 @@ public class AMTracker extends AbstractS } } return allAMs; - } - - private void addForTracking(AppContext master) { - LOG.info("Adding application master for tracking " + master.getMaster()); - synchronized (amExpiryQueue) { - amExpiryQueue.add(master.getStatus()); - } } public void kill(ApplicationId applicationID) { - ApplicationMasterInfo masterInfo = null; - - synchronized(applications) { - masterInfo = applications.get(applicationID); - } - handler.handle(new ASMEvent(ApplicationEventType.KILL, - masterInfo)); - } - - /* - * this class is used for passing status context to the application state - * machine. - */ - private static class TrackerAppContext implements AppContext { - private final ApplicationId appID; - private final ApplicationMaster master; - private final UnsupportedOperationException notimplemented; - - public TrackerAppContext( - ApplicationId appId, ApplicationMaster master) { - this.appID = appId; - this.master = master; - this.notimplemented = new NotImplementedException(); - } - - @Override - public ApplicationSubmissionContext getSubmissionContext() { - throw notimplemented; - } - @Override - public Resource getResource() { - throw notimplemented; - } - @Override - public ApplicationId getApplicationID() { - return appID; - } - @Override - public ApplicationStatus getStatus() { - return master.getStatus(); - } - @Override - public ApplicationMaster getMaster() { - return master; - } - @Override - public Container getMasterContainer() { - throw notimplemented; - } - @Override - public String getUser() { - throw notimplemented; - } - - @Override - public String getName() { - throw notimplemented; - } - @Override - public String getQueue() { - throw notimplemented; - } - - @Override - public int getFailedCount() { - throw notimplemented; - } - - @Override - public ApplicationStore getStore() { - throw notimplemented; - } - - @Override - public long getStartTime() { - throw notimplemented; - } - - @Override - public long getFinishTime() { - throw notimplemented; - } + handler.handle(new ApplicationMasterInfoEvent(ApplicationEventType.KILL, + applicationID)); } public void heartBeat(ApplicationStatus status) { ApplicationMaster master = recordFactory.newRecordInstance(ApplicationMaster.class); master.setStatus(status); master.setApplicationId(status.getApplicationId()); - TrackerAppContext context = new TrackerAppContext(status.getApplicationId(), master); - handler.handle(new ASMEvent(ApplicationEventType.STATUSUPDATE, - context)); + handler.handle(new ApplicationMasterStatusUpdateEvent(status)); } public void registerMaster(ApplicationMaster applicationMaster) { - applicationMaster.getStatus().setLastSeen(System.currentTimeMillis()); ApplicationMasterInfo master = null; synchronized(applications) { master = applications.get(applicationMaster.getApplicationId()); } LOG.info("AM registration " + master.getMaster()); - TrackerAppContext registrationContext = new TrackerAppContext( - master.getApplicationID(), applicationMaster); - handler.handle(new ASMEvent(ApplicationEventType. - REGISTERED, registrationContext)); - } - - @Override - public void handle(ASMEvent event) { - ApplicationId appID = event.getAppContext().getApplicationID(); - ApplicationMasterInfo masterInfo = null; - synchronized(applications) { - masterInfo = applications.get(appID); - } - try { - masterInfo.handle(event); - } catch(Throwable t) { - LOG.error("Error in handling event type " + event.getType() + " for application " - + event.getAppContext().getApplicationID()); - } - /* we need to launch the applicaiton master on allocated transition */ - if (masterInfo.getState() == ApplicationState.ALLOCATED) { - handler.handle(new ASMEvent( - ApplicationEventType.LAUNCH, masterInfo)); - } - if (masterInfo.getState() == ApplicationState.LAUNCHED) { - addForTracking(masterInfo); - } - - /* check to see if the AM is an EXPIRED_PENDING state and start off the cycle again */ - if (masterInfo.getState() == ApplicationState.EXPIRED_PENDING) { - /* check to see if the number of retries are reached or not */ - if (masterInfo.getFailedCount() < this.amMaxRetries) { - handler.handle(new ASMEvent(ApplicationEventType.ALLOCATE, - masterInfo)); - } else { - handler.handle(new ASMEvent(ApplicationEventType. - FAILED_MAX_RETRIES, masterInfo)); - } - } + handler.handle(new ApplicationMasterRegistrationEvent(applicationMaster)); } @Override @@ -449,11 +205,13 @@ public class AMTracker extends AbstractS ApplicationInfo appInfo = entry.getValue(); ApplicationMasterInfo masterInfo = null; try { - masterInfo = new ApplicationMasterInfo(this.rmContext, - - appInfo.getApplicationSubmissionContext().getUser(), appInfo.getApplicationSubmissionContext(), - appInfo.getApplicationMaster().getClientToken(), - this.appsStore.createApplicationStore(appId, appInfo.getApplicationSubmissionContext())); + masterInfo = new ApplicationMasterInfo(this.rmContext, getConfig(), + appInfo.getApplicationSubmissionContext().getUser(), appInfo + .getApplicationSubmissionContext(), appInfo + .getApplicationMaster().getClientToken(), this.appsStore + .createApplicationStore(appId, appInfo + .getApplicationSubmissionContext()), + this.amLivelinessMonitor); } catch(IOException ie) { //ignore } @@ -470,7 +228,8 @@ public class AMTracker extends AbstractS master.setStatus(storedAppMaster.getStatus()); master.setState(storedAppMaster.getState()); applications.put(appId, masterInfo); - handler.handle(new ASMEvent(ApplicationEventType.RECOVER, masterInfo)); + handler.handle(new ApplicationMasterInfoEvent( + ApplicationEventType.RECOVER, appId)); } } } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java Fri Jun 24 21:30:53 2011 @@ -22,11 +22,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationMaster; import org.apache.hadoop.yarn.api.records.ApplicationState; @@ -37,9 +41,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMConfig; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent; -import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType; @@ -59,7 +63,8 @@ import org.apache.hadoop.yarn.state.Stat */ @Private @Unstable -public class ApplicationMasterInfo implements AppContext, EventHandler> { +public class ApplicationMasterInfo implements AppContext, + EventHandler { private static final Log LOG = LogFactory.getLog(ApplicationMasterInfo.class); private final ApplicationSubmissionContext submissionContext; private ApplicationMaster master; @@ -74,7 +79,11 @@ public class ApplicationMasterInfo imple private static String DIAGNOSTIC_KILL_APPLICATION = "Application was killed."; private static String DIAGNOSTIC_AM_FAILED = "Application Master failed"; private static String DIAGNOSTIC_AM_LAUNCH_FAILED = "Application Master failed to launch"; - + + private final int amMaxRetries; + private final AMLivelinessMonitor amLivelinessMonitor; + private final ReadLock readLock; + private final WriteLock writeLock; private int numFailed = 0; private final ApplicationStore appStore; @@ -93,15 +102,13 @@ public class ApplicationMasterInfo imple private final LaunchedTransition launchedTransition = new LaunchedTransition(); private final FailedLaunchTransition failedLaunchTransition = new FailedLaunchTransition(); - private final StateMachine> stateMachine; + private final StateMachine stateMachine; - private final StateMachineFactory> stateMachineFactory - - = new StateMachineFactory - > - (ApplicationState.PENDING) + private final StateMachineFactory stateMachineFactory + = new StateMachineFactory(ApplicationState.PENDING) // Transitions from PENDING State .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING, @@ -147,8 +154,6 @@ public class ApplicationMasterInfo imple // Transitions from LAUNCHED State .addTransition(ApplicationState.LAUNCHED, ApplicationState.CLEANUP, ApplicationEventType.KILL, killTransition) - .addTransition(ApplicationState.LAUNCHED, ApplicationState.FAILED, - ApplicationEventType.EXPIRE, expireTransition) .addTransition(ApplicationState.LAUNCHED, ApplicationState.RUNNING, ApplicationEventType.REGISTERED, new RegisterTransition()) .addTransition(ApplicationState.LAUNCHED, ApplicationState.LAUNCHED, @@ -189,8 +194,7 @@ public class ApplicationMasterInfo imple // Transitions from COMPLETED State .addTransition(ApplicationState.COMPLETED, ApplicationState.COMPLETED, - EnumSet.of(ApplicationEventType.EXPIRE, - ApplicationEventType.FINISH, ApplicationEventType.KILL, + EnumSet.of(ApplicationEventType.FINISH, ApplicationEventType.KILL, ApplicationEventType.RECOVER)) // Transitions from FAILED State @@ -209,8 +213,10 @@ public class ApplicationMasterInfo imple - public ApplicationMasterInfo(RMContext context, String user, - ApplicationSubmissionContext submissionContext, String clientToken, ApplicationStore appStore) { + public ApplicationMasterInfo(RMContext context, Configuration conf, + String user, ApplicationSubmissionContext submissionContext, + String clientToken, ApplicationStore appStore, + AMLivelinessMonitor amLivelinessMonitor) { this.user = user; this.handler = context.getDispatcher().getEventHandler(); this.syncHandler = context.getDispatcher().getSyncHandler(); @@ -228,6 +234,14 @@ public class ApplicationMasterInfo imple master.setDiagnostics(""); this.appStore = appStore; this.startTime = System.currentTimeMillis(); + this.amMaxRetries = conf.getInt(RMConfig.AM_MAX_RETRIES, + RMConfig.DEFAULT_AM_MAX_RETRIES); + LOG.info("AM max retries: " + this.amMaxRetries); + this.amLivelinessMonitor = amLivelinessMonitor; + + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); } @Override @@ -307,12 +321,13 @@ public class ApplicationMasterInfo imple } /* the applicaiton master completed successfully */ - private static class DoneTransition implements - MultipleArcTransition, ApplicationState> { + private static class DoneTransition + implements + MultipleArcTransition { @Override public ApplicationState transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.handler.handle(new ASMEvent( SNEventType.CLEANUP, masterInfo)); masterInfo.handler.handle(new ASMEvent( @@ -320,26 +335,29 @@ public class ApplicationMasterInfo imple masterInfo.handler.handle(new ASMEvent( ApplicationTrackerEventType.REMOVE, masterInfo)); masterInfo.finishTime = System.currentTimeMillis(); + + masterInfo.amLivelinessMonitor.unRegister(event.getApplicationId()); + ApplicationFinishEvent finishEvent = (ApplicationFinishEvent) event; return finishEvent.getFinalApplicationState(); } } - private static class AllocatingKillTransition implements - SingleArcTransition> { + private static class AllocatingKillTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.handler.handle(new ASMEvent(ApplicationTrackerEventType.REMOVE, masterInfo)); } } private static class KillTransition implements - SingleArcTransition> { + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.finishTime = System.currentTimeMillis(); masterInfo.getMaster().setDiagnostics(DIAGNOSTIC_KILL_APPLICATION); masterInfo.handler.handle(new ASMEvent(SNEventType.CLEANUP, masterInfo)); @@ -349,12 +367,12 @@ public class ApplicationMasterInfo imple } } - private static class RecoverLaunchTransition implements SingleArcTransition - > { + private static class RecoverLaunchTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.syncHandler.handle(new ASMEvent( ApplicationTrackerEventType.ADD, masterInfo)); @@ -363,11 +381,11 @@ public class ApplicationMasterInfo imple } } - private static class FailedLaunchTransition implements - SingleArcTransition> { + private static class FailedLaunchTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.finishTime = System.currentTimeMillis(); masterInfo.getMaster().setDiagnostics(DIAGNOSTIC_AM_LAUNCH_FAILED); masterInfo.handler.handle(new ASMEvent( @@ -376,74 +394,82 @@ public class ApplicationMasterInfo imple } private static class LaunchTransition implements - SingleArcTransition> { + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.handler.handle(new ASMEvent( AMLauncherEventType.LAUNCH, masterInfo)); } } private static class RecoverRunningTransition implements - SingleArcTransition> { + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.syncHandler.handle(new ASMEvent( ApplicationTrackerEventType.ADD, masterInfo)); /* make sure the time stamp is update else expiry thread will expire this */ - masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis()); + masterInfo.amLivelinessMonitor.receivedPing(event.getApplicationId()); } } private static class RecoverLaunchedTransition implements - SingleArcTransition> { + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.syncHandler.handle(new ASMEvent( ApplicationTrackerEventType.ADD, masterInfo)); - /* make sure the time stamp is update else expiry thread will expire this */ - masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis()); + masterInfo.amLivelinessMonitor.register(event.getApplicationId()); } } private static class LaunchedTransition implements - SingleArcTransition> { + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { - /* make sure the time stamp is update else expiry thread will expire this */ - masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis()); + ApplicationMasterInfoEvent event) { + masterInfo.amLivelinessMonitor.register(event.getApplicationId()); } } - private static class ExpireTransition implements - SingleArcTransition> { + private static class ExpireTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { /* for now this is the same as killed transition but will change later */ masterInfo.handler.handle(new ASMEvent(SNEventType.CLEANUP, masterInfo)); masterInfo.handler.handle(new ASMEvent( AMLauncherEventType.CLEANUP, masterInfo)); masterInfo.handler.handle(new ASMEvent( - ApplicationTrackerEventType.EXPIRE, masterInfo)); + ApplicationTrackerEventType.EXPIRE, masterInfo)); masterInfo.numFailed++; + + /* check to see if the number of retries are reached or not */ + if (masterInfo.getFailedCount() < masterInfo.amMaxRetries) { + masterInfo.handler.handle(new ApplicationMasterInfoEvent( + ApplicationEventType.ALLOCATE, event.getApplicationId())); + } else { + masterInfo.handler.handle(new ApplicationMasterInfoEvent( + ApplicationEventType.FAILED_MAX_RETRIES, masterInfo + .getApplicationID())); + } } } /* Transition to schedule again on a container launch failure for AM */ private static class ScheduleTransition implements - SingleArcTransition> { + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { masterInfo.masterContainer = null; /* schedule for a slot */ masterInfo.handler.handle(new ASMEvent(SNEventType.SCHEDULE, @@ -452,50 +478,61 @@ public class ApplicationMasterInfo imple } /* Transition to start the process of allocating for the AM container */ - private static class AllocateTransition implements - SingleArcTransition> { + private static class AllocateTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { /* notify tracking applications that an applicaiton has been added */ - masterInfo.handler.handle(new ASMEvent( + // TODO: For now, changing to synchHandler. Instead we should use register/deregister. + masterInfo.syncHandler.handle(new ASMEvent( ApplicationTrackerEventType.ADD, masterInfo)); /* schedule for a slot */ - masterInfo.handler.handle(new ASMEvent(SNEventType.SCHEDULE, - masterInfo)); + masterInfo.handler.handle(new ASMEvent( + SNEventType.SCHEDULE, masterInfo)); } } /* Transition on a container allocated for a container */ - private static class AllocatedTransition implements SingleArcTransition> { + private static class AllocatedTransition + implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { /* set the container that was generated by the scheduler negotiator */ - masterInfo.masterContainer = event.getAppContext().getMasterContainer(); + ApplicationMasterAllocatedEvent allocatedEvent = + (ApplicationMasterAllocatedEvent) event; + masterInfo.masterContainer = allocatedEvent.getMasterContainer(); try { masterInfo.appStore.storeMasterContainer(masterInfo.masterContainer); } catch(IOException ie) { //TODO ignore for now fix later. } + + /* we need to launch the applicaiton master on allocated transition */ + masterInfo.handler.handle(new ApplicationMasterInfoEvent( + ApplicationEventType.LAUNCH, masterInfo.getApplicationID())); } } - private static class RegisterTransition implements SingleArcTransition> { + private static class RegisterTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { - ApplicationMaster registeredMaster = event.getAppContext().getMaster(); + ApplicationMasterInfoEvent event) { + ApplicationMasterRegistrationEvent registrationEvent = + (ApplicationMasterRegistrationEvent) event; + ApplicationMaster registeredMaster = registrationEvent + .getApplicationMaster(); masterInfo.master.setHost(registeredMaster.getHost()); masterInfo.master.setTrackingUrl(registeredMaster.getTrackingUrl()); masterInfo.master.setRpcPort(registeredMaster.getRpcPort()); masterInfo.master.setStatus(registeredMaster.getStatus()); masterInfo.master.getStatus().setProgress(0.0f); - masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis()); + masterInfo.amLivelinessMonitor.receivedPing(event.getApplicationId()); try { masterInfo.appStore.updateApplicationState(masterInfo.master); } catch(IOException ie) { @@ -506,52 +543,61 @@ public class ApplicationMasterInfo imple /* transition to finishing state on a cleanup, for now its not used, but will need it * later */ - private static class FailedTransition implements - SingleArcTransition> { + private static class FailedTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { + ApplicationMasterInfoEvent event) { LOG.info("Failed application: " + masterInfo.getApplicationID()); } } /* Just a status update transition */ - private static class StatusUpdateTransition implements - SingleArcTransition> { + private static class StatusUpdateTransition implements + SingleArcTransition { @Override public void transition(ApplicationMasterInfo masterInfo, - ASMEvent event) { - masterInfo.master.setStatus(event.getAppContext().getStatus()); - masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis()); + ApplicationMasterInfoEvent event) { + ApplicationMasterStatusUpdateEvent statusUpdateEvent = + (ApplicationMasterStatusUpdateEvent) event; + masterInfo.master.setStatus(statusUpdateEvent.getApplicationStatus()); + masterInfo.amLivelinessMonitor.receivedPing(event.getApplicationId()); } } @Override - public synchronized void handle(ASMEvent event) { - ApplicationId appID = event.getAppContext().getApplicationID(); - LOG.info("Processing event for " + appID + " of type " + event.getType()); - final ApplicationState oldState = getState(); - try { - /* keep the master in sync with the state machine */ - stateMachine.doTransition(event.getType(), event); - master.setState(stateMachine.getCurrentState()); - LOG.info("State is " + stateMachine.getCurrentState()); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); - /* TODO fail the application on the failed transition */ - } + public synchronized void handle(ApplicationMasterInfoEvent event) { + + this.writeLock.lock(); + try { - appStore.updateApplicationState(master); - } catch(IOException ie) { - //TODO ignore for now - } - if (oldState != getState()) { - LOG.info(appID + " State change from " - + oldState + " to " - + getState()); + ApplicationId appID = event.getApplicationId(); + LOG.info("Processing event for " + appID + " of type " + + event.getType()); + final ApplicationState oldState = getState(); + try { + /* keep the master in sync with the state machine */ + stateMachine.doTransition(event.getType(), event); + master.setState(stateMachine.getCurrentState()); + LOG.info("State is " + stateMachine.getCurrentState()); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); + /* TODO fail the application on the failed transition */ + } + try { + appStore.updateApplicationState(master); + } catch (IOException ie) { + // TODO ignore for now + } + if (oldState != getState()) { + LOG.info(appID + " State change from " + oldState + " to " + + getState()); + } + } finally { + this.writeLock.unlock(); } } } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java Fri Jun 24 21:30:53 2011 @@ -166,12 +166,10 @@ class SchedulerNegotiator extends Abstra it.remove(); Container container = containers.get(0); - LOG.info("Found container " + container + " for AM of " + - masterInfo.getMaster()); - SNAppContext snAppContext = new SNAppContext(masterInfo.getApplicationID(), - container); - handler.handle(new ASMEvent( - ApplicationEventType.ALLOCATED, snAppContext)); + LOG.info("Found container " + container + " for AM of " + + masterInfo.getMaster()); + handler.handle(new ApplicationMasterAllocatedEvent(masterInfo + .getApplicationID(), container)); } } @@ -223,8 +221,8 @@ class SchedulerNegotiator extends Abstra //TODO remove IOException from the scheduler. LOG.error("Error while releasing container for AM " + appContext.getApplicationID()); } - handler.handle(new ASMEvent(ApplicationEventType.RELEASED, - appContext)); + handler.handle(new ApplicationMasterInfoEvent( + ApplicationEventType.RELEASED, appContext.getApplicationID())); break; case CLEANUP: try { @@ -247,82 +245,5 @@ class SchedulerNegotiator extends Abstra Container[] containers = new Container[] {masterInfo.getMasterContainer()}; scheduler.allocate(masterInfo.getMaster().getApplicationId(), EMPTY_ASK, Arrays.asList(containers)); - } - - private static class SNAppContext implements AppContext { - private final ApplicationId appID; - private final Container container; - private final UnsupportedOperationException notImplementedException; - - public SNAppContext(ApplicationId appID, Container container) { - this.appID = appID; - this.container = container; - this.notImplementedException = new UnsupportedOperationException("Not Implemented"); - } - - @Override - public ApplicationSubmissionContext getSubmissionContext() { - throw notImplementedException; - } - - @Override - public Resource getResource() { - throw notImplementedException; - } - - @Override - public ApplicationId getApplicationID() { - return appID; - } - - @Override - public ApplicationStatus getStatus() { - throw notImplementedException; - } - - @Override - public ApplicationMaster getMaster() { - throw notImplementedException; - } - - @Override - public Container getMasterContainer() { - return container; - } - - @Override - public String getUser() { - throw notImplementedException; - } - - @Override - public String getName() { - throw notImplementedException; - } - - @Override - public String getQueue() { - throw notImplementedException; - } - - @Override - public int getFailedCount() { - throw notImplementedException; - } - - @Override - public ApplicationStore getStore() { - throw notImplementedException; - } - - @Override - public long getStartTime() { - throw notImplementedException; - } - - @Override - public long getFinishTime() { - throw notImplementedException; - } - } + } } \ No newline at end of file Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Fri Jun 24 21:30:53 2011 @@ -149,6 +149,7 @@ public class Application { * Clear any pending requests from this application. */ public synchronized void clearRequests() { + priorities.clear(); requests.clear(); LOG.info("Application " + applicationId + " requests cleared"); } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java Fri Jun 24 21:30:53 2011 @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.TestCase; +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,7 +60,7 @@ import org.junit.Before; import org.junit.Test; /* a test case that tests the launch failure of a AM */ -public class TestAMLaunchFailure extends TestCase { +public class TestAMLaunchFailure { private static final Log LOG = LogFactory.getLog(TestAMLaunchFailure.class); private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ApplicationsManagerImpl asmImpl; @@ -162,8 +162,8 @@ public class TestAMLaunchFailure extends e.printStackTrace(); } context.getDispatcher().getEventHandler().handle( - new ASMEvent(ApplicationEventType.LAUNCHED, - app)); + new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED, + app.getApplicationID())); } } } @@ -216,9 +216,11 @@ public class TestAMLaunchFailure extends ApplicationMaster master = asmImpl.getApplicationMaster(appID); while (master.getState() != ApplicationState.FAILED) { + LOG.info("Waiting for application to go to FAILED state." + + " Current state is " + master.getState()); Thread.sleep(200); master = asmImpl.getApplicationMaster(appID); } - assertTrue(master.getState() == ApplicationState.FAILED); + Assert.assertEquals(ApplicationState.FAILED, master.getState()); } } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java Fri Jun 24 21:30:53 2011 @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import junit.framework.TestCase; +import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -50,8 +50,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.junit.After; import org.junit.Before; +import org.junit.Test; -public class TestAMRMRPCResponseId extends TestCase { +public class TestAMRMRPCResponseId { private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ApplicationMasterService amService = null; ApplicationTokenSecretManager appTokenManager = new ApplicationTokenSecretManager(); @@ -145,7 +146,8 @@ public class TestAMRMRPCResponseId exten public void tearDown() { } - + + @Test public void testARRMResponseId() throws Exception { ApplicationId applicationID = applicationsManager.getNewApplicationID(); ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); @@ -163,21 +165,21 @@ public class TestAMRMRPCResponseId exten AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class); allocateRequest.setApplicationStatus(status); AMResponse response = amService.allocate(allocateRequest).getAMResponse(); - assertTrue(response.getResponseId() == 1); - assertFalse(response.getReboot()); + Assert.assertEquals(1, response.getResponseId()); + Assert.assertFalse(response.getReboot()); status.setResponseId(response.getResponseId()); allocateRequest.setApplicationStatus(status); response = amService.allocate(allocateRequest).getAMResponse(); - assertTrue(response.getResponseId() == 2); + Assert.assertEquals(2, response.getResponseId()); /* try resending */ response = amService.allocate(allocateRequest).getAMResponse(); - assertTrue(response.getResponseId() == 2); + Assert.assertEquals(2, response.getResponseId()); /** try sending old **/ status.setResponseId(0); allocateRequest.setApplicationStatus(status); response = amService.allocate(allocateRequest).getAMResponse(); - assertTrue(response.getReboot()); + Assert.assertTrue(response.getReboot()); } } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java Fri Jun 24 21:30:53 2011 @@ -7,7 +7,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.TestCase; +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -15,7 +15,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationMaster; import org.apache.hadoop.yarn.api.records.ApplicationState; -import org.apache.hadoop.yarn.api.records.ApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -39,7 +38,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker; @@ -56,7 +54,7 @@ import org.junit.Test; * Test to restart the AM on failure. * */ -public class TestAMRestart extends TestCase { +public class TestAMRestart { private static final Log LOG = LogFactory.getLog(TestAMRestart.class); ApplicationsManagerImpl appImpl; RMContext asmContext = new ResourceManager.RMContextImpl(new MemStore()); @@ -105,9 +103,9 @@ public class TestAMRestart extends TestC } catch (InterruptedException e) { } } - asmContext.getDispatcher().getEventHandler().handle(new - ASMEvent(ApplicationEventType.LAUNCHED, - new TestAppContext(appID))); + asmContext.getDispatcher().getEventHandler().handle( + new ApplicationMasterInfoEvent( + ApplicationEventType.LAUNCHED, appID)); launchNotify.addAndGet(-1); } } @@ -255,78 +253,7 @@ public class TestAMRestart extends TestC Thread.sleep(500); count++; } - assertTrue(masterInfo.getState() == finalState); - } - - private class TestAppContext implements AppContext { - private ApplicationId appID; - - public TestAppContext(ApplicationId appID) { - this.appID = appID; - } - @Override - public ApplicationSubmissionContext getSubmissionContext() { - return null; - } - - @Override - public Resource getResource() { - return null; - } - - @Override - public ApplicationId getApplicationID() { - return appID; - } - - @Override - public ApplicationStatus getStatus() { - return null; - } - - @Override - public ApplicationMaster getMaster() { - return null; - } - - @Override - public Container getMasterContainer() { - return null; - } - - @Override - public String getUser() { - return null; - } - - @Override - public String getName() { - return null; - } - - @Override - public String getQueue() { - return null; - } - - @Override - public int getFailedCount() { - return 0; - } - - @Override - public ApplicationStore getStore() { - return StoreFactory.createVoidAppStore(); - } - @Override - public long getStartTime() { - return 0; - } - @Override - public long getFinishTime() { - return 0; - } - + Assert.assertEquals(finalState, masterInfo.getState()); } @Test @@ -345,11 +272,11 @@ public class TestAMRestart extends TestC schedulerNotify.wait(); } } - assertTrue(launcherCleanupCalled == maxFailures); - assertTrue(launcherLaunchCalled == maxFailures); - assertTrue(schedulerAddApplication == maxFailures); - assertTrue(schedulerRemoveApplication == maxFailures); - assertTrue(masterInfo.getFailedCount() == maxFailures); + Assert.assertEquals(maxFailures, launcherCleanupCalled); + Assert.assertEquals(maxFailures, launcherLaunchCalled); + Assert.assertEquals(maxFailures, schedulerAddApplication); + Assert.assertEquals(maxFailures, schedulerRemoveApplication); + Assert.assertEquals(maxFailures, masterInfo.getFailedCount()); waitForFailed(masterInfo, ApplicationState.FAILED); stop = true; } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java Fri Jun 24 21:30:53 2011 @@ -21,37 +21,31 @@ package org.apache.hadoop.yarn.server.re import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; -import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationMaster; import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.api.records.ApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent; -import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; -public class TestASMStateMachine extends TestCase { +public class TestASMStateMachine { private static final Log LOG = LogFactory.getLog(TestASMStateMachine.class); private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); RMContext context = new ResourceManager.RMContextImpl(new MemStore()); @@ -63,10 +57,10 @@ public class TestASMStateMachine extends private boolean removedApplication = false; private boolean launchCleanupCalled = false; private AtomicInteger waitForState = new AtomicInteger(); - + private Configuration conf = new Configuration(); @Before public void setUp() { - context.getDispatcher().init(new Configuration()); + context.getDispatcher().init(conf); context.getDispatcher().start(); handler = context.getDispatcher().getEventHandler(); new DummyAMLaunchEventHandler(); @@ -95,8 +89,8 @@ public class TestASMStateMachine extends launchCalled = true; appcontext = event.getAppContext(); context.getDispatcher().getEventHandler().handle( - new ASMEvent(ApplicationEventType.LAUNCHED, - appcontext)); + new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED, + appcontext.getApplicationID())); break; case CLEANUP: launchCleanupCalled = true; @@ -123,71 +117,14 @@ public class TestASMStateMachine extends snAllocateReceived = true; appContext = event.getAppContext(); context.getDispatcher().getEventHandler().handle( - new ASMEvent(ApplicationEventType.ALLOCATED, - appContext)); + new ApplicationMasterAllocatedEvent(appContext.getApplicationID(), + appContext.getMasterContainer())); break; } } } - private static class StatusContext implements AppContext { - @Override - public ApplicationSubmissionContext getSubmissionContext() { - return null; - } - @Override - public Resource getResource() { - return null; - } - @Override - public ApplicationId getApplicationID() { - return null; - } - @Override - public ApplicationStatus getStatus() { - ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class); - status.setLastSeen(-99); - return status; - } - @Override - public ApplicationMaster getMaster() { - return null; - } - @Override - public Container getMasterContainer() { - return null; - } - @Override - public String getUser() { - return null; - } - @Override - public String getName() { - return null; - } - @Override - public String getQueue() { - return null; - } - @Override - public int getFailedCount() { - return 0; - } - @Override - public ApplicationStore getStore() { - return StoreFactory.createVoidAppStore(); - } - @Override - public long getStartTime() { - return 0; - } - @Override - public long getFinishTime() { - return 0; - } - } - private class ApplicationTracker implements EventHandler> { public ApplicationTracker() { context.getDispatcher().register(ApplicationTrackerEventType.class, this); @@ -206,13 +143,14 @@ public class TestASMStateMachine extends } } - private class MockAppplicationMasterInfo implements EventHandler> { + private class MockAppplicationMasterInfo implements + EventHandler { MockAppplicationMasterInfo() { context.getDispatcher().register(ApplicationEventType.class, this); } @Override - public void handle(ASMEvent event) { + public void handle(ApplicationMasterInfoEvent event) { LOG.info("The event type is " + event.getType()); } } @@ -224,7 +162,7 @@ public class TestASMStateMachine extends Thread.sleep(500); count++; } - assertTrue(masterInfo.getState() == finalState); + Assert.assertEquals(finalState, masterInfo.getState()); } /* Test the state machine. @@ -237,34 +175,34 @@ public class TestASMStateMachine extends submissioncontext.getApplicationId().setId(1); submissioncontext.getApplicationId().setClusterTimestamp(System.currentTimeMillis()); - ApplicationMasterInfo masterInfo - = new ApplicationMasterInfo(context, "dummyuser", submissioncontext, "dummyToken" - , StoreFactory.createVoidAppStore()); + ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(context, + conf, "dummyuser", submissioncontext, "dummyToken", StoreFactory + .createVoidAppStore(), new AMLivelinessMonitor(context + .getDispatcher().getEventHandler())); context.getDispatcher().register(ApplicationEventType.class, masterInfo); - handler.handle(new ASMEvent(ApplicationEventType. - ALLOCATE, masterInfo)); - - waitForState(ApplicationState.ALLOCATED, masterInfo); - handler.handle(new ASMEvent( - ApplicationEventType.LAUNCH, masterInfo)); + handler.handle(new ApplicationMasterInfoEvent( + ApplicationEventType.ALLOCATE, submissioncontext.getApplicationId())); waitForState(ApplicationState.LAUNCHED, masterInfo); Assert.assertTrue(snAllocateReceived); Assert.assertTrue(launchCalled); Assert.assertTrue(addedApplication); - handler.handle(new ASMEvent( - ApplicationEventType.REGISTERED, masterInfo)); + handler + .handle(new ApplicationMasterRegistrationEvent(masterInfo.getMaster())); waitForState(ApplicationState.RUNNING, masterInfo); Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState()); - handler.handle(new ASMEvent( - ApplicationEventType.STATUSUPDATE, new StatusContext())); + + ApplicationStatus status = recordFactory + .newRecordInstance(ApplicationStatus.class); + status.setApplicationId(masterInfo.getApplicationID()); + handler.handle(new ApplicationMasterStatusUpdateEvent(status)); /* check if the state is still RUNNING */ Assert.assertEquals(ApplicationState.RUNNING, masterInfo.getState()); - handler.handle(new ApplicationFinishEvent(masterInfo, + handler.handle(new ApplicationFinishEvent(masterInfo.getApplicationID(), ApplicationState.COMPLETED)); waitForState(ApplicationState.COMPLETED, masterInfo); Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState()); @@ -274,8 +212,8 @@ public class TestASMStateMachine extends Assert.assertTrue(removedApplication); /* check if expiry doesnt make it failed */ - handler.handle( - new ASMEvent(ApplicationEventType.EXPIRE, masterInfo)); + handler.handle(new ApplicationMasterInfoEvent(ApplicationEventType.EXPIRE, + masterInfo.getApplicationID())); Assert.assertEquals(ApplicationState.COMPLETED, masterInfo.getState()); } } Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1139443&r1=1139442&r2=1139443&view=diff ============================================================================== --- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java (original) +++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java Fri Jun 24 21:30:53 2011 @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.TestCase; +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,7 +73,7 @@ import org.junit.Test; * */ @Ignore -public class TestApplicationCleanup extends TestCase { +public class TestApplicationCleanup { private static final Log LOG = LogFactory.getLog(TestApplicationCleanup.class); private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private AtomicInteger waitForState = new AtomicInteger(0); @@ -159,9 +159,9 @@ public class TestApplicationCleanup exte LOG.info("Launcher Launch called"); launcherLaunchCalled = true; appContext = appEvent.getAppContext(); - context.getDispatcher().getEventHandler(). - handle(new ASMEvent( - ApplicationEventType.LAUNCHED, appContext)); + context.getDispatcher().getEventHandler().handle( + new ApplicationMasterInfoEvent(ApplicationEventType.LAUNCHED, + appContext.getApplicationID())); break; default: break; @@ -186,9 +186,9 @@ public class TestApplicationCleanup exte case SCHEDULE: schedulerScheduleCalled = true; acontext = appEvent.getAppContext(); - context.getDispatcher().getEventHandler(). - handle(new ASMEvent( - ApplicationEventType.ALLOCATED, acontext)); + context.getDispatcher().getEventHandler().handle( + new ApplicationMasterAllocatedEvent(acontext.getApplicationID(), + acontext.getMasterContainer())); default: break; } @@ -221,7 +221,7 @@ public class TestApplicationCleanup exte Thread.sleep(500); count++; } - assertTrue(masterInfo.getState() == finalState); + Assert.assertEquals(finalState, masterInfo.getState()); } @@ -277,22 +277,22 @@ public class TestApplicationCleanup exte LOG.info("Available resource on first node" + firstNode.getAvailableResource()); LOG.info("Available resource on second node" + secondNode.getAvailableResource()); /* only allocate the containers to the first node */ - assertTrue(firstNode.getAvailableResource().getMemory() == - (firstNodeMemory - (2*memoryNeeded))); + Assert.assertEquals((firstNodeMemory - (2 * memoryNeeded)), firstNode + .getAvailableResource().getMemory()); ApplicationMasterInfo masterInfo = asm.getApplicationMasterInfo(appID); asm.finishApplication(appID, UserGroupInformation.getCurrentUser()); while (asm.launcherCleanupCalled != true) { Thread.sleep(500); } - assertTrue(asm.launcherCleanupCalled == true); - assertTrue(asm.launcherLaunchCalled == true); - assertTrue(asm.schedulerCleanupCalled == true); - assertTrue(asm.schedulerScheduleCalled == true); + Assert.assertTrue(asm.launcherCleanupCalled); + Assert.assertTrue(asm.launcherLaunchCalled); + Assert.assertTrue(asm.schedulerCleanupCalled); + Assert.assertTrue(asm.schedulerScheduleCalled); /* check for update of completed application */ clusterTracker.updateListener(firstNode, containers); NodeResponse response = firstNode.statusUpdate(containers); - assertTrue(response.getFinishedApplications().contains(appID)); + Assert.assertTrue(response.getFinishedApplications().contains(appID)); LOG.info("The containers to clean up " + response.getContainersToCleanUp().size()); - assertTrue(response.getContainersToCleanUp().size() == 2); + Assert.assertEquals(2, response.getContainersToCleanUp().size()); } }