hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1143250 [2/4] - in /hadoop/common/branches/MR-279/mapreduce: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapre...
Date Wed, 06 Jul 2011 04:51:48 GMT
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1143250&r1=1143249&r2=1143250&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Wed Jul  6 04:51:46 2011
@@ -56,9 +56,11 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationMasterHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMFinishEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMStatusUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -67,7 +69,6 @@ import org.apache.hadoop.yarn.service.Ab
 public class ApplicationMasterService extends AbstractService implements 
 AMRMProtocol, EventHandler<ASMEvent<ApplicationTrackerEventType>> {
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
-  private ApplicationMasterHandler applicationsManager;
   private YarnScheduler rScheduler;
   private ApplicationTokenSecretManager appTokenManager;
   private InetSocketAddress masterServiceAddress;
@@ -76,17 +77,17 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
   private final Map<ApplicationId, AMResponse> responseMap =
       new HashMap<ApplicationId, AMResponse>();
   private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
-  private final RMContext asmContext;
+  private final RMContext context;
   
-  public ApplicationMasterService(ApplicationTokenSecretManager appTokenManager,
-      ApplicationMasterHandler applicationsManager, YarnScheduler scheduler, RMContext asmContext) {
+  public ApplicationMasterService(
+      ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler,
+      RMContext asmContext) {
     super(ApplicationMasterService.class.getName());
     this.appTokenManager = appTokenManager;
-    this.applicationsManager = applicationsManager;
     this.rScheduler = scheduler;
     this.reboot.setReboot(true);
 //    this.reboot.containers = new ArrayList<Container>();
-    this.asmContext = asmContext;
+    this.context = asmContext;
   }
 
   @Override
@@ -95,7 +96,7 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
       conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
           YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
     masterServiceAddress =  NetUtils.createSocketAddr(bindAddress);
-    this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, this);
+    this.context.getDispatcher().register(ApplicationTrackerEventType.class, this);
     super.init(conf);
   }
 
@@ -118,13 +119,11 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
     // TODO: What if duplicate register due to lost RPCs
+
     ApplicationMaster applicationMaster = request.getApplicationMaster();
-    try {
-      applicationsManager.registerApplicationMaster(applicationMaster);
-    } catch(IOException ie) {
-      LOG.info("Exception registering application ", ie);
-      throw RPCUtil.getRemoteException(ie);
-    }
+    LOG.info("AM registration " + applicationMaster.getApplicationId());
+    context.getDispatcher().getEventHandler().handle(
+        new AMRegistrationEvent(applicationMaster));
     
     // Pick up min/max resource from scheduler...
     RegisterApplicationMasterResponse response = 
@@ -137,16 +136,16 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
   }
 
   @Override
-  public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnRemoteException {
     // TODO: What if duplicate finish due to lost RPCs
     ApplicationMaster applicationMaster = request.getApplicationMaster();
-    try {
-      applicationsManager.finishApplicationMaster(applicationMaster);
-    } catch(IOException ie) {
-      LOG.info("Exception finishing application", ie);
-      throw RPCUtil.getRemoteException(ie);
-    }
-    FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
+    context.getDispatcher().getEventHandler().handle(
+        new AMFinishEvent(applicationMaster.getApplicationId(),
+            applicationMaster.getState(), applicationMaster.getTrackingUrl(),
+            applicationMaster.getDiagnostics()));
+    FinishApplicationMasterResponse response = recordFactory
+        .newRecordInstance(FinishApplicationMasterResponse.class);
     return response;
   }
 
@@ -180,7 +179,11 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
           allocateResponse.setAMResponse(reboot);
           return allocateResponse;
         }
-        applicationsManager.applicationHeartbeat(status);
+
+        // Send the heart-beat to the application.
+        context.getDispatcher().getEventHandler().handle(
+            new AMStatusUpdateEvent(status));
+
         Allocation allocation = 
           rScheduler.allocate(status.getApplicationId(), ask, release); 
         AMResponse  response = recordFactory.newRecordInstance(AMResponse.class);
@@ -208,7 +211,7 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
   @Override
   public void handle(ASMEvent<ApplicationTrackerEventType> appEvent) {
     ApplicationTrackerEventType event = appEvent.getType();
-    ApplicationId id = appEvent.getAppContext().getApplicationID();
+    ApplicationId id = appEvent.getApplication().getApplicationID();
     synchronized(responseMap) {
       switch (event) {
       case ADD:

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationsManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationsManager.java?rev=1143250&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationsManager.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationsManager.java Wed Jul  6 04:51:46 2011
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Recoverable service exposing the AM liveliness monitor and client-to-AM secret manager.
+ */
+@Private
+@Evolving
+public interface ApplicationsManager extends Recoverable, Service{
+
+  AMLivelinessMonitor getAmLivelinessMonitor();
+
+  ClientToAMSecretManager getClientToAMSecretManager();
+
+}
\ No newline at end of file

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1143250&r1=1143249&r2=1143250&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Jul  6 04:51:46 2011
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
@@ -31,13 +34,15 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
@@ -51,7 +56,10 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeManagerInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -60,11 +68,22 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.application.ApplicationACL;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.application.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 
@@ -72,24 +91,36 @@ import org.apache.hadoop.yarn.service.Ab
  * The client interface to the Resource Manager. This module handles all the rpc
  * interfaces to the resource manager from the client.
  */
-public class ClientRMService extends AbstractService implements ClientRMProtocol {
+public class ClientRMService extends AbstractService implements
+    ClientRMProtocol {
   private static final Log LOG = LogFactory.getLog(ClientRMService.class);
-  
+
+  final private AtomicInteger applicationCounter = new AtomicInteger(0);
+
   final private ClusterTracker clusterInfo;
-  final private ApplicationsManager applicationsManager;
-  final private ResourceScheduler scheduler;
-  
+  final private YarnScheduler scheduler;
+  final private RMContext rmContext;
+  private final ClientToAMSecretManager clientToAMSecretManager;
+  private final AMLivelinessMonitor amLivelinessMonitor;
+
   private String clientServiceBindAddress;
   private Server server;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   InetSocketAddress clientBindAddress;
+
+  private  ApplicationACLsManager aclsManager;
+  private Map<ApplicationACL, AccessControlList> applicationACLs;
   
-  public ClientRMService(ApplicationsManager applicationsManager, 
-        ClusterTracker clusterInfo, ResourceScheduler scheduler) {
+  public ClientRMService(RMContext rmContext,
+      AMLivelinessMonitor amLivelinessMonitor,
+      ClientToAMSecretManager clientToAMSecretManager,
+      ClusterTracker clusterInfo, YarnScheduler scheduler) {
     super(ClientRMService.class.getName());
     this.clusterInfo = clusterInfo;
-    this.applicationsManager = applicationsManager;
     this.scheduler = scheduler;
+    this.rmContext = rmContext;
+    this.amLivelinessMonitor = amLivelinessMonitor;
+    this.clientToAMSecretManager = clientToAMSecretManager;
   }
   
   @Override
@@ -99,6 +130,10 @@ public class ClientRMService extends Abs
           YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS);
     clientBindAddress =
       NetUtils.createSocketAddr(clientServiceBindAddress);
+
+    this.aclsManager = new ApplicationACLsManager(conf);
+    this.applicationACLs = aclsManager.constructApplicationACLs(conf);
+
     super.init(conf);
   }
   
@@ -121,43 +156,172 @@ public class ClientRMService extends Abs
     super.start();
   }
 
+  private ApplicationReport createApplicationReport(Application application,
+      String user, String queue, String name, Container masterContainer) {
+    ApplicationMaster am = application.getMaster();
+    ApplicationReport applicationReport = 
+      recordFactory.newRecordInstance(ApplicationReport.class);
+    applicationReport.setApplicationId(am.getApplicationId());
+    applicationReport.setMasterContainer(masterContainer);
+    applicationReport.setHost(am.getHost());
+    applicationReport.setRpcPort(am.getRpcPort());
+    applicationReport.setClientToken(am.getClientToken());
+    applicationReport.setTrackingUrl(am.getTrackingUrl());
+    applicationReport.setDiagnostics(am.getDiagnostics());
+    applicationReport.setName(name);
+    applicationReport.setQueue(queue);
+    applicationReport.setState(am.getState());
+    applicationReport.setStatus(am.getStatus());
+    applicationReport.setUser(user);
+    return applicationReport;
+  }
+
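+  /**
+   * Check whether the calling user may perform the given operation on an
+   * application. Always returns true when security is disabled.
+   * @param callerUGI the calling user
+   * @param owner the owner of the application
+   * @param appACL the type of access being checked
+   * @return true if access is allowed, false otherwise
+   */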
+  private boolean checkAccess(UserGroupInformation callerUGI, String owner, ApplicationACL appACL) {
+      if (!UserGroupInformation.isSecurityEnabled()) {
+        return true;
+      }
+      AccessControlList applicationACL = applicationACLs.get(appACL);
+      return aclsManager.checkAccess(callerUGI, appACL, owner, applicationACL);
+  }
+
+  public ApplicationId getNewApplicationId() {
+    ApplicationId applicationId = org.apache.hadoop.yarn.util.BuilderUtils
+        .newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
+            applicationCounter.incrementAndGet());
+    LOG.info("Allocated new applicationId: " + applicationId.getId());
+    return applicationId;
+  }
+
   @Override
-  public GetNewApplicationIdResponse getNewApplicationId(GetNewApplicationIdRequest request) throws YarnRemoteException {
-    GetNewApplicationIdResponse response = recordFactory.newRecordInstance(GetNewApplicationIdResponse.class);
-    response.setApplicationId(applicationsManager.getNewApplicationID());
+  public GetNewApplicationIdResponse getNewApplicationId(
+      GetNewApplicationIdRequest request) throws YarnRemoteException {
+    GetNewApplicationIdResponse response = recordFactory
+        .newRecordInstance(GetNewApplicationIdResponse.class);
+    response.setApplicationId(getNewApplicationId());
     return response;
   }
   
   @Override
-  public GetApplicationMasterResponse getApplicationMaster(GetApplicationMasterRequest request) throws YarnRemoteException {
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnRemoteException {
     ApplicationId applicationId = request.getApplicationId();
-    GetApplicationMasterResponse response = recordFactory.newRecordInstance(GetApplicationMasterResponse.class);
-    response.setApplicationMaster(applicationsManager.getApplicationMaster(applicationId));
+    Application application = rmContext.getApplications().get(applicationId);
+    ApplicationReport report = (application == null) ? null
+        : createApplicationReport(application, application.getUser(),
+            application.getQueue(), application.getName(), application
+                .getMasterContainer());
+
+    GetApplicationReportResponse response = recordFactory
+        .newRecordInstance(GetApplicationReportResponse.class);
+    response.setApplicationReport(report);
     return response;
   }
 
-  public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnRemoteException {
-    ApplicationSubmissionContext context = request.getApplicationSubmissionContext();
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnRemoteException {
+    ApplicationSubmissionContext submissionContext = request
+        .getApplicationSubmissionContext();
     try {
-      applicationsManager.submitApplication(context);
+
+      ApplicationId applicationId = submissionContext.getApplicationId();
+      String clientTokenStr = null;
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      if (UserGroupInformation.isSecurityEnabled()) {
+        Token<ApplicationTokenIdentifier> clientToken = new Token<ApplicationTokenIdentifier>(
+            new ApplicationTokenIdentifier(applicationId),
+            this.clientToAMSecretManager);
+        clientTokenStr = clientToken.encodeToUrlString();
+        LOG.debug("Sending client token as " + clientTokenStr);
+      }
+
+      submissionContext.setQueue(submissionContext.getQueue() == null
+          ? "default" : submissionContext.getQueue());
+      submissionContext.setApplicationName(submissionContext
+          .getApplicationName() == null ? "N/A" : submissionContext
+          .getApplicationName());
+
+      ApplicationStore appStore = rmContext.getApplicationsStore()
+          .createApplicationStore(submissionContext.getApplicationId(),
+              submissionContext);
+      Application application = new ApplicationImpl(rmContext, getConfig(),
+          user, submissionContext, clientTokenStr, appStore,
+          this.amLivelinessMonitor);
+      if (rmContext.getApplications().putIfAbsent(
+          application.getApplicationID(), application) != null) {
+        throw new IOException("Application with id "
+            + application.getApplicationID()
+            + " is already present! Cannot add a duplicate!");
+      }
+
+      /**
+       * this can throw so we need to call it synchronously to let the client
+       * know as soon as it submits. For backwards compatibility we cannot make
+       * it asynchronous
+       */
+      try {
+        scheduler.addApplication(applicationId, application.getMaster(),
+            user, application.getQueue(), submissionContext.getPriority(),
+            application.getStore());
+      } catch (IOException io) {
+        LOG.info("Failed to submit application " + applicationId, io);
+        rmContext.getDispatcher().getSyncHandler().handle(
+            new ApplicationEvent(ApplicationEventType.FAILED, applicationId));
+        throw io;
+      }
+
+      rmContext.getDispatcher().getSyncHandler().handle(
+          new ApplicationEvent(ApplicationEventType.ALLOCATE, applicationId));
+
+      // TODO: this should happen via the dispatcher; move it out to the scheduler
+      // negotiator.
+      LOG.info("Application with id " + applicationId.getId()
+          + " submitted by user " + user + " with " + submissionContext);
     } catch (IOException ie) {
       LOG.info("Exception in submitting application", ie);
       throw RPCUtil.getRemoteException(ie);
     }
-    SubmitApplicationResponse response = recordFactory.newRecordInstance(SubmitApplicationResponse.class);
+
+    SubmitApplicationResponse response = recordFactory
+        .newRecordInstance(SubmitApplicationResponse.class);
     return response;
   }
 
   @Override
-  public FinishApplicationResponse finishApplication(FinishApplicationRequest request) throws YarnRemoteException {
+  public FinishApplicationResponse finishApplication(
+      FinishApplicationRequest request) throws YarnRemoteException {
+
     ApplicationId applicationId = request.getApplicationId();
+
+    UserGroupInformation callerUGI;
     try {
-      UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
-      applicationsManager.finishApplication(applicationId, callerUGI);
-    } catch(IOException ie) {
-      LOG.info("Error finishing application ", ie);
+      callerUGI = UserGroupInformation.getCurrentUser();
+    } catch (IOException ie) {
+      LOG.info("Error getting UGI ", ie);
+      throw RPCUtil.getRemoteException(ie);
+    }
+
+    Application application = rmContext.getApplications().get(applicationId);
+    // TODO: What if null
+    if (!checkAccess(callerUGI, application.getUser(),
+        ApplicationACL.MODIFY_APP)) {
+      throw RPCUtil.getRemoteException(new AccessControlException("User "
+          + callerUGI.getShortUserName() + " cannot perform operation "
+          + ApplicationACL.MODIFY_APP.name() + " on " + applicationId));
     }
-    FinishApplicationResponse response = recordFactory.newRecordInstance(FinishApplicationResponse.class);
+
+    rmContext.getDispatcher().getEventHandler().handle(
+        new ApplicationEvent(ApplicationEventType.KILL, applicationId));
+
+    FinishApplicationResponse response = recordFactory
+        .newRecordInstance(FinishApplicationResponse.class);
     return response;
   }
 
@@ -171,9 +335,17 @@ public class ClientRMService extends Abs
   @Override
   public GetAllApplicationsResponse getAllApplications(
       GetAllApplicationsRequest request) throws YarnRemoteException {
+
+    List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
+    for (Application application : rmContext.getApplications().values()) {
+      reports.add(createApplicationReport(application, application.getUser(),
+          application.getQueue(), application.getName(), application
+              .getMasterContainer()));
+    }
+
     GetAllApplicationsResponse response = 
       recordFactory.newRecordInstance(GetAllApplicationsResponse.class);
-    response.setApplicationList(applicationsManager.getApplications());
+    response.setApplicationList(reports);
     return response;
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1143250&r1=1143249&r2=1143250&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Jul  6 04:51:46 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.re
 
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.avro.AvroRuntimeException;
@@ -30,12 +32,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.
-  ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
@@ -61,7 +67,7 @@ public class ResourceManager extends Com
   public static final long clusterTimeStamp = System.currentTimeMillis();
   private YarnConfiguration conf;
   
-  private ApplicationsManagerImpl applicationsManager;
+  private ApplicationsManager applicationsManager;
   
   private ContainerTokenSecretManager containerTokenSecretManager =
       new ContainerTokenSecretManager();
@@ -69,7 +75,6 @@ public class ResourceManager extends Com
   private ApplicationTokenSecretManager appTokenSecretManager =
       new ApplicationTokenSecretManager();
 
-  
   private ResourceScheduler scheduler;
   private ResourceTrackerService resourceTracker;
   private ClientRMService clientRM;
@@ -93,12 +98,15 @@ public class ResourceManager extends Com
     public RMDispatcherImpl getDispatcher();
     public NodeStore getNodeStore();
     public ApplicationsStore getApplicationsStore();
+    public ConcurrentMap<ApplicationId, Application> getApplications();
   }
   
   public static class RMContextImpl implements RMContext {
     private final RMDispatcherImpl asmEventDispatcher;
     private final Store store;
-    
+    private final ConcurrentMap<ApplicationId, Application> applications = 
+      new ConcurrentHashMap<ApplicationId, Application>();
+
     public RMContextImpl(Store store) {
       this.asmEventDispatcher = new RMDispatcherImpl();
       this.store = store;
@@ -118,6 +126,11 @@ public class ResourceManager extends Com
     public ApplicationsStore getApplicationsStore() {
       return store;
     }
+
+    @Override
+    public ConcurrentMap<ApplicationId, Application> getApplications() {
+      return this.applications;
+    }
   }
   
   
@@ -134,13 +147,17 @@ public class ResourceManager extends Com
           conf.getClass(RMConfig.RESOURCE_SCHEDULER, 
               FifoScheduler.class, ResourceScheduler.class), 
           this.conf);
-  
+
+    // Register event handler for ApplicationEvents.
+    this.rmContext.getDispatcher().register(ApplicationEventType.class,
+        new ApplicationEventDispatcher(this.rmContext));
+
     this.rmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
         .createSecretKey("Dummy".getBytes()));
 
-    applicationsManager = createApplicationsManagerImpl();
+    applicationsManager = createApplicationsManager();
     addService(applicationsManager);
     
     resourceTracker = createResourceTrackerService();
@@ -166,7 +183,30 @@ public class ResourceManager extends Com
 
     super.init(conf);
   }
-  
+
+  public static final class ApplicationEventDispatcher implements
+      EventHandler<ApplicationEvent> {
+
+    private final RMContext rmContext;
+
+    public ApplicationEventDispatcher(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      ApplicationId appID = event.getApplicationId();
+      ApplicationImpl application = (ApplicationImpl) this.rmContext
+          .getApplications().get(appID);
+      try {
+        application.handle(event);
+      } catch (Throwable t) {
+        LOG.error("Error in handling event type " + event.getType()
+            + " for application " + event.getApplicationId(), t);
+      }
+    }
+  }
+
   @Override
   public void start() {
     try {
@@ -222,19 +262,21 @@ public class ResourceManager extends Com
             this.rmContext));
   }
   
-  protected ApplicationsManagerImpl createApplicationsManagerImpl() {
+  protected ApplicationsManager createApplicationsManager() {
     return new ApplicationsManagerImpl(
         this.appTokenSecretManager, this.scheduler, this.rmContext);
   }
 
   protected ClientRMService createClientRMService() {
-    return new ClientRMService(applicationsManager, 
-        resourceTracker.getResourceTracker(), scheduler);
+    return new ClientRMService(this.rmContext, this.applicationsManager
+        .getAmLivelinessMonitor(), this.applicationsManager
+        .getClientToAMSecretManager(), resourceTracker.getResourceTracker(),
+        scheduler);
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
-    return new ApplicationMasterService(
-      this.appTokenSecretManager, applicationsManager, scheduler, this.rmContext);
+    return new ApplicationMasterService(this.appTokenSecretManager,
+        scheduler, this.rmContext);
   }
   
 
@@ -243,6 +285,11 @@ public class ResourceManager extends Com
     return new AdminService(conf, scheduler, nodesTracker);
   }
 
+  @Private
+  public ClientRMService getClientRMService() {
+    return this.clientRM;
+  }
+
   /**
    * return applications manager.
    * @return

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java?rev=1143250&r1=1143249&r2=1143250&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java Wed Jul  6 04:51:46 2011
@@ -64,43 +64,47 @@ import org.apache.hadoop.yarn.security.C
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
-
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMFinishEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
+
+/**
+ * The launch of the AM itself.
+ */
 public class AMLauncher implements Runnable {
 
   private static final Log LOG = LogFactory.getLog(AMLauncher.class);
 
   private ContainerManager containerMgrProxy;
 
-  private final AppContext master;
+  private final Application application;
   private final Configuration conf;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  private ApplicationTokenSecretManager applicationTokenSecretManager;
-  private ClientToAMSecretManager clientToAMSecretManager;
-  private AMLauncherEventType event;
+  private final ApplicationTokenSecretManager applicationTokenSecretManager;
+  private final ClientToAMSecretManager clientToAMSecretManager;
+  private final AMLauncherEventType eventType;
   
   @SuppressWarnings("rawtypes")
-  private EventHandler handler;
+  private final EventHandler handler;
   
   @SuppressWarnings("unchecked")
-  public AMLauncher(RMContext asmContext, AppContext master,
-      AMLauncherEventType event,ApplicationTokenSecretManager applicationTokenSecretManager,
+  public AMLauncher(RMContext asmContext, Application application,
+      AMLauncherEventType eventType,ApplicationTokenSecretManager applicationTokenSecretManager,
       ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
-    this.master = master;
+    this.application = application;
     this.conf = new Configuration(conf); // Just not to touch the sec-info class
     this.applicationTokenSecretManager = applicationTokenSecretManager;
     this.clientToAMSecretManager = clientToAMSecretManager;
     this.conf.setClass(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
-    this.event = event;
+    this.eventType = eventType;
     this.handler = asmContext.getDispatcher().getEventHandler();
   }
   
   private void connect() throws IOException {
-    ContainerId masterContainerID = master.getMasterContainer().getId();
+    ContainerId masterContainerID = application.getMasterContainer().getId();
     
     containerMgrProxy =
         getContainerMgrProxy(masterContainerID.getAppId());
@@ -108,23 +112,23 @@ public class AMLauncher implements Runna
   
   private void launch() throws IOException {
     connect();
-    ContainerId masterContainerID = master.getMasterContainer().getId();
+    ContainerId masterContainerID = application.getMasterContainer().getId();
     ApplicationSubmissionContext applicationContext =
-      master.getSubmissionContext();
-    LOG.info("Setting up container " + master.getMasterContainer() 
-        + " for AM " + master.getMaster());  
+      application.getSubmissionContext();
+    LOG.info("Setting up container " + application.getMasterContainer() 
+        + " for AM " + application.getMaster());  
     ContainerLaunchContext launchContext =
         createAMContainerLaunchContext(applicationContext, masterContainerID);
     StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
     request.setContainerLaunchContext(launchContext);
     containerMgrProxy.startContainer(request);
-    LOG.info("Done launching container " + master.getMasterContainer() 
-        + " for AM " + master.getMaster());
+    LOG.info("Done launching container " + application.getMasterContainer() 
+        + " for AM " + application.getMaster());
   }
   
   private void cleanup() throws IOException {
     connect();
-    ContainerId containerId = master.getMasterContainer().getId();
+    ContainerId containerId = application.getMasterContainer().getId();
     StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
     stopRequest.setContainerId(containerId);
     containerMgrProxy.stopContainer(stopRequest);
@@ -133,7 +137,7 @@ public class AMLauncher implements Runna
   private ContainerManager getContainerMgrProxy(
       final ApplicationId applicationID) throws IOException {
 
-    Container container = master.getMasterContainer();
+    Container container = application.getMasterContainer();
 
     final String containerManagerBindAddress = container.getContainerManagerAddress();
 
@@ -168,9 +172,10 @@ public class AMLauncher implements Runna
     ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
     container.addAllCommands(applicationMasterContext.getCommandList());
     StringBuilder mergedCommand = new StringBuilder();
-    String failCount = Integer.toString(master.getFailedCount());
+    String failCount = Integer.toString(application.getFailedCount());
     List<String> commandList = new ArrayList<String>();
     for (String str : container.getCommandList()) {
+      // This is outright wrong. The AM fail count should be passed via env.
       String result =
           str.replaceFirst(ApplicationConstants.AM_FAIL_COUNT_STRING,
               failCount);
@@ -213,8 +218,8 @@ public class AMLauncher implements Runna
         credentials.readTokenStorageStream(dibb);
       }
 
-      ApplicationTokenIdentifier id =
-          new ApplicationTokenIdentifier(master.getMasterContainer().getId().getAppId());
+      ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
+          application.getApplicationID());
       Token<ApplicationTokenIdentifier> token =
           new Token<ApplicationTokenIdentifier>(id,
               this.applicationTokenSecretManager);
@@ -238,9 +243,8 @@ public class AMLauncher implements Runna
       credentials.writeTokenStorageToStream(dob);
       asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
 
-      ApplicationTokenIdentifier identifier =
-          new ApplicationTokenIdentifier(
-              this.master.getMaster().getApplicationId());
+      ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
+          this.application.getApplicationID());
       SecretKey clientSecretKey =
           this.clientToAMSecretManager.getMasterKey(identifier);
       String encoded =
@@ -253,35 +257,30 @@ public class AMLauncher implements Runna
   
   @SuppressWarnings("unchecked")
   public void run() {
-    switch (event) {
+    switch (eventType) {
     case LAUNCH:
-      ApplicationEventType eventType = ApplicationEventType.LAUNCHED;
+      ApplicationEventType targetEventType = ApplicationEventType.LAUNCHED;
       try {
-        LOG.info("Launching master" + master.getMaster());
+        LOG.info("Launching master" + application.getMaster());
         launch();
       } catch(Exception ie) {
         LOG.info("Error launching ", ie);
-        eventType = ApplicationEventType.LAUNCH_FAILED;
+        targetEventType = ApplicationEventType.LAUNCH_FAILED;
       }
-      handler.handle(new ApplicationMasterInfoEvent(eventType, master
+      handler.handle(new ApplicationEvent(targetEventType, application
           .getApplicationID()));
       break;
     case CLEANUP:
       try {
-        LOG.info("Cleaning master " + master.getMaster());
+        LOG.info("Cleaning master " + application.getMaster());
         cleanup();
       } catch(IOException ie) {
         LOG.info("Error cleaning master ", ie);
       }
-      handler.handle(new ApplicationFinishEvent(master.getApplicationID(),
-          ApplicationState.COMPLETED)); // Doesn't matter what state you send :) :(
       break;
     default:
+      LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
       break;
     }
   }
-
-  public AppContext getApplicationContext() {
-   return master;
-  }
 }

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLivelinessMonitor.java?rev=1143250&r1=1143249&r2=1143250&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLivelinessMonitor.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLivelinessMonitor.java Wed Jul  6 04:51:46 2011
@@ -34,14 +34,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
  * This class runs continuosly to track the application masters
  * that might be dead.
  */
-class AMLivelinessMonitor extends AbstractService {
+public class AMLivelinessMonitor extends AbstractService {
   private volatile boolean stop = false;
   long monitoringInterval =
       RMConfig.DEFAULT_AMLIVELINESS_MONITORING_INTERVAL;
@@ -156,7 +157,7 @@ class AMLivelinessMonitor extends Abstra
   private void expireAMs(List<ApplicationId> toExpire) {
     for (ApplicationId applicationId: toExpire) {
       LOG.info("Expiring the Application " + applicationId);
-      handler.handle(new ApplicationMasterInfoEvent(
+      handler.handle(new ApplicationEvent(
           ApplicationEventType.EXPIRE, applicationId));
     }
   }

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/Application.java?rev=1143250&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/Application.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/Application.java Wed Jul  6 04:51:46 2011
@@ -0,0 +1,116 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+
+/** 
+ * The context of an application. 
+ *
+ */
+public interface Application {
+  
+  /**
+   * the application submission context for this application.
+   * @return the {@link ApplicationSubmissionContext} for the submitted
+   * application.
+   */
+  public ApplicationSubmissionContext getSubmissionContext();
+  
+  /**
+   *  get the resource required for the application master.
+   * @return the resource requirements of the application master
+   */
+  public Resource getResource();
+  
+  /**
+   * get the application ID for this application
+   * @return the application id for this application
+   */
+  public ApplicationId getApplicationID(); 
+  
+  /**
+   * get the status of the application
+   * @return the {@link ApplicationStatus} of this application
+   */
+  public ApplicationStatus getStatus();
+  
+  /**
+   * the application master for this application.
+   * @return the {@link ApplicationMaster} for this application
+   */
+  public ApplicationMaster getMaster();
+  
+  /**
+   * the container on which the application master is running.
+   * @return the container for running the application master.
+   */
+  public Container getMasterContainer();
+  
+  /**
+   * the user for this application
+   * @return the user for this application
+   */
+  public String getUser();
+  
+  /**
+   * the name for this application
+   * @return the application name.
+   */
+  public String getName();
+  
+  /**
+   * The queue of this application.
+   * @return the queue for this application
+   */
+  public String getQueue();
+  
+  /**
+   * the number of times the AM has expired or failed.
+   * @return the number of times the AM has expired or failed.
+   */
+  public int getFailedCount();
+  
+  /**
+   * The store for this application
+   * @return the application store for this application
+   */
+  public ApplicationStore getStore();
+  
+  /**
+   * the start time of the application
+   * @return the start time of the application
+   */
+  public long getStartTime();
+  
+  /**
+   * The finish time of the application
+   * @return the finish time of the application
+   */
+  public long getFinishTime();
+
+  ApplicationState getState();
+}
\ No newline at end of file

Added: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationImpl.java?rev=1143250&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationImpl.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationImpl.java Wed Jul  6 04:51:46 2011
@@ -0,0 +1,607 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMFinishEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMAllocatedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMRegistrationEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMStatusUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+
+/**
+ * This class manages the state of an application master. Also, it
+ * provides a read-only interface for all the services to get information
+ * about this application.
+ *
+ */
+@Private
+@Unstable
+public class ApplicationImpl implements Application,
+    EventHandler<ApplicationEvent> {
+  private static final Log LOG = LogFactory.getLog(ApplicationImpl.class);
+  private final ApplicationSubmissionContext submissionContext;
+  private ApplicationMaster master;
+  private final EventHandler handler;
+  /** only to be used during recovery **/
+  private final EventHandler syncHandler;
+  private Container masterContainer;
+  final private String user;
+  private long startTime = 0;
+  private long finishTime = 0;
+  private String diagnostic;
+  private static String DIAGNOSTIC_KILL_APPLICATION = "Application was killed.";
+  private static String DIAGNOSTIC_AM_FAILED = "Application Master failed";
+  private static String DIAGNOSTIC_AM_LAUNCH_FAILED = "Application Master failed to launch";
+
+  private final int amMaxRetries;
+  private final AMLivelinessMonitor amLivelinessMonitor;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private int numFailed = 0;
+  private final ApplicationStore appStore;
+  
+  /* the list of nodes that this AM was launched on */
+  List<String> hostNamesLaunched = new ArrayList<String>();
+  /* this transition is too generalized and needs to be broken up as and when
+   * we keep adding states. This will keep evolving and is not final yet.
+   */
+  private final  KillTransition killTransition =  new KillTransition();
+  private final StatusUpdateTransition statusUpdatetransition = new StatusUpdateTransition();
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private final ExpireTransition expireTransition = new ExpireTransition();
+  private final FailedTransition failedTransition = new FailedTransition();
+  private final AllocateTransition allocateTransition = new AllocateTransition();
+  private final LaunchTransition launchTransition =  new LaunchTransition();
+  private final LaunchedTransition launchedTransition = new LaunchedTransition();
+  private final FailedLaunchTransition failedLaunchTransition = new FailedLaunchTransition();
+  
+  private final StateMachine<ApplicationState,
+                ApplicationEventType, ApplicationEvent> stateMachine;
+
+  private final StateMachineFactory<ApplicationImpl, ApplicationState,
+    ApplicationEventType, ApplicationEvent> stateMachineFactory
+          = new StateMachineFactory<ApplicationImpl, ApplicationState,
+    ApplicationEventType, ApplicationEvent>(ApplicationState.PENDING)
+
+  // Transitions from PENDING State
+  .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
+      ApplicationEventType.ALLOCATE, allocateTransition)
+  .addTransition(ApplicationState.PENDING, ApplicationState.FAILED,
+      ApplicationEventType.FAILED)
+  .addTransition(ApplicationState.PENDING, ApplicationState.KILLED,
+      ApplicationEventType.KILL)
+  .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
+      ApplicationEventType.RECOVER, allocateTransition)
+  .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
+      ApplicationEventType.RELEASED, new ScheduleTransition())
+
+   // Transitions from ALLOCATING State
+  .addTransition(ApplicationState.ALLOCATING, ApplicationState.ALLOCATED,
+      ApplicationEventType.ALLOCATED, new AllocatedTransition())
+  .addTransition(ApplicationState.ALLOCATING,
+      ApplicationState.ALLOCATING, ApplicationEventType.RECOVER,
+      allocateTransition)
+  .addTransition(ApplicationState.ALLOCATING, ApplicationState.KILLED,
+      ApplicationEventType.KILL, new AllocatingKillTransition())
+
+  // Transitions from ALLOCATED State
+  .addTransition(ApplicationState.ALLOCATED, ApplicationState.KILLED,
+      ApplicationEventType.KILL, killTransition)
+  .addTransition(ApplicationState.ALLOCATED, ApplicationState.LAUNCHING,
+      ApplicationEventType.LAUNCH, launchTransition)
+  .addTransition(ApplicationState.ALLOCATED, ApplicationState.LAUNCHING,
+      ApplicationEventType.RECOVER, new RecoverLaunchTransition())
+
+  // Transitions from LAUNCHING State
+  .addTransition(ApplicationState.LAUNCHING, ApplicationState.LAUNCHED,
+      ApplicationEventType.LAUNCHED, launchedTransition)
+  .addTransition(ApplicationState.LAUNCHING, ApplicationState.PENDING,
+      ApplicationEventType.LAUNCH_FAILED, failedLaunchTransition)
+  // We can't say if the application was launched or not on a recovery, so
+  // for now we assume it was launched and wait for its restart.
+  .addTransition(ApplicationState.LAUNCHING, ApplicationState.LAUNCHED,
+      ApplicationEventType.RECOVER, new RecoverLaunchedTransition())
+  .addTransition(ApplicationState.LAUNCHING, ApplicationState.KILLED,
+      ApplicationEventType.KILL, killTransition)
+
+  // Transitions from LAUNCHED State
+  .addTransition(ApplicationState.LAUNCHED, ApplicationState.CLEANUP,
+      ApplicationEventType.KILL, killTransition)
+   .addTransition(ApplicationState.LAUNCHED, ApplicationState.RUNNING,
+      ApplicationEventType.REGISTERED, new RegisterTransition())
+  .addTransition(ApplicationState.LAUNCHED, ApplicationState.LAUNCHED,
+      ApplicationEventType.RECOVER)
+  // for now we assume that acting on expiry is synchronous and we do not
+  // have to wait for cleanup acks from scheduler negotiator and launcher.
+  .addTransition(ApplicationState.LAUNCHED,
+      ApplicationState.EXPIRED_PENDING, ApplicationEventType.EXPIRE,
+      expireTransition)
+
+  // Transitions from RUNNING State
+  .addTransition(ApplicationState.RUNNING,
+      ApplicationState.EXPIRED_PENDING, ApplicationEventType.EXPIRE,
+      expireTransition)
+  .addTransition(ApplicationState.RUNNING,
+      EnumSet.of(ApplicationState.COMPLETED, ApplicationState.FAILED),
+      ApplicationEventType.FINISH, new DoneTransition())
+      // TODO: For now, no KILLED above. As all kills come to RM directly.
+  .addTransition(ApplicationState.RUNNING, ApplicationState.RUNNING,
+      ApplicationEventType.STATUSUPDATE, statusUpdatetransition)
+  .addTransition(ApplicationState.RUNNING, ApplicationState.KILLED,
+      ApplicationEventType.KILL, killTransition)
+  .addTransition(ApplicationState.RUNNING, ApplicationState.RUNNING, 
+      ApplicationEventType.RECOVER, new RecoverRunningTransition())
+
+  // Transitions from EXPIRED_PENDING State
+  .addTransition(ApplicationState.EXPIRED_PENDING,
+      ApplicationState.ALLOCATING, ApplicationEventType.ALLOCATE,
+      allocateTransition)
+  .addTransition(ApplicationState.EXPIRED_PENDING,
+      ApplicationState.ALLOCATING, ApplicationEventType.RECOVER,
+      allocateTransition)
+   .addTransition(ApplicationState.EXPIRED_PENDING,
+      ApplicationState.FAILED, ApplicationEventType.FAILED_MAX_RETRIES,
+      failedTransition)
+   .addTransition(ApplicationState.EXPIRED_PENDING,
+      ApplicationState.KILLED, ApplicationEventType.KILL, killTransition)
+
+  // Transitions from COMPLETED State
+  .addTransition(ApplicationState.COMPLETED, ApplicationState.COMPLETED,
+      EnumSet.of(ApplicationEventType.FINISH, ApplicationEventType.KILL,
+          ApplicationEventType.RECOVER))
+
+  // Transitions from FAILED State
+  .addTransition(ApplicationState.FAILED, ApplicationState.FAILED,
+      EnumSet.of(ApplicationEventType.RECOVER, 
+           ApplicationEventType.FINISH,
+           ApplicationEventType.KILL))
+
+  // Transitions from KILLED State
+  .addTransition(ApplicationState.KILLED, ApplicationState.KILLED, 
+      EnumSet.of(ApplicationEventType.RECOVER,
+           ApplicationEventType.KILL,
+           ApplicationEventType.FINISH))
+
+  .installTopology();
+
+
+
+  public ApplicationImpl(RMContext context, Configuration conf,
+      String user, ApplicationSubmissionContext submissionContext,
+      String clientToken, ApplicationStore appStore,
+      AMLivelinessMonitor amLivelinessMonitor) {
+    this.user = user;
+    this.handler = context.getDispatcher().getEventHandler();
+    this.syncHandler = context.getDispatcher().getSyncHandler();
+    this.submissionContext = submissionContext;
+    master = recordFactory.newRecordInstance(ApplicationMaster.class);
+    master.setApplicationId(submissionContext.getApplicationId());
+    master.setStatus(recordFactory.newRecordInstance(ApplicationStatus.class));
+    master.getStatus().setApplicationId(submissionContext.getApplicationId());
+    master.getStatus().setProgress(-1.0f);
+    master.setAMFailCount(0);
+    master.setContainerCount(0);
+    stateMachine = stateMachineFactory.make(this);
+    master.setState(ApplicationState.PENDING);
+    master.setClientToken(clientToken);
+    master.setDiagnostics("");
+    this.appStore = appStore;
+    this.startTime = System.currentTimeMillis();
+    this.amMaxRetries =  conf.getInt(RMConfig.AM_MAX_RETRIES, 
+        RMConfig.DEFAULT_AM_MAX_RETRIES);
+    LOG.info("AM max retries: " + this.amMaxRetries);
+    this.amLivelinessMonitor = amLivelinessMonitor;
+
+    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+  }
+
+  @Override
+  public ApplicationSubmissionContext getSubmissionContext() {
+    return submissionContext;
+  }
+
+  @Override
+  public Resource getResource() {
+    return submissionContext.getMasterCapability();
+  }
+
+  @Override
+  public synchronized ApplicationId getApplicationID() {
+    return this.master.getApplicationId();
+  }
+
+  @Override
+  public synchronized ApplicationStatus getStatus() {
+    return master.getStatus();
+  }
+
+  @Override
+  public synchronized ApplicationMaster getMaster() {
+    return master;
+  }
+
+  @Override
+  /* make sure the master state is in sync with statemachine state */
+  public synchronized ApplicationState getState() {
+    return master.getState();
+  }
+  
+  @Override
+  public synchronized long getStartTime() {
+    return this.startTime;
+  }
+  
+  @Override
+  public synchronized long getFinishTime() {
+    return this.finishTime;
+  }
+  
+  @Override
+  public synchronized Container getMasterContainer() {
+    return masterContainer;
+  }
+
+
+  @Override
+  public String getUser() {
+    return this.user;
+  }
+
+
+  @Override
+  public synchronized int getFailedCount() {
+    return numFailed;
+  }
+  
+  @Override
+  public String getName() {
+    return submissionContext.getApplicationName();
+  }
+
+  @Override
+  public String getQueue() {
+    return submissionContext.getQueue();
+  }
+  
+  @Override
+  public ApplicationStore getStore() {
+    return this.appStore;
+  }
+  
+  /* the application master completed successfully */
+  private static class DoneTransition
+      implements
+      MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
+
+    @Override
+    public ApplicationState transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.handler.handle(new ASMEvent<SNEventType>(
+        SNEventType.RELEASE, application));
+      application.handler.handle(new ASMEvent<AMLauncherEventType>(
+        AMLauncherEventType.CLEANUP, application));
+      application.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
+      ApplicationTrackerEventType.REMOVE, application));
+      application.finishTime = System.currentTimeMillis();
+
+      application.amLivelinessMonitor.unRegister(event.getApplicationId());
+
+      AMFinishEvent finishEvent = (AMFinishEvent) event;
+      application.master.setTrackingUrl(finishEvent.getTrackingUrl());
+      application.master.setDiagnostics(finishEvent.getDiagnostics());
+      return finishEvent.getFinalApplicationState();
+    }
+  }
+  
+  private static class AllocatingKillTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.handler.handle(new ASMEvent<ApplicationTrackerEventType>(ApplicationTrackerEventType.REMOVE,
+          application));
+    }
+  }
+  
+  private static class KillTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.finishTime = System.currentTimeMillis();
+      application.getMaster().setDiagnostics(DIAGNOSTIC_KILL_APPLICATION);
+      application.handler.handle(new ASMEvent<SNEventType>(SNEventType.RELEASE, application));
+      application.handler.handle(new ASMEvent<AMLauncherEventType>(AMLauncherEventType.CLEANUP, application));
+      application.handler.handle(new ASMEvent<ApplicationTrackerEventType>(ApplicationTrackerEventType.REMOVE,
+          application));
+    }
+  }
+
+  private static class RecoverLaunchTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.ADD, application));
+        
+      application.handler.handle(new ASMEvent<AMLauncherEventType>(
+          AMLauncherEventType.LAUNCH, application));
+    }
+  }
+  
+  private static class FailedLaunchTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.finishTime = System.currentTimeMillis();
+      application.getMaster().setDiagnostics(DIAGNOSTIC_AM_LAUNCH_FAILED);
+      application.handler.handle(new ASMEvent<SNEventType>(
+      SNEventType.RELEASE, application));
+    }
+  }
+  
+  private static class LaunchTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.handler.handle(new ASMEvent<AMLauncherEventType>(
+      AMLauncherEventType.LAUNCH, application));
+    }
+  }
+  
+  private static class RecoverRunningTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.ADD, application));
+      /* make sure the time stamp is updated, else the expiry thread will expire this */
+      application.amLivelinessMonitor.receivedPing(event.getApplicationId());
+    }
+  }
+  
+  private static class RecoverLaunchedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.ADD, application));
+        
+      application.amLivelinessMonitor.register(event.getApplicationId());
+    }
+  }
+
+
+  private static class LaunchedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.amLivelinessMonitor.register(event.getApplicationId());
+    }
+  }
+
+  private static class ExpireTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      /* for now this is the same as killed transition but will change later */
+      application.handler.handle(new ASMEvent<SNEventType>(SNEventType.RELEASE,
+        application));
+      application.handler.handle(new ASMEvent<AMLauncherEventType>(
+        AMLauncherEventType.CLEANUP, application));
+      application.handler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.EXPIRE, application));
+      application.numFailed++;
+
+      /* check whether the maximum number of retries has been reached */
+      if (application.getFailedCount() < application.amMaxRetries) {
+        application.handler.handle(new ApplicationEvent(
+            ApplicationEventType.ALLOCATE, event.getApplicationId()));
+      } else {
+        application.handler.handle(new ApplicationEvent(
+            ApplicationEventType.FAILED_MAX_RETRIES, application
+                .getApplicationID()));
+      }
+    }
+  }
+
+
+  /* Transition to schedule again on a container launch failure for AM */
+  private static class ScheduleTransition implements 
+  SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      application.masterContainer = null;
+      /* schedule for a slot */
+      application.handler.handle(new ASMEvent<SNEventType>(SNEventType.SCHEDULE,
+      application));
+    }
+  }
+  
+  /* Transition to start the process of allocating for the AM container */
+  private static class AllocateTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      /* notify tracking applications that an application has been added */
+      // TODO: For now, changing to syncHandler. Instead we should use register/deregister.
+      application.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+        ApplicationTrackerEventType.ADD, application));
+      
+      /* schedule for a slot */
+      application.handler.handle(new ASMEvent<SNEventType>(
+          SNEventType.SCHEDULE, application));
+    }
+  }
+  
+  /* Transition on a container being allocated for the application master */
+  private static class AllocatedTransition
+      implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      /* set the container that was generated by the scheduler negotiator */
+      AMAllocatedEvent allocatedEvent = 
+         (AMAllocatedEvent) event;
+      application.masterContainer = allocatedEvent.getMasterContainer();
+      try {
+        application.appStore.storeMasterContainer(application.masterContainer);
+      } catch(IOException ie) {
+        //TODO ignore for now fix later.
+      }
+
+      /* we need to launch the application master on allocated transition */
+      application.handler.handle(new ApplicationEvent(
+          ApplicationEventType.LAUNCH, application.getApplicationID()));
+    }    
+  }
+
+  private static class RegisterTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      AMRegistrationEvent registrationEvent =
+        (AMRegistrationEvent) event;
+      ApplicationMaster registeredMaster = registrationEvent
+          .getApplicationMaster();
+      application.master.setHost(registeredMaster.getHost());
+      application.master.setTrackingUrl(registeredMaster.getTrackingUrl());
+      application.master.setRpcPort(registeredMaster.getRpcPort());
+      application.master.setStatus(registeredMaster.getStatus());
+      application.master.getStatus().setProgress(0.0f);
+      application.amLivelinessMonitor.receivedPing(event.getApplicationId());
+      try {
+        application.appStore.updateApplicationState(application.master);
+      } catch(IOException ie) {
+        //TODO fix this later. on error we should exit
+      }
+    }
+  }
+
+  /* transition to finishing state on a cleanup; for now it's not used, but we will need
+   * it later */
+  private static class FailedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      LOG.info("Failed application: " + application.getApplicationID());
+    } 
+  }
+
+
+  /* Just a status update transition */
+  private static class StatusUpdateTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+
+    @Override
+    public void transition(ApplicationImpl application,
+        ApplicationEvent event) {
+      AMStatusUpdateEvent statusUpdateEvent = 
+        (AMStatusUpdateEvent) event;
+      application.master.setStatus(statusUpdateEvent.getApplicationStatus());
+      application.amLivelinessMonitor.receivedPing(event.getApplicationId());
+    }
+  }
+
+  @Override
+  public synchronized void handle(ApplicationEvent event) {
+
+    this.writeLock.lock();
+
+    try {
+      ApplicationId appID = event.getApplicationId();
+      LOG.info("Processing event for " + appID + " of type "
+          + event.getType());
+      final ApplicationState oldState = getState();
+      try {
+        /* keep the master in sync with the state machine */
+        stateMachine.doTransition(event.getType(), event);
+        master.setState(stateMachine.getCurrentState());
+        LOG.info("State is " + stateMachine.getCurrentState());
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        /* TODO fail the application on the failed transition */
+      }
+      try {
+        appStore.updateApplicationState(master);
+      } catch (IOException ie) {
+        // TODO ignore for now
+      }
+      if (oldState != getState()) {
+        LOG.info(appID + " State change from " + oldState + " to "
+            + getState());
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+}

Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java?rev=1143250&r1=1143249&r2=1143250&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterLauncher.java Wed Jul  6 04:51:46 2011
@@ -28,12 +28,12 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 
-class ApplicationMasterLauncher extends AbstractService implements EventHandler<ASMEvent<AMLauncherEventType>> {
+public class ApplicationMasterLauncher extends AbstractService implements EventHandler<ASMEvent<AMLauncherEventType>> {
   private static final Log LOG = LogFactory.getLog(
       ApplicationMasterLauncher.class);
   private final ThreadPoolExecutor launcherPool;
@@ -67,14 +67,14 @@ class ApplicationMasterLauncher extends 
     super.start();
   }
   
-  protected Runnable createRunnableLauncher(AppContext masterInfo, AMLauncherEventType event) {
-    Runnable launcher = new AMLauncher(context, masterInfo, event,
+  protected Runnable createRunnableLauncher(Application application, AMLauncherEventType event) {
+    Runnable launcher = new AMLauncher(context, application, event,
         applicationTokenSecretManager, clientToAMSecretManager, getConfig());
     return launcher;
   }
   
-  private void launch(AppContext appContext) {
-    Runnable launcher = createRunnableLauncher(appContext, AMLauncherEventType.LAUNCH);
+  private void launch(Application application) {
+    Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH);
     masterEvents.add(launcher);
   }
   
@@ -106,21 +106,21 @@ class ApplicationMasterLauncher extends 
     }
   }    
 
-  private void cleanup(AppContext appContext) {
-    Runnable launcher = createRunnableLauncher(appContext, AMLauncherEventType.CLEANUP);
+  private void cleanup(Application application) {
+    Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP);
     masterEvents.add(launcher);
   } 
   
   @Override
   public synchronized void  handle(ASMEvent<AMLauncherEventType> appEvent) {
     AMLauncherEventType event = appEvent.getType();
-    AppContext appContext = appEvent.getAppContext();
+    Application application = appEvent.getApplication();
     switch (event) {
     case LAUNCH:
-      launch(appContext);
+      launch(application);
       break;
     case CLEANUP:
-      cleanup(appContext);
+      cleanup(application);
     default:
       break;
     }



Mime
View raw message