hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1098414 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resou...
Date Sun, 01 May 2011 21:33:24 GMT
Author: mahadev
Date: Sun May  1 21:33:23 2011
New Revision: 1098414

URL: http://svn.apache.org/viewvc?rev=1098414&view=rev
Log:
RM Restart Phase 2 - Completed the recovery of components in the RM (mahadev)

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java
Removed:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Sun May  1 21:33:23 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    RM Restart Phase 2 - Completed the recovery of components in the RM (mahadev)
+
     MAPREDUCE-2434. Metrics for ResourceManager. (Luke Lu via acmurthy)
 
     Fix container launch w/ inconsistent credential file naming. (cdouglas)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Sun May  1 21:33:23 2011
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -217,7 +218,8 @@ public class TestRMContainerAllocator {
   }
 
   private FifoScheduler createScheduler() throws YarnRemoteException {
-    FifoScheduler fsc = new FifoScheduler() {
+    ClusterTracker clusterTracker = null;
+    FifoScheduler fsc = new FifoScheduler(clusterTracker) {
       //override this to copy the objects
       //otherwise FifoScheduler updates the numContainers in same objects as kept by
       //RMContainerAllocator
@@ -240,7 +242,7 @@ public class TestRMContainerAllocator {
       }
     };
     try {
-      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager());
+      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
       fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
           recordFactory.newRecordInstance(ApplicationMaster.class),
           "test", null, null);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Sun May  1 21:33:23 2011
@@ -76,7 +76,7 @@ public class AdminService extends Abstra
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws YarnRemoteException {
     try {
-      scheduler.reinitialize(conf, null); // ContainerTokenSecretManager can't
+      scheduler.reinitialize(conf, null, null); // ContainerTokenSecretManager can't
                                           // be 'refreshed'
       RefreshQueuesResponse response = 
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Sun May  1 21:33:23 2011
@@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -74,7 +74,7 @@ import org.apache.hadoop.yarn.service.Ab
 public class ClientRMService extends AbstractService implements ClientRMProtocol {
   private static final Log LOG = LogFactory.getLog(ClientRMService.class);
   
-  final private ResourceContext clusterInfo;
+  final private ClusterTracker clusterInfo;
   final private ApplicationsManager applicationsManager;
   final private ResourceScheduler scheduler;
   
@@ -84,7 +84,7 @@ public class ClientRMService extends Abs
   InetSocketAddress clientBindAddress;
   
   public ClientRMService(ApplicationsManager applicationsManager, 
-        ResourceContext clusterInfo, ResourceScheduler scheduler) {
+        ClusterTracker clusterInfo, ResourceScheduler scheduler) {
     super(ClientRMService.class.getName());
     this.clusterInfo = clusterInfo;
     this.applicationsManager = applicationsManager;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sun May  1 21:33:23 2011
@@ -32,13 +32,15 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.
+  ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
@@ -51,7 +53,7 @@ import org.apache.hadoop.yarn.webapp.Web
  * The ResourceManager is the main class that is a set of components.
  *
  */
-public class ResourceManager extends CompositeService {
+public class ResourceManager extends CompositeService implements Recoverable {
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   public static final long clusterTimeStamp = System.currentTimeMillis();
   private YarnConfiguration conf;
@@ -73,10 +75,11 @@ public class ResourceManager extends Com
   private AtomicBoolean shutdown = new AtomicBoolean(false);
   private WebApp webApp;
   private RMContext asmContext;
-  private Store store;
+  private final Store store;
   
-  public ResourceManager() {
+  public ResourceManager(Store store) {
     super("ResourceManager");
+    this.store = store;
   }
   
   public RMContext getRMContext() {
@@ -114,10 +117,12 @@ public class ResourceManager extends Com
     }
   }
   
+  public void recover() {
+    
+  }
   @Override
   public synchronized void init(Configuration conf) {
     
-    this.store = StoreFactory.getStore(conf);
     this.asmContext = new RMContextImpl(this.store);
     addService(asmContext.getDispatcher());
     // Initialize the config
@@ -128,11 +133,7 @@ public class ResourceManager extends Com
           conf.getClass(YarnConfiguration.RESOURCE_SCHEDULER, 
               FifoScheduler.class, ResourceScheduler.class), 
           this.conf);
-    try {
-      this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to initialize scheduler", ioe);
-    }
+  
     this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
@@ -141,8 +142,14 @@ public class ResourceManager extends Com
     applicationsManager = createApplicationsManagerImpl();
     addService(applicationsManager);
     
-    rmResourceTracker = createRMResourceTracker(this.scheduler);
+    rmResourceTracker = createRMResourceTracker();
     addService(rmResourceTracker);
+  
+    try {
+      this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager, rmResourceTracker);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to initialize scheduler", ioe);
+    }
     
     clientRM = createClientRMService();
     addService(clientRM);
@@ -200,8 +207,8 @@ public class ResourceManager extends Com
     super.stop();
   }
   
-  protected RMResourceTrackerImpl createRMResourceTracker(ResourceListener listener) {
-    return new RMResourceTrackerImpl(this.containerTokenSecretManager, listener);
+  protected RMResourceTrackerImpl createRMResourceTracker() {
+    return new RMResourceTrackerImpl(this.containerTokenSecretManager);
   }
   
   protected ApplicationsManagerImpl createApplicationsManagerImpl() {
@@ -248,13 +255,22 @@ public class ResourceManager extends Com
     return this.rmResourceTracker;
   }
   
+
+  @Override
+  public void recover(RMState state) throws Exception {
+    applicationsManager.recover(state);
+    rmResourceTracker.recover(state);
+    scheduler.recover(state);
+  }
   
   public static void main(String argv[]) {
     ResourceManager resourceManager = null;
     try {
       Configuration conf = new YarnConfiguration();
-      resourceManager = new ResourceManager();
+      Store store =  StoreFactory.getStore(conf);
+      resourceManager = new ResourceManager(store);
       resourceManager.init(conf);
+      //resourceManager.recover();
       resourceManager.start();
     } catch (Throwable e) {
       LOG.error("Error starting RM", e);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Sun May  1 21:33:23 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
@@ -45,6 +45,9 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
@@ -55,7 +58,7 @@ import org.apache.hadoop.yarn.service.Ab
 @Evolving
 @Private
 public class AMTracker extends AbstractService  implements EventHandler<ASMEvent
-    <ApplicationEventType>> {
+<ApplicationEventType>>, Recoverable {
   private static final Log LOG = LogFactory.getLog(AMTracker.class);
   private HeartBeatThread heartBeatThread;
   private long amExpiryInterval; 
@@ -63,9 +66,9 @@ public class AMTracker extends AbstractS
   private EventHandler handler;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private int amMaxRetries;
-  
+
   private final RMContext asmContext;
-  
+
   private final Map<ApplicationId, ApplicationMasterInfo> applications = 
     new ConcurrentHashMap<ApplicationId, ApplicationMasterInfo>();
 
@@ -84,7 +87,7 @@ public class AMTracker extends AbstractS
           }
         }
     );
-  
+
   public AMTracker(RMContext asmContext) {
     super(AMTracker.class.getName());
     this.heartBeatThread = new HeartBeatThread();
@@ -96,7 +99,7 @@ public class AMTracker extends AbstractS
     super.init(conf);
     this.handler = asmContext.getDispatcher().getEventHandler();
     this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL, 
-    YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
+        YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
     LOG.info("AM expiry interval: " + this.amExpiryInterval);
     this.amMaxRetries =  conf.getInt(YarnConfiguration.AM_MAX_RETRIES, 
         YarnConfiguration.DEFAULT_AM_MAX_RETRIES);
@@ -170,9 +173,9 @@ public class AMTracker extends AbstractS
         am = applications.get(app);
       }
       handler.handle(new ASMEvent<ApplicationEventType>
-          (ApplicationEventType.EXPIRE, am));
-      }
+      (ApplicationEventType.EXPIRE, am));
     }
+  }
 
   @Override
   public void stop() {
@@ -189,14 +192,14 @@ public class AMTracker extends AbstractS
   public void addMaster(String user,  ApplicationSubmissionContext 
       submissionContext, String clientToken) {
     ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(asmContext, 
-      user, submissionContext, clientToken);
+        user, submissionContext, clientToken);
     synchronized(applications) {
       applications.put(applicationMaster.getApplicationID(), applicationMaster);
     }
     asmContext.getDispatcher().getSyncHandler().handle(new ASMEvent<ApplicationEventType>(
         ApplicationEventType.ALLOCATE, applicationMaster));
   }
-  
+
   public void finish(ApplicationId application) {
     ApplicationMasterInfo masterInfo = null;
     synchronized(applications) {
@@ -245,14 +248,14 @@ public class AMTracker extends AbstractS
 
   public void kill(ApplicationId applicationID) {
     ApplicationMasterInfo masterInfo = null;
-    
+
     synchronized(applications) {
       masterInfo = applications.get(applicationID);
     }
     handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.KILL, 
-    masterInfo));
+        masterInfo));
   }
-  
+
   /*
    * this class is used for passing status context to the application state
    * machine.
@@ -261,14 +264,14 @@ public class AMTracker extends AbstractS
     private final ApplicationId appID;
     private final ApplicationMaster master;
     private final UnsupportedOperationException notimplemented;
-  
+
     public TrackerAppContext(
-         ApplicationId appId, ApplicationMaster master) {
+        ApplicationId appId, ApplicationMaster master) {
       this.appID = appId;
       this.master = master;
       this.notimplemented = new NotImplementedException();
     }
-    
+
     @Override
     public ApplicationSubmissionContext getSubmissionContext() {
       throw notimplemented;
@@ -303,7 +306,7 @@ public class AMTracker extends AbstractS
     }
     @Override
     public String getName() {
-     throw notimplemented;
+      throw notimplemented;
     }
     @Override
     public String getQueue() {
@@ -312,10 +315,10 @@ public class AMTracker extends AbstractS
 
     @Override
     public int getFailedCount() {
-     throw notimplemented;
+      throw notimplemented;
     }
   }
-  
+
   public void heartBeat(ApplicationStatus status) {
     ApplicationMaster master = recordFactory.newRecordInstance(ApplicationMaster.class);
     master.setStatus(status);
@@ -324,7 +327,7 @@ public class AMTracker extends AbstractS
     handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.STATUSUPDATE, 
         context));
   }
-  
+
   public void registerMaster(ApplicationMaster applicationMaster) {
     applicationMaster.getStatus().setLastSeen(System.currentTimeMillis());
     ApplicationMasterInfo master = null;
@@ -335,9 +338,9 @@ public class AMTracker extends AbstractS
     TrackerAppContext registrationContext = new TrackerAppContext(
         master.getApplicationID(), applicationMaster);
     handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-      REGISTERED,  registrationContext));
+        REGISTERED,  registrationContext));
   }
-  
+
   @Override
   public void handle(ASMEvent<ApplicationEventType> event) {
     ApplicationId appID = event.getAppContext().getApplicationID();
@@ -354,26 +357,76 @@ public class AMTracker extends AbstractS
     /* we need to launch the applicaiton master on allocated transition */
     if (masterInfo.getState() == ApplicationState.ALLOCATED) {
       handler.handle(new ASMEvent<ApplicationEventType>(
-        ApplicationEventType.LAUNCH, masterInfo));
+          ApplicationEventType.LAUNCH, masterInfo));
     }
     if (masterInfo.getState() == ApplicationState.LAUNCHED) {
       /* the application move to a launched state start tracking */
       synchronized (amExpiryQueue) {
         LOG.info("DEBUG -- adding to  expiry " + masterInfo.getStatus() + 
-        " currenttime " + System.currentTimeMillis());
+            " currenttime " + System.currentTimeMillis());
         amExpiryQueue.add(masterInfo.getStatus());
       }
     }
