hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1098735 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resou...
Date Mon, 02 May 2011 19:01:07 GMT
Author: mahadev
Date: Mon May  2 19:01:06 2011
New Revision: 1098735

URL: http://svn.apache.org/viewvc?rev=1098735&view=rev
Log:
Completing RM Restart. Completed Phase 3 of making sure events are logged and restored (mahadev)

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ApplicationsStore.java
Removed:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ApplicationStore.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestASMStateMachine.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationMasterLauncher.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May  2 19:01:06 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    Completing RM Restart. Completed Phase 3 of making sure events are logged and restored (mahadev)
+
     Moving userlogs out of container work-dir into a separate directory structure. (vinodkv)
 
     Implement Job Acls in MR Application Master. (sharad)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon May  2 19:01:06 2011
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
@@ -245,7 +246,7 @@ public class TestRMContainerAllocator {
       fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
       fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
           recordFactory.newRecordInstance(ApplicationMaster.class),
-          "test", null, null);
+          "test", null, null, StoreFactory.createVoidAppStore());
     } catch(IOException ie) {
       LOG.info("add application failed with ", ie);
       assert(false);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Mon May  2 19:01:06 2011
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.
   ApplicationMasterEvents.ApplicationTrackerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@@ -74,7 +74,7 @@ public class ResourceManager extends Com
   private AdminService adminService;
   private AtomicBoolean shutdown = new AtomicBoolean(false);
   private WebApp webApp;
-  private RMContext asmContext;
+  private RMContext rmContext;
   private final Store store;
   
   public ResourceManager(Store store) {
@@ -83,13 +83,13 @@ public class ResourceManager extends Com
   }
   
   public RMContext getRMContext() {
-    return this.asmContext;
+    return this.rmContext;
   }
   
   public interface RMContext {
     public RMDispatcherImpl getDispatcher();
     public NodeStore getNodeStore();
-    public ApplicationStore getApplicationStore();
+    public ApplicationsStore getApplicationsStore();
   }
   
   public static class RMContextImpl implements RMContext {
@@ -112,19 +112,17 @@ public class ResourceManager extends Com
     }
 
     @Override
-    public ApplicationStore getApplicationStore() {
+    public ApplicationsStore getApplicationsStore() {
       return store;
     }
   }
   
-  public void recover() {
-    
-  }
+  
   @Override
   public synchronized void init(Configuration conf) {
     
-    this.asmContext = new RMContextImpl(this.store);
-    addService(asmContext.getDispatcher());
+    this.rmContext = new RMContextImpl(this.store);
+    addService(rmContext.getDispatcher());
     // Initialize the config
     this.conf = new YarnConfiguration(conf);
     // Initialize the scheduler
@@ -134,7 +132,7 @@ public class ResourceManager extends Com
               FifoScheduler.class, ResourceScheduler.class), 
           this.conf);
   
-    this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
+    this.rmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
         .createSecretKey("Dummy".getBytes()));
@@ -208,12 +206,12 @@ public class ResourceManager extends Com
   }
   
   protected RMResourceTrackerImpl createRMResourceTracker() {
-    return new RMResourceTrackerImpl(this.containerTokenSecretManager);
+    return new RMResourceTrackerImpl(this.containerTokenSecretManager, this.rmContext);
   }
   
   protected ApplicationsManagerImpl createApplicationsManagerImpl() {
     return new ApplicationsManagerImpl(
-        this.appTokenSecretManager, this.scheduler, this.asmContext);
+        this.appTokenSecretManager, this.scheduler, this.rmContext);
   }
 
   protected ClientRMService createClientRMService() {
@@ -222,7 +220,7 @@ public class ResourceManager extends Com
 
   protected ApplicationMasterService createApplicationMasterService() {
     return new ApplicationMasterService(
-      this.appTokenSecretManager, applicationsManager, scheduler, this.asmContext);
+      this.appTokenSecretManager, applicationsManager, scheduler, this.rmContext);
   }
   
 
@@ -270,7 +268,8 @@ public class ResourceManager extends Com
       Store store =  StoreFactory.getStore(conf);
       resourceManager = new ResourceManager(store);
       resourceManager.init(conf);
-      //resourceManager.recover();
+      //resourceManager.recover(store.restore());
+      //store.doneWithRecovery();
       resourceManager.start();
     } catch (Throwable e) {
       LOG.error("Error starting RM", e);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Mon May  2 19:01:06 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
@@ -67,11 +70,13 @@ public class AMTracker extends AbstractS
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private int amMaxRetries;
 
-  private final RMContext asmContext;
+  private final RMContext rmContext;
 
   private final Map<ApplicationId, ApplicationMasterInfo> applications = 
     new ConcurrentHashMap<ApplicationId, ApplicationMasterInfo>();
 
+  private final ApplicationsStore appsStore;
+  
   private TreeSet<ApplicationStatus> amExpiryQueue =
     new TreeSet<ApplicationStatus>(
         new Comparator<ApplicationStatus>() {
@@ -88,23 +93,24 @@ public class AMTracker extends AbstractS
         }
     );
 
-  public AMTracker(RMContext asmContext) {
+  public AMTracker(RMContext rmContext) {
     super(AMTracker.class.getName());
     this.heartBeatThread = new HeartBeatThread();
-    this.asmContext = asmContext;
+    this.rmContext = rmContext;
+    this.appsStore = rmContext.getApplicationsStore();
   }
 
   @Override
   public void init(Configuration conf) {
     super.init(conf);
-    this.handler = asmContext.getDispatcher().getEventHandler();
+    this.handler = rmContext.getDispatcher().getEventHandler();
     this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 
         YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
     LOG.info("AM expiry interval: " + this.amExpiryInterval);
     this.amMaxRetries =  conf.getInt(YarnConfiguration.AM_MAX_RETRIES, 
         YarnConfiguration.DEFAULT_AM_MAX_RETRIES);
     LOG.info("AM max retries: " + this.amMaxRetries);
-    this.asmContext.getDispatcher().register(ApplicationEventType.class, this);
+    this.rmContext.getDispatcher().register(ApplicationEventType.class, this);
   }
 
   @Override
@@ -190,14 +196,18 @@ public class AMTracker extends AbstractS
   }
 
   public void addMaster(String user,  ApplicationSubmissionContext 
-      submissionContext, String clientToken) {
-    ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(asmContext, 
-        user, submissionContext, clientToken);
+      submissionContext, String clientToken) throws IOException {
+    
+    ApplicationStore appStore = appsStore.createApplicationStore(submissionContext.getApplicationId(),
+        submissionContext);
+    ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(rmContext, 
+        user, submissionContext, clientToken, appStore);
     synchronized(applications) {
       applications.put(applicationMaster.getApplicationID(), applicationMaster);
     }
-    asmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
+    rmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
         ApplicationEventType.ALLOCATE, applicationMaster));
+   
   }
 
   public void finish(ApplicationId application) {
@@ -209,7 +219,7 @@ public class AMTracker extends AbstractS
       LOG.info("Cant find application to finish " + application);
       return;
     }
-    asmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
+    rmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
         ApplicationEventType.FINISH, masterInfo));
   }
 
@@ -317,6 +327,11 @@ public class AMTracker extends AbstractS
     public int getFailedCount() {
       throw notimplemented;
     }
+
+    @Override
+    public ApplicationStore getStore() {
+     throw notimplemented;
+    }
   }
 
   public void heartBeat(ApplicationStatus status) {
@@ -386,9 +401,16 @@ public class AMTracker extends AbstractS
     for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
       ApplicationId appId = entry.getKey();
       ApplicationInfo appInfo = entry.getValue();
-      ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.asmContext,
+      ApplicationMasterInfo masterInfo = null;
+      try {
+        masterInfo = new ApplicationMasterInfo(this.rmContext,
+      
           appInfo.getApplicationSubmissionContext().getUser(), appInfo.getApplicationSubmissionContext(), 
-          appInfo.getApplicationMaster().getClientToken());
+          appInfo.getApplicationMaster().getClientToken(), 
+          this.appsStore.createApplicationStore(appId, appInfo.getApplicationSubmissionContext()));
+      } catch(IOException ie) {
+        //ignore
+      }
       ApplicationMaster master = masterInfo.getMaster();
       ApplicationMaster storedAppMaster = appInfo.getApplicationMaster();
       master.setAMFailCount(storedAppMaster.getAMFailCount());
@@ -401,33 +423,7 @@ public class AMTracker extends AbstractS
       master.setStatus(storedAppMaster.getStatus());
       master.setState(storedAppMaster.getState());
       applications.put(appId, masterInfo);
-      
-      switch(master.getState()) {
-      case ALLOCATED:
-        break;
-      case ALLOCATING:
-        break;
-      case CLEANUP:
-        break;
-      case EXPIRED_PENDING:
-        break;
-      case COMPLETED:
-        break;
-      case FAILED:
-        break;
-      case LAUNCHED:
-        break;
-      case KILLED:
-        break;
-      case LAUNCHING:
-        break;
-      case PAUSED:
-        break;
-      case PENDING:
-        break;
-      case RUNNING:
-        break;
-      }
+      handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.RECOVER, masterInfo));
     }
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java Mon May  2 19:01:06 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 
 /** 
  * The context of an application. 
@@ -97,4 +98,10 @@ public interface AppContext {
    * @return the count of number of times the AM has expired/failed.
    */
   public int getFailedCount();
+  
+  /**
+   * The store for this application
+   * @return the application store for this application
+   */
+  public ApplicationStore getStore();
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterInfo.java Mon May  2 19:01:06 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -63,6 +65,8 @@ public class ApplicationMasterInfo imple
   private Container masterContainer;
   final private String user;
   private int numFailed = 0;
+  private final ApplicationStore appStore;
+  
   /* the list of nodes that this AM was launched on */
   List<String> hostNamesLaunched = new ArrayList<String>();
   /* this transition is too generalized, needs to be broken up as and when we 
@@ -74,13 +78,15 @@ public class ApplicationMasterInfo imple
   private final ExpireTransition expireTransition = new ExpireTransition();
   private final FailedTransition failedTransition = new FailedTransition();
   private final AllocateTransition allocateTransition = new AllocateTransition();
+  private final LaunchTransition launchTransition =  new LaunchTransition();
+  private final LaunchedTransition launchedTransition = new LaunchedTransition();
   
   private final StateMachine<ApplicationState, ApplicationEventType, 
   ASMEvent<ApplicationEventType>> stateMachine;
 
   private final StateMachineFactory<ApplicationMasterInfo,
   ApplicationState, ApplicationEventType, ASMEvent<ApplicationEventType>> stateMachineFactory 
-
+  
   = new StateMachineFactory
   <ApplicationMasterInfo, ApplicationState, ApplicationEventType, ASMEvent<ApplicationEventType>>
   (ApplicationState.PENDING)
@@ -88,15 +94,27 @@ public class ApplicationMasterInfo imple
   .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING,
   ApplicationEventType.ALLOCATE, allocateTransition)
   
+  .addTransition(ApplicationState.PENDING, ApplicationState.ALLOCATING, 
+      ApplicationEventType.RECOVER, allocateTransition)
+  
   .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.ALLOCATING, 
   ApplicationEventType.ALLOCATE, allocateTransition)
   
+  .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.ALLOCATING,
+  ApplicationEventType.RECOVER, allocateTransition)
+  
+  .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.FAILED,
+  ApplicationEventType.FAILED_MAX_RETRIES, failedTransition)
+  
   .addTransition(ApplicationState.PENDING, ApplicationState.CLEANUP, 
   ApplicationEventType.KILL, killTransition)
 
   .addTransition(ApplicationState.ALLOCATING, ApplicationState.ALLOCATED,
   ApplicationEventType.ALLOCATED, new AllocatedTransition())
 
+  .addTransition(ApplicationState.ALLOCATING, ApplicationState.ALLOCATING,
+  ApplicationEventType.RECOVER, allocateTransition)
+      
   .addTransition(ApplicationState.ALLOCATING, ApplicationState.CLEANUP, 
   ApplicationEventType.KILL, killTransition)
 
@@ -104,10 +122,19 @@ public class ApplicationMasterInfo imple
   ApplicationEventType.KILL, killTransition)
 
   .addTransition(ApplicationState.ALLOCATED, ApplicationState.LAUNCHING,
-  ApplicationEventType.LAUNCH, new LaunchTransition())
+  ApplicationEventType.LAUNCH, launchTransition)
 
+  .addTransition(ApplicationState.ALLOCATED, ApplicationState.LAUNCHING,
+  ApplicationEventType.RECOVER, new RecoverLaunchTransition())
+      
   .addTransition(ApplicationState.LAUNCHING, ApplicationState.LAUNCHED,
-  ApplicationEventType.LAUNCHED, new LaunchedTransition())
+  ApplicationEventType.LAUNCHED, launchedTransition)
+  
+  /** we cant say if the application was launched or not on a recovery, so for now 
+   * we assume it was launched and wait for its restart.
+   */
+  .addTransition(ApplicationState.LAUNCHING, ApplicationState.LAUNCHED,
+  ApplicationEventType.RECOVER, new RecoverLaunchedTransition())
   
   .addTransition(ApplicationState.LAUNCHING, ApplicationState.KILLED,
    ApplicationEventType.KILL, killTransition)