-    
+
     /* check to see if the AM is an EXPIRED_PENDING state and start off the cycle again */
     if (masterInfo.getState() == ApplicationState.EXPIRED_PENDING) {
       /* check to see if the number of retries are reached or not */
       if (masterInfo.getFailedCount() < this.amMaxRetries) {
         handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.ALLOCATE,
-          masterInfo));
+            masterInfo));
       } else {
         handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
-              FAILED_MAX_RETRIES, masterInfo));
+            FAILED_MAX_RETRIES, masterInfo));
+      }
+    }
+  }
+
+  @Override
+  public void recover(RMState state) {
+    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
+      ApplicationId appId = entry.getKey();
+      ApplicationInfo appInfo = entry.getValue();
+      ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.asmContext,
+          appInfo.getApplicationSubmissionContext().getUser(), appInfo.getApplicationSubmissionContext(), 
+          appInfo.getApplicationMaster().getClientToken());
+      ApplicationMaster master = masterInfo.getMaster();
+      ApplicationMaster storedAppMaster = appInfo.getApplicationMaster();
+      master.setAMFailCount(storedAppMaster.getAMFailCount());
+      master.setApplicationId(storedAppMaster.getApplicationId());
+      master.setClientToken(storedAppMaster.getClientToken());
+      master.setContainerCount(storedAppMaster.getContainerCount());
+      master.setHttpPort(storedAppMaster.getHttpPort());
+      master.setHost(storedAppMaster.getHost());
+      master.setRpcPort(storedAppMaster.getRpcPort());
+      master.setStatus(storedAppMaster.getStatus());
+      master.setState(storedAppMaster.getState());
+      applications.put(appId, masterInfo);
+      
+      switch(master.getState()) {
+      case ALLOCATED:
+        break;
+      case ALLOCATING:
+        break;
+      case CLEANUP:
+        break;
+      case EXPIRED_PENDING:
+        break;
+      case COMPLETED:
+        break;
+      case FAILED:
+        break;
+      case LAUNCHED:
+        break;
+      case KILLED:
+        break;
+      case LAUNCHING:
+        break;
+      case PAUSED:
+        break;
+      case PENDING:
+        break;
+      case RUNNING:
+        break;
       }
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java Sun May  1 21:33:23 2011
@@ -27,13 +27,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 
 /**
  * This interface defines the interface for ApplicationsManager.
  */
 @Private
 @Evolving