@@ -120,7 +147,10 @@ public class ApplicationMasterInfo imple
   
   .addTransition(ApplicationState.LAUNCHED, ApplicationState.RUNNING, 
   ApplicationEventType.REGISTERED, new RegisterTransition())
-  
+    
+  .addTransition(ApplicationState.LAUNCHED, ApplicationState.LAUNCHED,
+   ApplicationEventType.RECOVER)
+
   /* for now we assume that acting on expiry is synchronous and we do not 
    * have to wait for cleanup acks from scheduler negotiator and launcher.
    */
@@ -130,24 +160,33 @@ public class ApplicationMasterInfo imple
   .addTransition(ApplicationState.RUNNING,  ApplicationState.EXPIRED_PENDING, 
   ApplicationEventType.EXPIRE, expireTransition)
   
-  .addTransition(ApplicationState.EXPIRED_PENDING, ApplicationState.FAILED,
-      ApplicationEventType.FAILED_MAX_RETRIES, failedTransition)
-      
   .addTransition(ApplicationState.RUNNING, ApplicationState.COMPLETED,
   ApplicationEventType.FINISH, new DoneTransition())
 
   .addTransition(ApplicationState.RUNNING, ApplicationState.RUNNING,
   ApplicationEventType.STATUSUPDATE, statusUpdatetransition)
 
+  .addTransition(ApplicationState.RUNNING, ApplicationState.RUNNING, 
+  ApplicationEventType.RECOVER, new RecoverRunningTransition())
+  
   .addTransition(ApplicationState.COMPLETED, ApplicationState.COMPLETED, 
   ApplicationEventType.EXPIRE)
 
+  .addTransition(ApplicationState.COMPLETED, ApplicationState.COMPLETED,
+  ApplicationEventType.RECOVER)
+  
+  .addTransition(ApplicationState.FAILED, ApplicationState.FAILED,
+      ApplicationEventType.RECOVER)
+  
+  .addTransition(ApplicationState.KILLED, ApplicationState.KILLED, 
+      ApplicationEventType.RECOVER)
+
   .installTopology();
 
 
 
   public ApplicationMasterInfo(RMContext context, String user,
-  ApplicationSubmissionContext submissionContext, String clientToken) {
+  ApplicationSubmissionContext submissionContext, String clientToken, ApplicationStore appStore) {
     this.user = user;
     this.handler = context.getDispatcher().getEventHandler();
     this.syncHandler = context.getDispatcher().getSyncHandler();
@@ -162,6 +201,7 @@ public class ApplicationMasterInfo imple
     stateMachine = stateMachineFactory.make(this);
     master.setState(ApplicationState.PENDING);
     master.setClientToken(clientToken);
+    this.appStore = appStore;
   }
 
   @Override
@@ -229,6 +269,11 @@ public class ApplicationMasterInfo imple
     return submissionContext.getQueue();
   }
   
+  @Override
+  public ApplicationStore getStore() {
+    return this.appStore;
+  }
+  
   /* the applicaiton master completed successfully */
   private static class DoneTransition implements 
     SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
@@ -257,6 +302,20 @@ public class ApplicationMasterInfo imple
     }
   }
 
+  private static class RecoverLaunchTransition implements  SingleArcTransition
+  <ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+
+    @Override
+    public void transition(ApplicationMasterInfo masterInfo,
+        ASMEvent<ApplicationEventType> event) {
+      masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.ADD, masterInfo));
+        
+      masterInfo.handler.handle(new ASMEvent<AMLauncherEventType>(
+          AMLauncherEventType.LAUNCH, masterInfo));
+    }
+  }
+  
   private static class LaunchTransition implements
   SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
     @Override
@@ -266,6 +325,32 @@ public class ApplicationMasterInfo imple
       AMLauncherEventType.LAUNCH, masterInfo));
     }
   }