-public interface ApplicationsManager {
+public interface ApplicationsManager extends Recoverable {
    ApplicationId getNewApplicationID();
    ApplicationMaster getApplicationMaster(ApplicationId applicationId);
    Application getApplication(ApplicationId applicationID);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Sun May  1 21:33:23 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -251,4 +252,10 @@ public class ApplicationsManagerImpl ext
       createApplication(master.getMaster(), 
           master.getUser(), master.getQueue(), master.getName());
   }
+
+
+  /**
+   * Restore application-manager state by handing the stored RM state
+   * to the AM tracker.
+   * @param state the state read back from the recovery store.
+   */
+  @Override
+  public void recover(RMState state) {
+    this.amTracker.recover(state);
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Sun May  1 21:33:23 2011
@@ -87,18 +87,17 @@ public class MemStore implements Store {
     }
 
     @Override
-    public List<NodeManager> getStoredNodeManagers() throws IOException {
+    public List<NodeManager> getStoredNodeManagers()  {
+      // Always returns a new, empty list.
       return new ArrayList<NodeManager>();
     }
 
     @Override
-    public NodeId getLastLoggedNodeId() throws IOException {
+    public NodeId getLastLoggedNodeId() {
+      // Returns the nodeId held by this state object.
       return nodeId;
     }
 
     @Override
-    public Map<ApplicationId, ApplicationInfo> getStoredApplications()
-    throws IOException {
+    public Map<ApplicationId, ApplicationInfo> getStoredApplications() {
+      // Always returns a new, empty map.
       return new HashMap<ApplicationId, Store.ApplicationInfo>();
     }
   }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java?rev=1098414&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java Sun May  1 21:33:23 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+
+/**
+ * Implemented by ResourceManager components whose state can be restored
+ * from a previously stored {@link RMState}.
+ */
+public interface Recoverable {
+  /**
+   * Restore this component's state from the given stored state.
+   * @param state the state restored from the RM store.
+   * @throws Exception if recovery fails; implementations may declare a
+   *         narrower exception or none at all.
+   */
+  public void recover(RMState state) throws Exception;
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Sun May  1 21:33:23 2011
@@ -36,9 +36,9 @@ public interface Store extends NodeStore
     public List<Container> getContainers();
   }
+  /**
+   * State read back from a {@link Store} via {@link Store#restore()}.
+   * The accessors declare no checked exceptions; reading from the backing
+   * store is expected to happen in restore(), which may throw IOException.
+   */
   public interface RMState {
-    public List<NodeManager> getStoredNodeManagers() throws IOException;
-    public Map<ApplicationId, ApplicationInfo> getStoredApplications() throws IOException;
-    public NodeId getLastLoggedNodeId() throws IOException;
+    public List<NodeManager> getStoredNodeManagers() ;
+    public Map<ApplicationId, ApplicationInfo> getStoredApplications();
+    public NodeId getLastLoggedNodeId();
   }
   public RMState restore() throws IOException;
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Sun May  1 21:33:23 2011
@@ -324,7 +324,7 @@ public class ZKStore implements Store {
     }
     
     @Override
-    public List<NodeManager> getStoredNodeManagers()  throws IOException {
+    public List<NodeManager> getStoredNodeManagers()  {
+      // NOTE(review): returns the backing field itself, not a copy.
       return nodeManagers;
     }
 
@@ -403,8 +403,7 @@ public class ZKStore implements Store {
     }
 
     @Override
-    public Map<ApplicationId, ApplicationInfo> getStoredApplications()
-        throws IOException {
+    public Map<ApplicationId, ApplicationInfo> getStoredApplications() {
+      // NOTE(review): returns the backing field itself, not a copy.
       return applications;
     }
   }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java?rev=1098414&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java Sun May  1 21:33:23 2011
@@ -0,0 +1,65 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+
+
+/**
+ * Tracks the cluster's node managers: exposes node and cluster metrics,
+ * notifies nodes of finished applications, releases containers and
+ * registers a listener for node updates. Not read-only: several methods
+ * change tracker or node state.
+ */
+public interface ClusterTracker extends Recoverable{
+  
+  /**
+   * Get information on all nodes known to the tracker.
+   * @return a list of node info
+   */
+  List<NodeInfo> getAllNodeInfo();
+  
+  /**
+   * Get cluster metrics from the resource tracker.
+   * @return the cluster metrics for the cluster.
+   */
+  YarnClusterMetrics getClusterMetrics();
+  
+  /**
+   * Notify nodes that an application has finished.
+   * @param applicationId the application that finished
+   * @param nodesToNotify  the nodes that need to be notified.
+   */
+  void finishedApplication(ApplicationId applicationId, List<NodeInfo> nodesToNotify);
+  
+  /**
+   * Release a container on the node that hosts it.
+   * @param container the container to be released
+   * @return NOTE(review): the meaning is not specified by this interface;
+   *         confirm what callers expect before relying on it.
+   */
+  boolean releaseContainer(Container container);
+  
+  /**
+   * Add a listener to be notified of node updates.
+   * @param listener the listener to notify. NOTE(review): an implementation
+   *        may keep only one listener, replacing any previous one.
+   */
+  public void addListener(ResourceListener listener);
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java Sun May  1 21:33:23 2011
@@ -1,26 +1,33 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import java.util.List;
+
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
 
 /**
  * Node managers information on available resources 
@@ -78,4 +85,23 @@ public interface NodeInfo {
    * @return the number of containers
    */
   public int getNumContainers();
- }
\ No newline at end of file
+
+  /**
+   * Inform the node of allocated containers
+   * @param applicationId the application id
+   * @param containers the list of containers
+   */
+  public void allocateContainer(ApplicationId applicationId,
+      List<Container> containers);
+
+  /**
+   * Get the application that holds a reservation on this node.
+   * @return the reserved application. NOTE(review): presumably null when
+   *         nothing is reserved; confirm against implementations.
+   */
+  public Application getReservedApplication();
+
+  /**
+   * Reserve a resource on this node for an application.
+   * NOTE(review): semantics are inferred from the method name; confirm
+   * against implementations.
+   * @param application the application the reservation is for
+   * @param priority the priority of the reservation
+   * @param resource the resource to reserve
+   */
+  public void reserveResource(Application application, Priority priority,
+      Resource resource);
+
+  /**
+   * Drop a reservation made on this node for an application.
+   * NOTE(review): semantics are inferred from the method name; confirm
+   * against implementations.
+   * @param application the application whose reservation is dropped
+   * @param priority the priority of the reservation
+   */
+  public void unreserveResource(Application application, Priority priority);
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java Sun May  1 21:33:23 2011
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.server.res
  * This should be package private. It does not need to be public.
  *
  */
-class NodeInfoTracker {
+public class NodeInfoTracker {
   private final NodeManager node;
   HeartbeatResponse lastHeartBeatResponse;
   private long lastSeen;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Sun May  1 21:33:23 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
@@ -41,6 +41,8 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -72,7 +75,7 @@ import org.apache.hadoop.yarn.service.Ab
  *`
  */
 public class RMResourceTrackerImpl extends AbstractService implements 
-ResourceTracker, ResourceContext {
+ResourceTracker, ClusterTracker {
   private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
   /* we dont garbage collect on nodes. A node can come back up again and re register,
    * so no use garbage collecting. Though admin can break the RM by bouncing 
@@ -82,9 +85,9 @@ ResourceTracker, ResourceContext {
   private final Map<NodeId, NodeInfoTracker> nodeManagers = 
     new ConcurrentHashMap<NodeId, NodeInfoTracker>();
   private final HeartBeatThread heartbeatThread;
-  
+
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
+
   private final TreeSet<NodeId> nmExpiryQueue =
     new TreeSet<NodeId>(
         new Comparator<NodeId>() {
@@ -94,17 +97,17 @@ ResourceTracker, ResourceContext {
             long p1LastSeen = nit1.getNodeLastSeen();
             long p2LastSeen = nit2.getNodeLastSeen();
             if (p1LastSeen < p2LastSeen) {
-                return -1;
+              return -1;
             } else if (p1LastSeen > p2LastSeen) {
-                return 1;
-              } else {
-                return (nit1.getNodeManager().getNodeID().getId() -
-                    nit2.getNodeManager().getNodeID().getId());
-              }
+              return 1;
+            } else {
+              return (nit1.getNodeManager().getNodeID().getId() -
+                  nit2.getNodeManager().getNodeID().getId());
             }
           }
-      );
-  
+        }
+    );
+
   private ResourceListener resourceListener;
   private InetSocketAddress resourceTrackerAddress;
   private Server server;
@@ -113,13 +116,11 @@ ResourceTracker, ResourceContext {
   private static final HeartbeatResponse reboot = recordFactory.newRecordInstance(HeartbeatResponse.class);
   private long nmExpiryInterval;
 
-  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager,
-      ResourceListener listener) {
+  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
     super(RMResourceTrackerImpl.class.getName());
     reboot.setReboot(true);
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.heartbeatThread = new HeartBeatThread();
-    this.resourceListener = listener;
   }
 
   @Override
@@ -134,6 +135,11 @@ ResourceTracker, ResourceContext {
   }
 
   @Override
+  public void addListener(ResourceListener listener) {
+    // Only one listener is held: each call replaces the previous one.
+    this.resourceListener = listener;
+  }
+
+  @Override
   public void start() {
     // ResourceTrackerServer authenticates NodeManager via Kerberos if
     // security is enabled, so no secretManager.
@@ -159,26 +165,20 @@ ResourceTracker, ResourceContext {
   public static Node resolve(String hostName) {
     return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
   }
-
-  @Override
-  public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
-    String host = request.getHost();
-    int cmPort = request.getContainerManagerPort();
-    String node = host + ":" + cmPort;
-    String httpAddress = host + ":" + request.getHttpPort();
-    Resource capability = request.getResource();
   
-    NodeId nodeId = getNodeId(node);
+  protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
+      String hostString, String httpAddress, Node node, Resource capability) {
     NodeInfoTracker nTracker = null;
     
     synchronized(nodeManagers) {
       if (!nodeManagers.containsKey(nodeId)) {
-        /* we do the resolving here, so that scheduler does not have to do it */
+        LOG.info("DEBUG -- Adding  " + hostString);
         NodeManager nodeManager =
-            new NodeManagerImpl(nodeId, node.toString(), httpAddress,
-                resolve(node.toString()),
-                capability);
-        // Inform the scheduler
+          new NodeManagerImpl(nodeId, hostString, httpAddress,
+              node,
+              capability);
+        nodes.put(nodeManager.getNodeAddress(), nodeId);
+        /* Inform the listeners */
         resourceListener.addNode(nodeManager);
         HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
         response.setResponseId(0);
@@ -189,21 +189,67 @@ ResourceTracker, ResourceContext {
         nTracker.updateLastSeen(System.currentTimeMillis());
       }
     }
+    return nTracker;
+  }
+
+  /**
+   * Register a node manager.
+   * Reuses a recorded NodeId or allocates a new one, adds the node to the
+   * tracked nodes and starts expiry tracking for it. The response carries
+   * the NodeId and a container-token secret key.
+   * @param request the registration request from the node manager.
+   * @return the registration response.
+   * @throws YarnRemoteException as declared by the ResourceTracker protocol.
+   */
+  @Override
+  public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest
+      request) throws YarnRemoteException {
+    String host = request.getHost();
+    int cmPort = request.getContainerManagerPort();
+    String node = host + ":" + cmPort;
+    String httpAddress = host + ":" + request.getHttpPort();
+    Resource capability = request.getResource();
+
+    NodeId nodeId = getNodeId(node);
+    NodeInfoTracker nTracker = getAndAddNodeInfoTracker(
+      nodeId, node.toString(), httpAddress,
+                resolve(node.toString()),
+                capability);
+    // getAndAddNodeInfoTracker informs the resource listener of new nodes.
+      
     addForTracking(nodeId);
     LOG.info("NodeManager from node " + node + "(web-url: " + httpAddress
         + ") registered with capability: " + capability.getMemory()
         + ", assigned nodeId " + nodeId.getId());
 
-    RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
+    RegistrationResponse regResponse = recordFactory.newRecordInstance(
+        RegistrationResponse.class);
     regResponse.setNodeId(nodeId);
     SecretKey secretKey =
       this.containerTokenSecretManager.createAndGetSecretKey(node);
     regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
-    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+        RegisterNodeManagerResponse.class);
     response.setRegistrationResponse(regResponse);
     return response;
   }
 
+  /**
+   * Update the listeners. This method can be inlined but are there for 
+   * making testing easier
+   * @param nodeManager the {@link NodeInfo} to update.
+   * @param containers the containers from the status of the node manager.
+   */
+  protected void updateListener(NodeInfo nodeManager, Map<String, List<Container>>
+    containers) {
+  /* inform any listeners of node heartbeats */
+    resourceListener.nodeUpdate(
+        nodeManager, containers);
+  }
+  
+  
+  /**
+   * Get a response for the nodemanager heartbeat by delegating to
+   * {@code nodeManager.statusUpdate(containers)}. That call also applies the
+   * status update, so it is not a pure getter.
+   * NOTE(review): this method has the same name as the {@link NodeResponse}
+   * type, which is confusing. nodeHeartbeat currently calls
+   * nodeManager.statusUpdate directly instead of this helper. Consider a
+   * name such as getNodeResponse and using it from nodeHeartbeat.
+   * @param nodeManager the nodemanager to update
+   * @param containers the containers from the status update.
+   * @return the {@link NodeResponse} for the node manager.
+   */
+  protected NodeResponse NodeResponse(NodeManager nodeManager, Map<String, 
+      List<Container>> containers) {
+    return nodeManager.statusUpdate(containers);
+  }
+  
   @Override
   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
     org.apache.hadoop.yarn.server.api.records.NodeStatus remoteNodeStatus = request.getNodeStatus();
@@ -234,12 +280,19 @@ ResourceTracker, ResourceContext {
       nodeHbResponse.setHeartbeatResponse(reboot);
       return nodeHbResponse;
     }
-
+    
+    /** TODO This should be 3 step process.
+     * nodemanager.statusupdate
+     * listener.update()
+     * nodemanager.getNodeResponse()
+     * This will allow flexibility in updates/scheduling/premption
+     */
+    NodeResponse nodeResponse = nodeManager.statusUpdate(remoteNodeStatus.getAllContainers());
     /* inform any listeners of node heartbeats */
-    NodeResponse nodeResponse = resourceListener.nodeUpdate(
+    updateListener(
         nodeManager, remoteNodeStatus.getAllContainers());
+  
 
-    
     HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
     response.addAllContainersToCleanup(nodeResponse.getContainersToCleanUp());
 
@@ -249,12 +302,12 @@ ResourceTracker, ResourceContext {
     nTracker.refreshHeartBeatResponse(response);
     nTracker.updateLastSeen(remoteNodeStatus.getLastSeen());
     boolean prevHealthStatus =
-        nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
+      nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
     NodeHealthStatus remoteNodeHealthStatus =
-        remoteNodeStatus.getNodeHealthStatus();
+      remoteNodeStatus.getNodeHealthStatus();
     nTracker.getNodeManager().updateHealthStatus(
         remoteNodeHealthStatus);
-//    boolean prevHealthStatus = nodeHbResponse.
+    //    boolean prevHealthStatus = nodeHbResponse.
     nodeHbResponse.setHeartbeatResponse(response);
 
     // Take care of node-health
@@ -263,18 +316,18 @@ ResourceTracker, ResourceContext {
       // Node's health-status changed.
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
         // TODO: Node has become unhealthy, remove?
-//        LOG.info("Node " + nodeManager.getNodeID()
-//            + " has become unhealthy. Health-check report: "
-//            + remoteNodeStatus.nodeHealthStatus.healthReport
-//            + "Removing it from the scheduler.");
-//        resourceListener.removeNode(nodeManager);
+        //        LOG.info("Node " + nodeManager.getNodeID()
+        //            + " has become unhealthy. Health-check report: "
+        //            + remoteNodeStatus.nodeHealthStatus.healthReport
+        //            + "Removing it from the scheduler.");
+        //        resourceListener.removeNode(nodeManager);
       } else {
         // TODO: Node has become healthy back again, add?
-//        LOG.info("Node " + nodeManager.getNodeID()
-//            + " has become healthy back again. Health-check report: "
-//            + remoteNodeStatus.nodeHealthStatus.healthReport
-//            + " Adding it to the scheduler.");
-//        this.resourceListener.addNode(nodeManager);
+        //        LOG.info("Node " + nodeManager.getNodeID()
+        //            + " has become healthy back again. Health-check report: "
+        //            + remoteNodeStatus.nodeHealthStatus.healthReport
+        //            + " Adding it to the scheduler.");
+        //        this.resourceListener.addNode(nodeManager);
       }
     }
     return nodeHbResponse;
@@ -286,13 +339,12 @@ ResourceTracker, ResourceContext {
     return (ntracker == null ? null: ntracker.getNodeManager());
   }
 
+  /**
+   * Look up the NodeId recorded for the given "host:port" node string, or
+   * allocate a fresh one from nodeCounter if none is recorded.
+   * NOTE(review): this method no longer records a new id in {@code nodes};
+   * getAndAddNodeInfoTracker does that. Until then, repeated calls for the
+   * same node each allocate a different id.
+   */
-  private synchronized NodeId getNodeId(String node) {
+  private  NodeId getNodeId(String node) {
     NodeId nodeId;
     nodeId = nodes.get(node);
     if (nodeId == null) {
       nodeId = recordFactory.newRecordInstance(NodeId.class);
       nodeId.setId(nodeCounter.getAndIncrement());
-      nodes.put(node.toString(), nodeId);
     }
     return nodeId;
   }
@@ -328,7 +380,7 @@ ResourceTracker, ResourceContext {
       nmExpiryQueue.add(nodeID);
     }
   }
-  
+
   protected void expireNMs(List<NodeId> nodes) {
     for (NodeId id: nodes) {
       synchronized (nodeManagers) {
@@ -357,10 +409,10 @@ ResourceTracker, ResourceContext {
        * its alright. We do not want to hold a hold on nodeManagers while going
        * through the expiry queue.
        */
-      
+
       List<NodeId> expired = new ArrayList<NodeId>();
       LOG.info("Starting expiring thread with interval " + nmExpiryInterval);
-      
+
       while (!stop) {
         long now = System.currentTimeMillis();
         expired.clear();
@@ -394,4 +446,34 @@ ResourceTracker, ResourceContext {
       }
     }
   }
+
+  /**
+   * Tell each node in {@code nodesToNotify} that the given application has
+   * finished, via NodeManager.finishedApplication.
+   *
+   * A node that is no longer present in {@code nodeManagers} (presumably
+   * because it expired or was removed) is skipped with a log message rather
+   * than aborting the remaining notifications with a NullPointerException.
+   *
+   * @param applicationId the application that finished
+   * @param nodesToNotify the nodes to notify
+   */
+  @Override
+  public void finishedApplication(ApplicationId applicationId,
+      List<NodeInfo> nodesToNotify) {
+    for (NodeInfo info : nodesToNotify) {
+      NodeManager node = null;
+      synchronized (nodeManagers) {
+        if (nodeManagers.get(info.getNodeID()) != null) {
+          node = nodeManagers.get(info.getNodeID()).getNodeManager();
+        }
+      }
+      if (node == null) {
+        LOG.info("Node " + info.getNodeID() + " is no longer tracked; not notifying it that "
+            + applicationId + " finished");
+        continue;
+      }
+      // The callback runs outside the nodeManagers lock, as before.
+      node.finishedApplication(applicationId);
+    }
+  }
+
+  /**
+   * Release a container on the node that hosts it. The hosting node is looked
+   * up by the container's container-manager address.
+   *
+   * @param container the container to release
+   * @return the result of NodeManager.releaseContainer on the hosting node,
+   *         or false when that node is not (or no longer) tracked
+   */
+  @Override
+  public boolean releaseContainer(Container container) {
+    NodeManager node = null;
+    synchronized (nodeManagers) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container manager address " + container.getContainerManagerAddress());
+      }
+      NodeId nodeId = nodes.get(container.getContainerManagerAddress());
+      // Guard both lookups: an unknown address or an expired node must not NPE.
+      if (nodeId != null && nodeManagers.get(nodeId) != null) {
+        node = nodeManagers.get(nodeId).getNodeManager();
+      }
+    }
+    if (node == null) {
+      LOG.info("Cannot release container " + container + ": node at "
+          + container.getContainerManagerAddress() + " is not tracked");
+      return false;
+    }
+    // Propagate the node's answer instead of always reporting false.
+    return node.releaseContainer(container);
+  }
+  
+  /**
+   * Recovery hook (overrides a superinterface method, presumably
+   * Recoverable.recover). Currently a no-op: nothing is restored from
+   * {@code state} here.
+   * TODO(review): confirm that node state is meant to be rebuilt by nodes
+   * re-registering after a restart rather than from the stored RMState.
+   */
+  @Override
+  public void recover(RMState state) {
+
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Sun May  1 21:33:23 2011
@@ -78,15 +78,15 @@ public class Application {
   boolean pending = true; // for app metrics
   
   /* Reserved containers */
-  private final Comparator<NodeManager> nodeComparator = 
-    new Comparator<NodeManager>() {
+  /**
+   * Orders nodes by numeric NodeId. The per-priority TreeSets in
+   * reservedContainers use this comparator for membership (add, remove,
+   * contains), so two NodeInfo objects with the same id count as one node.
+   */
+  private final Comparator<NodeInfo> nodeComparator = 
+    new Comparator<NodeInfo>() {
     @Override
-    public int compare(NodeManager o1, NodeManager o2) {
+    public int compare(NodeInfo o1, NodeInfo o2) {
-      return o1.getNodeID().getId() - o2.getNodeID().getId();
+      // Explicit three-way comparison; subtracting ids can overflow.
+      int id1 = o1.getNodeID().getId();
+      int id2 = o2.getNodeID().getId();
+      return id1 < id2 ? -1 : (id1 == id2 ? 0 : 1);
     }
   };
-  final Map<Priority, Set<NodeManager>> reservedContainers =
-    new HashMap<Priority, Set<NodeManager>>();
+  /** For each priority, the set of nodes on which this application holds a reservation. */
+  final Map<Priority, Set<NodeInfo>> reservedContainers =
+    new HashMap<Priority, Set<NodeInfo>>();
 
   public Application(ApplicationId applicationId, ApplicationMaster master,
       Queue queue, String user) {
@@ -389,15 +389,15 @@ public class Application {
   }
 
+  /**
+   * Count the nodes on which this application holds a reservation at the
+   * given priority.
+   *
+   * @param priority the priority to query
+   * @return the number of reserved nodes, or 0 when nothing is reserved at
+   *         this priority
+   */
   public synchronized int getReservedContainers(Priority priority) {
-    Set<NodeManager> reservedNodes = this.reservedContainers.get(priority);
+    Set<NodeInfo> reservedNodes = this.reservedContainers.get(priority);
     return (reservedNodes == null) ? 0 : reservedNodes.size();
   }
 
-  public synchronized void reserveResource(NodeManager node, Priority priority,
+  public synchronized void reserveResource(NodeInfo node, Priority priority,
       Resource resource) {
-    Set<NodeManager> reservedNodes = this.reservedContainers.get(priority);
+    Set<NodeInfo> reservedNodes = this.reservedContainers.get(priority);
     if (reservedNodes == null) {
-      reservedNodes = new TreeSet<NodeManager>(nodeComparator);
+      reservedNodes = new TreeSet<NodeInfo>(nodeComparator);
       reservedContainers.put(priority, reservedNodes);
     }
     reservedNodes.add(node);
@@ -406,8 +406,8 @@ public class Application {
         " at priority " + priority);
   }
 
-  public synchronized void unreserveResource(NodeManager node, Priority priority) {
-    Set<NodeManager> reservedNodes = reservedContainers.get(priority);
+  public synchronized void unreserveResource(NodeInfo node, Priority priority) {
+    Set<NodeInfo> reservedNodes = reservedContainers.get(priority);
     reservedNodes.remove(node);
     if (reservedNodes.isEmpty()) {
       this.reservedContainers.remove(priority);
@@ -418,8 +418,8 @@ public class Application {
         " at priority " + priority);
   }
 
-  public synchronized boolean isReserved(NodeManager node, Priority priority) {
-    Set<NodeManager> reservedNodes = reservedContainers.get(priority);
+  public synchronized boolean isReserved(NodeInfo node, Priority priority) {
+    Set<NodeInfo> reservedNodes = reservedContainers.get(priority);
     if (reservedNodes != null) { 
       return reservedNodes.contains(node);
     }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Sun May  1 21:33:23 2011
@@ -5,8 +5,6 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
@@ -14,21 +12,11 @@ public interface NodeManager extends Nod
 
   public static final String ANY = "*";
 
-  void allocateContainer(ApplicationId applicationId,
-      List<Container> containers);
-
   boolean releaseContainer(Container container);
 
   void updateHealthStatus(NodeHealthStatus healthStatus);
 
   NodeResponse statusUpdate(Map<String, List<Container>> containers);
 
-  void notifyFinishedApplication(ApplicationId applicationId);
-
-  Application getReservedApplication();
-  
-  void reserveResource(Application application, Priority priority, 
-      Resource resource);
-  
-  void unreserveResource(Application application, Priority priority);
+  void finishedApplication(ApplicationId applicationId);
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Sun May  1 21:33:23 2011
@@ -309,7 +309,7 @@ public class NodeManagerImpl implements 
     Resources.addTo(usedResource, resource);
   }
 
-  public synchronized void notifyFinishedApplication(ApplicationId applicationId) {  
+  public synchronized void finishedApplication(ApplicationId applicationId) {  
     finishedApplications.add(applicationId);
     /* make sure to iterate through the list and remove all the containers that 
      * belong to this application.

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Sun May  1 21:33:23 2011
@@ -23,10 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 
 /**
@@ -41,7 +38,7 @@ public interface ResourceListener {
    * add a node to the resource listener.
    * @param nodeManager the nodeManager view
    */
-  public void addNode(NodeManager nodeManager);
+  public void addNode(NodeInfo nodeInfo);
   
   /**
    * A node has been removed from the cluster.
@@ -53,9 +50,7 @@ public interface ResourceListener {
    * A status update from a NodeManager
    * @param nodeInfo NodeManager info
    * @param containers the containers completed/running/failed on this node.
-   * @return response information for the node, which containers to kill and 
-   * applications to clean.
    */
-  public NodeResponse nodeUpdate(NodeInfo nodeInfo, 
+  public void nodeUpdate(NodeInfo nodeInfo, 
       Map<String,List<Container>> containers);
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Sun May  1 21:33:23 2011
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 /**
@@ -36,7 +38,7 @@ import org.apache.hadoop.yarn.server.sec
 @LimitedPrivate("yarn")
 @Evolving
 public interface ResourceScheduler extends ResourceListener, YarnScheduler, 
-  EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+  EventHandler<ASMEvent<ApplicationTrackerEventType>>, Recoverable {
   /**
    * Re-initialize the <code>ResourceScheduler</code>.
    * @param conf configuration
@@ -44,5 +46,5 @@ public interface ResourceScheduler exten
    * @throws IOException
    */
   void reinitialize(Configuration conf, 
-      ContainerTokenSecretManager secretManager) throws IOException;    
+      ContainerTokenSecretManager secretManager, ClusterTracker clusterTracker) throws IOException;    
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sun May  1 21:33:23 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -39,6 +38,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
@@ -82,20 +83,21 @@ implements ResourceScheduler, CapacitySc
 
   private final Comparator<Application> applicationComparator = 
     new Comparator<Application>() {
-      @Override
-      public int compare(Application a1, Application a2) {
-        return a1.getApplicationId().getId() - a2.getApplicationId().getId();
-      }
+    @Override
+    public int compare(Application a1, Application a2) {
+      return a1.getApplicationId().getId() - a2.getApplicationId().getId();
+    }
   };
 
   private CapacitySchedulerConfiguration conf;
   private ContainerTokenSecretManager containerTokenSecretManager;
+  private ClusterTracker clusterTracker;
 
   private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
 
 
   private Resource minimumAllocation;
-  
+
   private Map<ApplicationId, Application> applications = 
     new TreeMap<ApplicationId, Application>(
         new org.apache.hadoop.yarn.util.BuilderUtils.ApplicationIdComparator());
@@ -123,13 +125,14 @@ implements ResourceScheduler, CapacitySc
 
   @Override
   public synchronized void reinitialize(Configuration conf,
-          ContainerTokenSecretManager containerTokenSecretManager) 
+      ContainerTokenSecretManager containerTokenSecretManager, ClusterTracker clusterTracker) 
   throws IOException {
     if (!initialized) {
       this.conf = new CapacitySchedulerConfiguration(conf);
       this.minimumAllocation = this.conf.getMinimumAllocation();
       this.containerTokenSecretManager = containerTokenSecretManager;
-
+      this.clusterTracker = clusterTracker;
+      if (clusterTracker != null) clusterTracker.addListener(this);
       initializeQueues(this.conf);
       initialized = true;
     } else {
@@ -166,11 +169,11 @@ implements ResourceScheduler, CapacitySc
     
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
-    
+
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
   }
-  
+
   /**
    * Ensure all existing queues are present. Queues cannot be deleted
    * @param queues existing queues
@@ -212,11 +215,11 @@ implements ResourceScheduler, CapacitySc
     }
 
     queues.put(queueName, queue);
-    
+
     LOG.info("Initialized queue: " + queue);
     return queue;
   }
-  
+
   @Override
   public void addApplication(ApplicationId applicationId, ApplicationMaster master,
       String user, String queueName, Priority priority)
@@ -317,9 +320,9 @@ implements ResourceScheduler, CapacitySc
       boolean includeApplications, boolean includeChildQueues, boolean recursive) 
   throws IOException {
     Queue queue = null;
-    
+
     synchronized (this) {
-     queue = this.queues.get(queueName); 
+      queue = this.queues.get(queueName); 
     }
 
     if (queue == null) {
@@ -351,30 +354,44 @@ implements ResourceScheduler, CapacitySc
     int memory = ask.getCapability().getMemory();
     int minMemory = minimumAllocation.getMemory();
     ask.getCapability().setMemory (
-      minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
+        minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
   }
 
+  /**
+   * Collect the containers reported in COMPLETE state from a node status
+   * update. Containers in any other state are ignored; nothing is modified.
+   *
+   * @param allContainers the containers reported by a node, grouped by a
+   *        string key (presumably the application id — confirm); may be null
+   * @return a new mutable list of the COMPLETE containers, empty when
+   *         {@code allContainers} is null
+   */
+  private List<Container> getCompletedContainers(Map<String, List<Container>> allContainers) {
+    List<Container> completedContainers = new ArrayList<Container>();
+    if (allContainers == null) {
+      return completedContainers;
+    }
+    // The grouping key is irrelevant here; only each container's state matters.
+    for (List<Container> containers : allContainers.values()) {
+      for (Container c : containers) {
+        if (c.getState() == ContainerState.COMPLETE) {
+          completedContainers.add(c);
+        }
+      }
+    }
+    return completedContainers;
+  }
 
   @Override
-  public synchronized NodeResponse nodeUpdate(NodeInfo node, 
+  public synchronized void nodeUpdate(NodeInfo nm, 
       Map<String,List<Container>> containers ) {
-    LOG.info("nodeUpdate: " + node);
+    LOG.info("nodeUpdate: " + nm);
 
-    NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
 
     // Completed containers
-    processCompletedContainers(nodeResponse.getCompletedContainers());
-    NodeManager nm = nodes.get(node.getNodeAddress());
-    
+    processCompletedContainers(getCompletedContainers(containers));
+
     // Assign new containers
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
-    
+
     Application reservedApplication = nm.getReservedApplication();
     if (reservedApplication != null) {
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
-          reservedApplication.getApplicationId() + " on node: " + node);
+          reservedApplication.getApplicationId() + " on node: " + nm);
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
       queue.assignContainers(clusterResource, nm);
     }
@@ -383,12 +400,11 @@ implements ResourceScheduler, CapacitySc
     if (nm.getReservedApplication() == null) {
       root.assignContainers(clusterResource, nm);
     } else {
-      LOG.info("Skipping scheduling since node " + node + 
+      LOG.info("Skipping scheduling since node " + nm + 
           " is reserved by application " + 
           nm.getReservedApplication().getApplicationId());
     }
 
-    return nodeResponse;
   }
 
   private synchronized void processCompletedContainers(
@@ -455,11 +471,9 @@ implements ResourceScheduler, CapacitySc
       break;
     }
   }
-  
-  private Map<String, NodeManager> nodes = new HashMap<String, NodeManager>();
   private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-  
- 
+
+
+  /**
+   * @return the live clusterResource object, i.e. the running total that
+   *         addNode adds to and removeNode subtracts from (not a copy)
+   */
   public synchronized Resource getClusterResource() {
     return clusterResource;
   }
@@ -467,18 +481,10 @@ implements ResourceScheduler, CapacitySc
+  /**
+   * Subtract a departing node's total capability from the cluster resource.
+   *
+   * NOTE(review): the earlier TODO ("inform the applications that the
+   * containers are completed/failed") is dropped by this change, and this
+   * method does not visibly do that. Confirm it is handled elsewhere.
+   */
   @Override
   public synchronized void removeNode(NodeInfo nodeInfo) {
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
-    //TODO inform the applications that the containers are completed/failed
-    nodes.remove(nodeInfo.getNodeAddress());
   }
-  
-  public synchronized boolean isTracked(NodeInfo nodeInfo) {
-    NodeManager node = nodes.get(nodeInfo.getNodeAddress());
-    return (node == null? false: true);
-  }
- 
+
   @Override
-  public synchronized void addNode(NodeManager nodeManager) {
-    nodes.put(nodeManager.getNodeAddress(), nodeManager);
+  public synchronized void addNode(NodeInfo nodeManager) {
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 
@@ -486,30 +492,33 @@ implements ResourceScheduler, CapacitySc
       Container container) {
     // Reap containers
     LOG.info("Application " + applicationId + " released container " + container);
-    NodeManager nodeManager = nodes.get(container.getContainerManagerAddress());
-    return nodeManager.releaseContainer(container);
-  }
-  
-  public synchronized NodeResponse nodeUpdateInternal(NodeInfo nodeInfo, 
-      Map<String,List<Container>> containers) {
-    NodeManager node = nodes.get(nodeInfo.getNodeAddress());
-    LOG.debug("nodeUpdate: node=" + nodeInfo.getNodeAddress() + 
-        " available=" + nodeInfo.getAvailableResource().getMemory());
-    return node.statusUpdate(containers);
-    
+    return clusterTracker.releaseContainer(container);
   }
 
+
+  /**
+   * Record newly allocated containers for an application on the given node.
+   * This change delegates directly to NodeInfo.allocateContainer instead of
+   * looking up a separate NodeManager view.
+   *
+   * @param nodeInfo the node the containers were allocated on
+   * @param applicationId the application that owns the containers
+   * @param containers the containers allocated
+   */
   public synchronized void addAllocatedContainers(NodeInfo nodeInfo, 
       ApplicationId applicationId, List<Container> containers) {
-    NodeManager node = nodes.get(nodeInfo.getNodeAddress());
-    node.allocateContainer(applicationId, containers);
+    nodeInfo.allocateContainer(applicationId, containers);
   }
 
+  /**
+   * Forward an application-finished notification for the given nodes to the
+   * ClusterTracker.
+   *
+   * NOTE(review): reinitialize() tolerates a null clusterTracker, but this
+   * method dereferences it unconditionally and would throw a
+   * NullPointerException in that configuration. Confirm a tracker is always
+   * supplied when this can be called.
+   *
+   * @param applicationId the application that finished
+   * @param nodesToNotify the nodes to notify
+   */
   public synchronized void finishedApplication(ApplicationId applicationId,
       List<NodeInfo> nodesToNotify) {
-    for (NodeInfo node: nodesToNotify) {
-      NodeManager nodeManager = nodes.get(node.getNodeAddress());
-      nodeManager.notifyFinishedApplication(applicationId);
+    clusterTracker.finishedApplication(applicationId, nodesToNotify);
+  }
+
+  /**
+   * Rebuild scheduler state from stored ResourceManager state. All tracked
+   * applications are dropped. Each stored application is then re-added to its
+   * queue, and its stored containers are replayed into that queue via
+   * Queue.recoverContainer.
+   *
+   * @param state the stored ResourceManager state to recover from
+   * @throws Exception propagated from addApplication or container recovery
+   */
+  @Override
+  public void recover(RMState state) throws Exception {
+    applications.clear();
+    for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
+      ApplicationId appId = entry.getKey();
+      ApplicationInfo appInfo = entry.getValue();
+      String queueName = appInfo.getApplicationSubmissionContext().getQueue();
+
+      addApplication(appId, appInfo.getApplicationMaster(),
+          appInfo.getApplicationSubmissionContext().getUser(), queueName,
+          appInfo.getApplicationSubmissionContext().getPriority());
+
+      // The queue and the application are the same for every container of
+      // this app, so look them up once instead of once per container.
+      Queue queue = queues.get(queueName);
+      Application application = applications.get(appId);
+      for (Container c : appInfo.getContainers()) {
+        queue.recoverContainer(clusterResource, application, c);
+      }
     }
   }
 }



Mime
View raw message