+  
+  private static class RecoverRunningTransition implements
+  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+    @Override
+    public void transition(ApplicationMasterInfo masterInfo,
+    ASMEvent<ApplicationEventType> event) {
+      masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.ADD, masterInfo));
+      /* make sure the timestamp is updated, else the expiry thread will expire this application */
+      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+    }
+  }
+  
+  private static class RecoverLaunchedTransition implements
+  SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
+    @Override
+    public void transition(ApplicationMasterInfo masterInfo,
+    ASMEvent<ApplicationEventType> event) {
+      masterInfo.syncHandler.handle(new ASMEvent<ApplicationTrackerEventType>(
+          ApplicationTrackerEventType.ADD, masterInfo));
+        
+      /* make sure the timestamp is updated, else the expiry thread will expire this application */
+      masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+    }
+  }
+
 
   private static class LaunchedTransition implements
   SingleArcTransition<ApplicationMasterInfo, ASMEvent<ApplicationEventType>> {
@@ -319,6 +404,11 @@ public class ApplicationMasterInfo imple
     ASMEvent<ApplicationEventType> event) {
       /* set the container that was generated by the scheduler negotiator */
       masterInfo.masterContainer = event.getAppContext().getMasterContainer();
+      try {
+        masterInfo.appStore.storeMasterContainer(masterInfo.masterContainer);
+      } catch(IOException ie) {
+        // TODO: ignored for now; fix later.
+      }
     }    
   }
 
@@ -334,6 +424,11 @@ public class ApplicationMasterInfo imple
       masterInfo.master.setStatus(registeredMaster.getStatus());
       masterInfo.master.getStatus().setProgress(0.0f);
       masterInfo.master.getStatus().setLastSeen(System.currentTimeMillis());
+      try {
+        masterInfo.appStore.updateApplicationState(masterInfo.master);
+      } catch(IOException ie) {
+        // TODO: fix this later; on error we should exit.
+      }
     }
   }
 
@@ -376,6 +471,11 @@ public class ApplicationMasterInfo imple
       LOG.error("Can't handle this event at current state", e);
       /* TODO fail the application on the failed transition */
     }
+    try {
+      appStore.updateApplicationState(master);
+    } catch(IOException ie) {
+      //TODO ignore for now
+    }
     if (oldState != getState()) {
       LOG.info(appID + " State change from " 
       + oldState + " to "

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Mon May  2 19:01:06 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.CompositeService;
@@ -69,19 +70,19 @@ public class ApplicationsManagerImpl ext
     new ClientToAMSecretManager();
   private final EventHandler eventHandler;
   private final ApplicationTokenSecretManager applicationTokenSecretManager;
-  private final RMContext asmContext; 
+  private final RMContext rmContext; 
   
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
-
+  
   public ApplicationsManagerImpl(ApplicationTokenSecretManager 
-      applicationTokenSecretManager, YarnScheduler scheduler, RMContext asmContext) {
+      applicationTokenSecretManager, YarnScheduler scheduler, RMContext rmContext) {
     super("ApplicationsManager");
     this.scheduler = scheduler;
-    this.asmContext = asmContext;
-    this.eventHandler = this.asmContext.getDispatcher().getEventHandler();
+    this.rmContext = rmContext;
+    this.eventHandler = this.rmContext.getDispatcher().getEventHandler();
     this.applicationTokenSecretManager = applicationTokenSecretManager;
-  }
+   }
   
 
   /**
@@ -89,7 +90,7 @@ public class ApplicationsManagerImpl ext
    * @return create a new am heart beat handler.
    */
   protected AMTracker createNewAMTracker() {
-    return new AMTracker(this.asmContext);
+    return new AMTracker(this.rmContext);
   }
 
   /**
@@ -98,7 +99,7 @@ public class ApplicationsManagerImpl ext
    * @return scheduler negotiator that talks to the scheduler.
    */
   protected EventHandler<ASMEvent<SNEventType>> createNewSchedulerNegotiator(YarnScheduler scheduler) {
-    return new SchedulerNegotiator(this.asmContext, scheduler);
+    return new SchedulerNegotiator(this.rmContext, scheduler);
   }
 
   /**
@@ -110,7 +111,7 @@ public class ApplicationsManagerImpl ext
   protected EventHandler<ASMEvent<AMLauncherEventType>> createNewApplicationMasterLauncher(
       ApplicationTokenSecretManager tokenSecretManager) {
     return  new ApplicationMasterLauncher(tokenSecretManager,
-        this.clientToAMSecretManager, this.asmContext);
+        this.clientToAMSecretManager, this.rmContext);
   }
 
   /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/SchedulerNegotiator.java Mon May  2 19:01:06 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -289,5 +290,10 @@ class SchedulerNegotiator extends Abstra
     public int getFailedCount() {
       throw notImplementedException;
     }
+
+    @Override
+    public ApplicationStore getStore() {
+      throw notImplementedException;
+    }
   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/events/ApplicationMasterEvents.java Mon May  2 19:01:06 2011
@@ -48,6 +48,7 @@ public class ApplicationMasterEvents {
   public enum ApplicationEventType {
     ALLOCATE,
     REGISTERED,
+    RECOVER,
     REMOVE,
     STATUSUPDATE,
     LAUNCH,

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ApplicationsStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ApplicationsStore.java?rev=1098735&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ApplicationsStore.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ApplicationsStore.java Mon May  2 19:01:06 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+
+public interface ApplicationsStore {
+  public ApplicationStore createApplicationStore(ApplicationId applicationId,
+      ApplicationSubmissionContext context) throws IOException;
+  public void removeApplication(ApplicationId application) throws IOException;
+ 
+  public interface ApplicationStore {
+    public void storeContainer(Container container) throws IOException;
+    public void removeContainer(Container container) throws IOException;
+    public void storeMasterContainer(Container container) throws IOException;
+    public void updateApplicationState(ApplicationMaster master) throws IOException;
+    public boolean isLoggable();
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Mon May  2 19:01:06 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.res
 public class MemStore implements Store {
   RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private NodeId nodeId;
+  private boolean doneWithRecovery = false;
 
   public MemStore() {
     nodeId = recordFactory.newRecordInstance(NodeId.class);
@@ -49,22 +50,36 @@ public class MemStore implements Store {
   @Override
   public void removeNode(NodeManager node) throws IOException {}
 
-  @Override
-  public void storeContainer(Container container) throws IOException {}
+  private class ApplicationStoreImpl implements ApplicationStore {
+    @Override
+    public void storeContainer(Container container) throws IOException {}
 
-  @Override
-  public void removeContainer(Container container) throws IOException {}
+    @Override
+    public void removeContainer(Container container) throws IOException {}
 
-  @Override
-  public void storeApplication(ApplicationId application,
-      ApplicationSubmissionContext context, ApplicationMaster master) throws IOException {}
+    @Override
+    public void storeMasterContainer(Container container) throws IOException {}
+
+    @Override
+    public void updateApplicationState(
+        ApplicationMaster master) throws IOException {}
+
+    @Override
+    public boolean isLoggable() {
+      return doneWithRecovery;
+    }
+
+  }
 
   @Override
-  public void removeApplication(ApplicationId application) throws IOException {}
+  public ApplicationStore createApplicationStore(ApplicationId application,
+      ApplicationSubmissionContext context) throws IOException {
+    return new ApplicationStoreImpl();
+  }
+
 
   @Override
-  public void updateApplicationState(ApplicationId applicationId,
-      ApplicationMaster master) throws IOException {}
+  public void removeApplication(ApplicationId application) throws IOException {}
 
   @Override
   public RMState restore() throws IOException {
@@ -101,4 +116,14 @@ public class MemStore implements Store {
       return new HashMap<ApplicationId, Store.ApplicationInfo>();
     }
   }
+
+  @Override
+  public boolean isLoggable() {
+    return doneWithRecovery;
+  }
+
+  @Override
+  public void doneWithRecovery() {
+    doneWithRecovery = true;
+  }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NodeStore.java Mon May  2 19:01:06 2011
@@ -28,4 +28,5 @@ public interface NodeStore {
   public void storeNode(NodeManager node) throws IOException;
   public void removeNode(NodeManager node) throws IOException;
   public NodeId getNextNodeId() throws IOException;
+  public boolean isLoggable();
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Mon May  2 19:01:06 2011
@@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 
 
-public interface Store extends NodeStore, ApplicationStore {
+public interface Store extends NodeStore, ApplicationsStore {
   public interface ApplicationInfo {
     public ApplicationMaster getApplicationMaster();
+    public Container getMasterContainer();
     public ApplicationSubmissionContext getApplicationSubmissionContext();
     public List<Container> getContainers();
   }
@@ -41,4 +42,5 @@ public interface Store extends NodeStore
     public NodeId getLastLoggedNodeId();
   }
   public RMState restore() throws IOException;
+  public void doneWithRecovery();
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFactory.java Mon May  2 19:01:06 2011
@@ -17,9 +17,14 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 
 public class StoreFactory {
   
@@ -30,4 +35,35 @@ public class StoreFactory {
             conf);
     return store;
   }
+  
+  public static ApplicationStore createVoidAppStore() {
+    return new VoidApplicationStore();
+  }
+  
+  private static class VoidApplicationStore implements ApplicationStore {
+
+    public VoidApplicationStore() {}
+    
+    @Override
+    public void storeContainer(Container container) throws IOException {
+    }
+
+    @Override
+    public void removeContainer(Container container) throws IOException {
+    }
+
+    @Override
+    public void storeMasterContainer(Container container) throws IOException {
+    }
+
+    @Override
+    public void updateApplicationState(ApplicationMaster master)
+        throws IOException {
+    }
+
+    @Override
+    public boolean isLoggable() {
+      return false;
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Mon May  2 19:01:06 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import java.io.IOException;
@@ -68,12 +68,14 @@ public class ZKStore implements Store {
   private static final String ZK_PATH_SEPARATOR = "/";
   private static final String NODE_ID = "nodeid";
   private static final String APP_MASTER = "master";
+  private static final String APP_MASTER_CONTAINER = "mastercontainer";
   private final String ZK_ADDRESS;
   private final int ZK_TIMEOUT;
+  private boolean doneWithRecovery = false;
   
   /** TODO make this generic **/
   private NodeIdPBImpl nodeId = new NodeIdPBImpl();
-  
+
   /**
    * TODO fix this for later to handle all kinds of events 
    * of connection and session events.
@@ -96,25 +98,26 @@ public class ZKStore implements Store {
     );
     this.nodeId.setId(0);
   }
-  
+
   protected Watcher createZKWatcher() {
     return new ZKWatcher();   
   }
-  
+
   private NodeManagerInfoPBImpl createNodeManagerInfo(NodeManager nodeInfo) {
     NodeManagerInfo node = 
       recordFactory.newRecordInstance(NodeManagerInfo.class);
-      node.setNodeAddress(nodeInfo.getNodeAddress());
-      node.setRackName(nodeInfo.getRackName());
-      node.setCapability(nodeInfo.getTotalCapability());
-      node.setUsed(nodeInfo.getUsedResource());
-      node.setNumContainers(nodeInfo.getNumContainers());
-      return (NodeManagerInfoPBImpl)node;
+    node.setNodeAddress(nodeInfo.getNodeAddress());
+    node.setRackName(nodeInfo.getRackName());
+    node.setCapability(nodeInfo.getTotalCapability());
+    node.setUsed(nodeInfo.getUsedResource());
+    node.setNumContainers(nodeInfo.getNumContainers());
+    return (NodeManagerInfoPBImpl)node;
   }
-  
+
   @Override
   public synchronized void storeNode(NodeManager node) throws IOException {
     /** create a storage node and store it in zk **/
+    if (!doneWithRecovery) return;
     NodeManagerInfoPBImpl nodeManagerInfo = createNodeManagerInfo(node);
     byte[] bytes = nodeManagerInfo.getProto().toByteArray();
     try {
@@ -131,6 +134,8 @@ public class ZKStore implements Store {
 
   @Override
   public synchronized void removeNode(NodeManager node) throws IOException {
+    if (!doneWithRecovery) return;
+    
     /** remove a storage node **/
     try {
       zkClient.delete(NODES + Integer.toString(node.getNodeID().getId()), -1);
@@ -141,9 +146,9 @@ public class ZKStore implements Store {
       LOG.info("Keeper exception", ke);
       throw convertToIOException(ke);
     }
-    
+
   }
-  
+
   private static IOException convertToIOException(KeeperException ke) {
     IOException io = new IOException();
     io.setStackTrace(ke.getStackTrace());
@@ -170,48 +175,105 @@ public class ZKStore implements Store {
     String appString = ConverterUtils.toString(containerId.getAppId());
     return appString + "/" + containerId.getId();
   }
-  
-  @Override
-  public synchronized void storeContainer(Container container) throws IOException {
-    ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
-    try {
-      zkClient.create(APPS + containerPathFromContainerId(container.getId())
-          , containerPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
-    } catch(InterruptedException ie) {
-      LOG.info("Interrupted", ie);
-      throw new InterruptedIOException(ie.getMessage());
-    } catch(KeeperException ke) {
-      LOG.info("Keeper exception", ke);
-      throw convertToIOException(ke);
+
+  private class ZKApplicationStore implements ApplicationStore {
+    private final ApplicationId applicationId;
+
+    public ZKApplicationStore(ApplicationId applicationId) {
+      this.applicationId = applicationId;
     }
-  }
 
-  @Override
-  public synchronized void removeContainer(Container container) throws IOException {
-    ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
-    try { 
-      zkClient.delete(APPS + containerPathFromContainerId(container.getId()),
-          -1);
-    } catch(InterruptedException ie) {
-      throw new InterruptedIOException(ie.getMessage());
-    } catch(KeeperException ke) {
-      LOG.info("Keeper exception", ke);
-      throw convertToIOException(ke);
+    @Override
+    public void storeMasterContainer(Container container) throws IOException {
+      if (!doneWithRecovery) return;
+      
+      ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+      try {
+        zkClient.setData(APPS + ConverterUtils.toString(container.getId().getAppId()) +
+            ZK_PATH_SEPARATOR + APP_MASTER_CONTAINER
+            , containerPBImpl.getProto().toByteArray(), -1);
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted", ie);
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        LOG.info("Keeper exception", ke);
+        throw convertToIOException(ke);
+      }
+    }
+    @Override
+    public synchronized void storeContainer(Container container) throws IOException {
+      if (!doneWithRecovery) return;
+      
+      ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+      try {
+        zkClient.create(APPS + containerPathFromContainerId(container.getId())
+            , containerPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted", ie);
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        LOG.info("Keeper exception", ke);
+        throw convertToIOException(ke);
+      }
+    }
+
+    @Override
+    public synchronized void removeContainer(Container container) throws IOException {
+      if (!doneWithRecovery) return;
+      
+      ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
+      try { 
+        zkClient.delete(APPS + containerPathFromContainerId(container.getId()),
+            -1);
+      } catch(InterruptedException ie) {
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        LOG.info("Keeper exception", ke);
+        throw convertToIOException(ke);
+      }
+    }
+
+    @Override
+    public void updateApplicationState(
+        ApplicationMaster master) throws IOException {
+      if (!doneWithRecovery) return;
+      
+      String appString = APPS + ConverterUtils.toString(applicationId);
+      ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
+      try {
+        zkClient.setData(appString, masterPBImpl.getProto().toByteArray(), -1);
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted", ie);
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        LOG.info("Keeper exception", ke);
+        throw convertToIOException(ke);
+      }
+    }
+
+    @Override
+    public boolean isLoggable() {
+      return doneWithRecovery;
     }
   }
 
   @Override
-  public synchronized void storeApplication(ApplicationId application, ApplicationSubmissionContext 
-      context, ApplicationMaster master) throws IOException {
+  public synchronized ApplicationStore createApplicationStore(ApplicationId application, 
+      ApplicationSubmissionContext context) throws IOException {
+    if (!doneWithRecovery) return new ZKApplicationStore(application);
+    
     ApplicationSubmissionContextPBImpl contextPBImpl = (ApplicationSubmissionContextPBImpl) context;
     String appString = APPS + ConverterUtils.toString(application);
-    ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
-    
+   
+    ApplicationMasterPBImpl masterPBImpl = new ApplicationMasterPBImpl();
+    ContainerPBImpl container = new ContainerPBImpl();
     try {
       zkClient.create(appString, contextPBImpl.getProto()
           .toByteArray(), null, CreateMode.PERSISTENT);
       zkClient.create(appString + ZK_PATH_SEPARATOR + APP_MASTER, 
           masterPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
+      zkClient.create(appString + ZK_PATH_SEPARATOR + APP_MASTER_CONTAINER, 
+          container.getProto().toByteArray(), null, CreateMode.PERSISTENT);
     } catch(InterruptedException ie) {
       LOG.info("Interrupted", ie);
       throw new InterruptedIOException(ie.getMessage());
@@ -219,27 +281,13 @@ public class ZKStore implements Store {
       LOG.info("Keeper exception", ke);
       throw convertToIOException(ke);
     }
-  }
-  
-  @Override
-  public void updateApplicationState(ApplicationId applicationId,
-      ApplicationMaster master) throws IOException {
-    String appString = APPS + ConverterUtils.toString(applicationId);
-    ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
-    try {
-      zkClient.setData(appString, masterPBImpl.getProto().toByteArray(), -1);
-    } catch(InterruptedException ie) {
-      LOG.info("Interrupted", ie);
-      throw new InterruptedIOException(ie.getMessage());
-    } catch(KeeperException ke) {
-      LOG.info("Keeper exception", ke);
-      throw convertToIOException(ke);
-    }
+    return new ZKApplicationStore(application);
   }
 
-
   @Override
   public synchronized void removeApplication(ApplicationId application) throws IOException {
+    if (!doneWithRecovery) return;
+    
     try {
       zkClient.delete(APPS + ConverterUtils.toString(application), -1);
     } catch(InterruptedException ie) {
@@ -250,6 +298,17 @@ public class ZKStore implements Store {
       throw convertToIOException(ke);
     }
   }
+
+  @Override
+  public boolean isLoggable() {
+    return doneWithRecovery;
+  }
+
+  @Override
+  public void doneWithRecovery() {
+    this.doneWithRecovery = true;
+  }
+
   
   @Override
   public synchronized RMState restore() throws IOException {
@@ -257,20 +316,26 @@ public class ZKStore implements Store {
     rmState.load();
     return rmState;
   }  
-  
+
   private class ApplicationInfoImpl implements ApplicationInfo {
     private ApplicationMaster master;
+    private Container masterContainer;
+
     private final ApplicationSubmissionContext context;
     private final List<Container> containers = new ArrayList<Container>();
-    
+
     public ApplicationInfoImpl(ApplicationSubmissionContext context) {
       this.context = context;
     }
-    
+
     public void setApplicationMaster(ApplicationMaster master) {
       this.master = master;
     }
-    
+
+    public void setMasterContainer(Container container) {
+      this.masterContainer = container;
+    }
+
     @Override
     public ApplicationMaster getApplicationMaster() {
       return this.master;
@@ -282,24 +347,29 @@ public class ZKStore implements Store {
     }
 
     @Override
+    public Container getMasterContainer() {
+      return this.masterContainer;
+    }
+
+    @Override
     public List<Container> getContainers() {
       return this.containers;
     }
-    
+
     public void addContainer(Container container) {
       containers.add(container);
     }
   }
-  
+
   private class ZKRMState implements RMState {
     private List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
     private Map<ApplicationId, ApplicationInfo> applications = new 
-      HashMap<ApplicationId, ApplicationInfo>();
-    
+    HashMap<ApplicationId, ApplicationInfo>();
+
     public ZKRMState() {
       LOG.info("Restoring RM state from ZK");
     }
-    
+
     private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
       /** get the list of nodes stored in zk **/
       //TODO PB
@@ -322,7 +392,7 @@ public class ZKStore implements Store {
       }
       return nodes;
     }
-    
+
     @Override
     public List<NodeManager> getStoredNodeManagers()  {
       return nodeManagers;
@@ -332,7 +402,7 @@ public class ZKStore implements Store {
     public NodeId getLastLoggedNodeId() {
       return nodeId;
     }
-    
+
     private void readLastNodeId() throws IOException {
       Stat stat = new Stat();
       try {
@@ -346,7 +416,7 @@ public class ZKStore implements Store {
         throw convertToIOException(ke);
       }
     }
-    
+
     private ApplicationInfo getAppInfo(String app) throws IOException {
       ApplicationInfoImpl info = null;
       Stat stat = new Stat();
@@ -363,6 +433,9 @@ public class ZKStore implements Store {
           if (APP_MASTER.equals(child)) {
             master = new ApplicationMasterPBImpl(ApplicationMasterProto.parseFrom(childdata));
             info.setApplicationMaster(master);
+          } else if (APP_MASTER_CONTAINER.equals(child)) {
+            Container masterContainer = new ContainerPBImpl(ContainerProto.parseFrom(data));
+            info.setMasterContainer(masterContainer);
           } else {
             Container container = new ContainerPBImpl(ContainerProto.parseFrom(data));
             info.addContainer(container);
@@ -376,7 +449,7 @@ public class ZKStore implements Store {
       }
       return info;
     }
-    
+
     private void load() throws IOException {
       List<NodeManagerInfo> nodeInfos = listStoredNodes();
       for (NodeManagerInfo node: nodeInfos) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Mon May  2 19:01:06 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
 import org.apache.hadoop.yarn.server.YarnServerConfig;
@@ -61,6 +63,9 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
@@ -115,12 +120,16 @@ ResourceTracker, ClusterTracker {
   private final AtomicInteger nodeCounter = new AtomicInteger(0);
   private static final HeartbeatResponse reboot = recordFactory.newRecordInstance(HeartbeatResponse.class);
   private long nmExpiryInterval;
-
-  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
+  private final RMContext rmContext;
+  private final NodeStore nodeStore;
+  
+  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager, RMContext context) {
     super(RMResourceTrackerImpl.class.getName());
     reboot.setReboot(true);
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.heartbeatThread = new HeartBeatThread();
+    this.rmContext = context;
+    this.nodeStore = context.getNodeStore();
   }
 
   @Override
@@ -167,7 +176,7 @@ ResourceTracker, ClusterTracker {
   }
   
   protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
-      String hostString, String httpAddress, Node node, Resource capability) {
+      String hostString, String httpAddress, Node node, Resource capability) throws IOException {
     NodeInfoTracker nTracker = null;
     
     synchronized(nodeManagers) {
@@ -178,6 +187,7 @@ ResourceTracker, ClusterTracker {
               node,
               capability);
         nodes.put(nodeManager.getNodeAddress(), nodeId);
+        nodeStore.storeNode(nodeManager);
         /* Inform the listeners */
         resourceListener.addNode(nodeManager);
         HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
@@ -202,12 +212,17 @@ ResourceTracker, ClusterTracker {
     Resource capability = request.getResource();
 
     NodeId nodeId = getNodeId(node);
-    NodeInfoTracker nTracker = getAndAddNodeInfoTracker(
+    
+    NodeInfoTracker nTracker = null;
+    try {
+    nTracker = getAndAddNodeInfoTracker(
       nodeId, node.toString(), httpAddress,
                 resolve(node.toString()),
                 capability);
           // Inform the scheduler
-      
+    } catch(IOException io) {
+      throw  RPCUtil.getRemoteException(io);
+    }
     addForTracking(nodeId);
     LOG.info("NodeManager from node " + node + "(web-url: " + httpAddress
         + ") registered with capability: " + capability.getMemory()
@@ -459,21 +474,43 @@ ResourceTracker, ClusterTracker {
     } 
   }
 
-  @Override
-  public  boolean releaseContainer(Container container) {
+  private NodeManager getNodeManagerForContainer(Container container) {
     NodeManager node;
     synchronized (nodeManagers) {
       LOG.info("DEBUG -- Container manager address " + container.getContainerManagerAddress());
       NodeId nodeId = nodes.get(container.getContainerManagerAddress());
       node = nodeManagers.get(nodeId).getNodeManager();
     }
+    return node;
+  }
+  @Override
+  public  boolean releaseContainer(Container container) {
+    NodeManager node = getNodeManagerForContainer(container);
     node.releaseContainer(container);
     return false;
   }
   
   @Override
   public void recover(RMState state) {
-
+    List<NodeManager> nodeManagers = state.getStoredNodeManagers();
+    for (NodeManager nm: nodeManagers) {
+      try {
+        getAndAddNodeInfoTracker(nm.getNodeID(), nm.getNodeAddress(), nm.getHttpAddress(), 
+          nm.getNode(), nm.getTotalCapability());
+      } catch(IOException ie) {
+        //ignore
+      }
+    }
+    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
+      List<Container> containers = entry.getValue().getContainers();
+      List<Container> containersToAdd = new ArrayList<Container>();
+      for (Container c: containers) {
+        NodeManager containerNode = getNodeManagerForContainer(c);
+        containersToAdd.add(c);
+        containerNode.allocateContainer(entry.getKey(), containersToAdd);
+        containersToAdd.clear();
+      }
+    }
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May  2 19:01:06 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
@@ -71,6 +73,8 @@ public class Application {
   Map<Priority, Integer> schedulingOpportunities = 
     new HashMap<Priority, Integer>();
   
+  private final ApplicationStore store;
+  
   /* Current consumption */
   List<Container> acquired = new ArrayList<Container>();
   List<Container> completedContainers = new ArrayList<Container>();
@@ -92,11 +96,12 @@ public class Application {
     new HashMap<Priority, Set<NodeInfo>>();
 
   public Application(ApplicationId applicationId, ApplicationMaster master,
-      Queue queue, String user) {
+      Queue queue, String user, ApplicationStore store) {
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user; 
     this.master = master;
+    this.store = store;
   }
 
   public ApplicationId getApplicationId() {
@@ -358,13 +363,17 @@ public class Application {
         offSwitchRequest.getNumContainers() - containers.size());
   }
 
-  synchronized private void allocate(List<Container> containers) {
+  synchronized public void allocate(List<Container> containers) {
     // Update consumption and track allocations
     for (Container container : containers) {
       Resources.addTo(currentConsumption, container.getResource());
 
       allocated.add(container);
-
+      try {
+        store.storeContainer(container);
+      } catch(IOException ie) {
+        //TODO fix this. we shouldnt ignore
+      }
       LOG.debug("allocate: applicationId=" + applicationId + 
           " container=" + container.getId() + " host=" + container.getContainerManagerAddress());
     }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Mon May  2 19:01:06 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 
 /**
  * This interface is used by the components to talk to the
@@ -53,11 +54,12 @@ public interface YarnScheduler {
    * @param applicationId application which has been submitted
    * @param master the application master
    * @param user application user
-   * @param queue queue to which the applications is being submitted
+   * @param queue queue to which the application is being submitted
    * @param priority application priority
+   * @param appStore the storage for the application.
    */
   public void addApplication(ApplicationId applicationId, ApplicationMaster master,
-      String user, String queue, Priority priority) 
+      String user, String queue, Priority priority, ApplicationStore appStore) 
   throws IOException;
   
   /**

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1098735&r1=1098734&r2=1098735&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May  2 19:01:06 2011
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
@@ -229,7 +230,7 @@ implements ResourceScheduler, CapacitySc
 
   @Override
   public void addApplication(ApplicationId applicationId, ApplicationMaster master,
-      String user, String queueName, Priority priority)
+      String user, String queueName, Priority priority, ApplicationStore appStore)
   throws IOException {
     Queue queue = queues.get(queueName);
 
@@ -243,7 +244,7 @@ implements ResourceScheduler, CapacitySc
           " submitted by user " + user + " to non-leaf queue: " + queueName);
     }
 
-    Application application = new Application(applicationId, master, queue, user); 
+    Application application = new Application(applicationId, master, queue, user, appStore); 
     try {
       queue.submitApplication(application, user, queueName, priority);
     } catch (AccessControlException ace) {
@@ -467,7 +468,8 @@ implements ResourceScheduler, CapacitySc
       try {
         addApplication(event.getAppContext().getApplicationID(), event.getAppContext().getMaster(),
             event.getAppContext().getUser(), event.getAppContext().getQueue(),
-            event.getAppContext().getSubmissionContext().getPriority());
+            event.getAppContext().getSubmissionContext().getPriority(),
+            event.getAppContext().getStore());
       } catch(IOException ie) {
         LOG.error("Error in adding an application to the scheduler", ie);
         //TODO do proper error handling to shutdown the Resource Manager is we 
@@ -535,9 +537,8 @@ implements ResourceScheduler, CapacitySc
     for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
       ApplicationId appId = entry.getKey();
       ApplicationInfo appInfo = entry.getValue();
-      
-      addApplication(appId, appInfo.getApplicationMaster(), appInfo.getApplicationSubmissionContext().getUser(),
-          appInfo.getApplicationSubmissionContext().getQueue(), appInfo.getApplicationSubmissionContext().getPriority());
+      Application app = applications.get(appId);
+      app.allocate(appInfo.getContainers());
       for (Container c: entry.getValue().getContainers()) {
         Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
         queue.recoverContainer(clusterResource, applications.get(appId), c);



Mime
View raw message