Subject: svn commit: r1098414 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resou...
Date: Sun, 01 May 2011 21:33:24 -0000
To: mapreduce-commits@hadoop.apache.org
From: mahadev@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20110501213325.3007C2388A2C@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: mahadev
Date: Sun May 1 21:33:23 2011
New Revision: 1098414

URL: http://svn.apache.org/viewvc?rev=1098414&view=rev
Log:
RM Restart Phase 2 - Completed the recovery of components in the RM (mahadev)

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java
Removed:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ResourceContext.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestSchedulerNegotiator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Sun May 1 21:33:23 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
   MAPREDUCE-279
+    RM Restart Phase 2 - Completed the recovery of components in the RM (mahadev)
+
     MAPREDUCE-2434. Metrics for ResourceManager. (Luke Lu via acmurthy)
     Fix container launch w/ inconsistent credential file naming. (cdouglas)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Sun May 1 21:33:23 2011
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -217,7 +218,8 @@ public class TestRMContainerAllocator {
   }
   private FifoScheduler createScheduler() throws YarnRemoteException {
-    FifoScheduler fsc = new FifoScheduler() {
+    ClusterTracker clusterTracker = null;
+    FifoScheduler fsc = new FifoScheduler(clusterTracker) {
       //override this to copy the objects
       //otherwise FifoScheduler updates the numContainers in same objects as kept by
       //RMContainerAllocator
@@ -240,7 +242,7 @@ public class TestRMContainerAllocator {
       }
     };
     try {
-      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager());
+      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
       fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
           recordFactory.newRecordInstance(ApplicationMaster.class),
           "test", null, null);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Sun May 1 21:33:23 2011
@@ -76,7 +76,7 @@ public class AdminService extends Abstra
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws YarnRemoteException {
     try {
-      scheduler.reinitialize(conf, null); // ContainerTokenSecretManager can't
+      scheduler.reinitialize(conf, null, null); // ContainerTokenSecretManager can't
       // be 'refreshed'
       RefreshQueuesResponse response =
           recordFactory.newRecordInstance(RefreshQueuesResponse.class);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Sun May 1 21:33:23 2011
@@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ResourceContext;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -74,7 +74,7 @@ import org.apache.hadoop.yarn.service.Ab
 public class ClientRMService extends AbstractService implements ClientRMProtocol {
   private static final Log LOG = LogFactory.getLog(ClientRMService.class);
-  final private ResourceContext clusterInfo;
+  final private ClusterTracker clusterInfo;
   final private ApplicationsManager applicationsManager;
   final private ResourceScheduler scheduler;
@@ -84,7 +84,7 @@ public class ClientRMService extends Abs
   InetSocketAddress clientBindAddress;
   public ClientRMService(ApplicationsManager applicationsManager,
-      ResourceContext clusterInfo, ResourceScheduler scheduler) {
+      ClusterTracker clusterInfo, ResourceScheduler scheduler) {
     super(ClientRMService.class.getName());
     this.clusterInfo = clusterInfo;
     this.applicationsManager = applicationsManager;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Sun May 1 21:33:23 2011
@@ -32,13 +32,15 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.
+  ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
@@ -51,7 +53,7 @@ import org.apache.hadoop.yarn.webapp.Web
  * The ResourceManager is the main class that is a set of components.
  *
  */
-public class ResourceManager extends CompositeService {
+public class ResourceManager extends CompositeService implements Recoverable {
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   public static final long clusterTimeStamp = System.currentTimeMillis();
   private YarnConfiguration conf;
@@ -73,10 +75,11 @@ public class ResourceManager extends Com
   private AtomicBoolean shutdown = new AtomicBoolean(false);
   private WebApp webApp;
   private RMContext asmContext;
-  private Store store;
+  private final Store store;
-  public ResourceManager() {
+  public ResourceManager(Store store) {
     super("ResourceManager");
+    this.store = store;
   }
   public RMContext getRMContext() {
@@ -114,10 +117,12 @@ public class ResourceManager extends Com
     }
   }
+  public void recover() {
+
+  }
   @Override
   public synchronized void init(Configuration conf) {
-    this.store = StoreFactory.getStore(conf);
     this.asmContext = new RMContextImpl(this.store);
     addService(asmContext.getDispatcher());
     // Initialize the config
@@ -128,11 +133,7 @@ public class ResourceManager extends Com
           conf.getClass(YarnConfiguration.RESOURCE_SCHEDULER,
               FifoScheduler.class, ResourceScheduler.class),
           this.conf);
-    try {
-      this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to initialize scheduler", ioe);
-    }
+
     this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, scheduler);
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
@@ -141,8 +142,14 @@ public class ResourceManager extends Com
     applicationsManager = createApplicationsManagerImpl();
     addService(applicationsManager);
-    rmResourceTracker = createRMResourceTracker(this.scheduler);
+    rmResourceTracker = createRMResourceTracker();
     addService(rmResourceTracker);
+
+    try {
+      this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager, rmResourceTracker);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to initialize scheduler", ioe);
+    }
     clientRM = createClientRMService();
     addService(clientRM);
@@ -200,8 +207,8 @@ public class ResourceManager extends Com
     super.stop();
   }
-  protected RMResourceTrackerImpl createRMResourceTracker(ResourceListener listener) {
-    return new RMResourceTrackerImpl(this.containerTokenSecretManager, listener);
+  protected RMResourceTrackerImpl createRMResourceTracker() {
+    return new RMResourceTrackerImpl(this.containerTokenSecretManager);
   }
   protected ApplicationsManagerImpl createApplicationsManagerImpl() {
@@ -248,13 +255,22 @@ public class ResourceManager extends Com
     return this.rmResourceTracker;
   }
+
+  @Override
+  public void recover(RMState state) throws Exception {
+    applicationsManager.recover(state);
+    rmResourceTracker.recover(state);
+    scheduler.recover(state);
+  }
   public static void main(String argv[]) {
     ResourceManager resourceManager = null;
     try {
       Configuration conf = new YarnConfiguration();
-      resourceManager = new ResourceManager();
+      Store store = StoreFactory.getStore(conf);
+      resourceManager = new ResourceManager(store);
       resourceManager.init(conf);
+      //resourceManager.recover();
       resourceManager.start();
     } catch (Throwable e) {
       LOG.error("Error starting RM", e);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Sun May 1 21:33:23 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
@@ -45,6 +45,9 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.service.AbstractService;
 /**
@@ -55,7 +58,7 @@ import org.apache.hadoop.yarn.service.Ab
 @Evolving
 @Private
 public class AMTracker extends AbstractService implements EventHandler<ASMEvent
-<ApplicationEventType>> {
+<ApplicationEventType>>, Recoverable {
   private static final Log LOG = LogFactory.getLog(AMTracker.class);
   private HeartBeatThread heartBeatThread;
   private long amExpiryInterval;
@@ -63,9 +66,9 @@ public class AMTracker extends AbstractS
   private EventHandler handler;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private int amMaxRetries;
-
+
   private final RMContext asmContext;
-
+
   private final Map applications =
       new ConcurrentHashMap();
@@ -84,7 +87,7 @@ public class AMTracker extends AbstractS
       }
     }
   );
-
+
   public AMTracker(RMContext asmContext) {
     super(AMTracker.class.getName());
     this.heartBeatThread = new HeartBeatThread();
@@ -96,7 +99,7 @@ public class AMTracker extends AbstractS
     super.init(conf);
     this.handler = asmContext.getDispatcher().getEventHandler();
     this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL,
-        YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
+        YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
     LOG.info("AM expiry interval: " + this.amExpiryInterval);
     this.amMaxRetries = conf.getInt(YarnConfiguration.AM_MAX_RETRIES,
         YarnConfiguration.DEFAULT_AM_MAX_RETRIES);
@@ -170,9 +173,9 @@ public class AMTracker extends AbstractS
           am = applications.get(app);
         }
         handler.handle(new ASMEvent
-            (ApplicationEventType.EXPIRE, am));
-      }
+            (ApplicationEventType.EXPIRE, am));
       }
+    }
   @Override
   public void stop() {
@@ -189,14 +192,14 @@ public class AMTracker extends AbstractS
   public void addMaster(String user, ApplicationSubmissionContext submissionContext, String clientToken) {
     ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(asmContext,
-        user, submissionContext, clientToken);
+        user, submissionContext, clientToken);
     synchronized(applications) {
       applications.put(applicationMaster.getApplicationID(), applicationMaster);
     }
     asmContext.getDispatcher().getSyncHandler().handle(new ASMEvent(
         ApplicationEventType.ALLOCATE, applicationMaster));
   }
-
+
   public void finish(ApplicationId application) {
     ApplicationMasterInfo masterInfo = null;
     synchronized(applications) {
@@ -245,14 +248,14 @@ public class AMTracker extends AbstractS
   public void kill(ApplicationId applicationID) {
     ApplicationMasterInfo masterInfo = null;
-
+
     synchronized(applications) {
       masterInfo = applications.get(applicationID);
     }
     handler.handle(new ASMEvent(ApplicationEventType.KILL,
-        masterInfo));
+        masterInfo));
   }
-
+
   /*
    * this class is used for passing status context to the application state
    * machine.
@@ -261,14 +264,14 @@ public class AMTracker extends AbstractS
     private final ApplicationId appID;
     private final ApplicationMaster master;
     private final UnsupportedOperationException notimplemented;
-
+
     public TrackerAppContext(
-        ApplicationId appId, ApplicationMaster master) {
+        ApplicationId appId, ApplicationMaster master) {
       this.appID = appId;
       this.master = master;
       this.notimplemented = new NotImplementedException();
     }
-
+
     @Override
     public ApplicationSubmissionContext getSubmissionContext() {
       throw notimplemented;
@@ -303,7 +306,7 @@ public class AMTracker extends AbstractS
     }
     @Override
     public String getName() {
-      throw notimplemented;
+      throw notimplemented;
     }
     @Override
     public String getQueue() {
@@ -312,10 +315,10 @@ public class AMTracker extends AbstractS
     @Override
     public int getFailedCount() {
-      throw notimplemented;
+      throw notimplemented;
     }
   }
-
+
   public void heartBeat(ApplicationStatus status) {
     ApplicationMaster master = recordFactory.newRecordInstance(ApplicationMaster.class);
     master.setStatus(status);
@@ -324,7 +327,7 @@ public class AMTracker extends AbstractS
     handler.handle(new ASMEvent(ApplicationEventType.STATUSUPDATE,
         context));
   }
-
+
   public void registerMaster(ApplicationMaster applicationMaster) {
     applicationMaster.getStatus().setLastSeen(System.currentTimeMillis());
     ApplicationMasterInfo master = null;
@@ -335,9 +338,9 @@ public class AMTracker extends AbstractS
     TrackerAppContext registrationContext = new TrackerAppContext(
         master.getApplicationID(), applicationMaster);
     handler.handle(new ASMEvent(ApplicationEventType.
-        REGISTERED, registrationContext));
+        REGISTERED, registrationContext));
   }
-
+
   @Override
   public void handle(ASMEvent event) {
     ApplicationId appID = event.getAppContext().getApplicationID();
@@ -354,26 +357,76 @@ public class AMTracker extends AbstractS
     /* we need to launch the applicaiton master on allocated transition */
     if (masterInfo.getState() == ApplicationState.ALLOCATED) {
       handler.handle(new ASMEvent(
-          ApplicationEventType.LAUNCH, masterInfo));
+          ApplicationEventType.LAUNCH, masterInfo));
     }
     if (masterInfo.getState() == ApplicationState.LAUNCHED) {
       /* the application move to a launched state start tracking */
       synchronized (amExpiryQueue) {
         LOG.info("DEBUG -- adding to expiry " + masterInfo.getStatus() +
-            " currenttime " + System.currentTimeMillis());
+            " currenttime " + System.currentTimeMillis());
         amExpiryQueue.add(masterInfo.getStatus());
       }
     }
-
+
     /* check to see if the AM is an EXPIRED_PENDING state and start off the cycle again */
     if (masterInfo.getState() == ApplicationState.EXPIRED_PENDING) {
       /* check to see if the number of retries are reached or not */
       if (masterInfo.getFailedCount() < this.amMaxRetries) {
         handler.handle(new ASMEvent(ApplicationEventType.ALLOCATE,
-            masterInfo));
+            masterInfo));
       } else {
         handler.handle(new ASMEvent(ApplicationEventType.
-            FAILED_MAX_RETRIES, masterInfo));
+            FAILED_MAX_RETRIES, masterInfo));
+      }
+    }
+  }
+
+  @Override
+  public void recover(RMState state) {
+    for (Map.Entry entry: state.getStoredApplications().entrySet()) {
+      ApplicationId appId = entry.getKey();
+      ApplicationInfo appInfo = entry.getValue();
+      ApplicationMasterInfo masterInfo = new ApplicationMasterInfo(this.asmContext,
+          appInfo.getApplicationSubmissionContext().getUser(), appInfo.getApplicationSubmissionContext(),
+          appInfo.getApplicationMaster().getClientToken());
+      ApplicationMaster master = masterInfo.getMaster();
+      ApplicationMaster storedAppMaster = appInfo.getApplicationMaster();
+      master.setAMFailCount(storedAppMaster.getAMFailCount());
+      master.setApplicationId(storedAppMaster.getApplicationId());
+      master.setClientToken(storedAppMaster.getClientToken());
+      master.setContainerCount(storedAppMaster.getContainerCount());
+      master.setHttpPort(storedAppMaster.getHttpPort());
+      master.setHost(storedAppMaster.getHost());
+      master.setRpcPort(storedAppMaster.getRpcPort());
+      master.setStatus(storedAppMaster.getStatus());
+      master.setState(storedAppMaster.getState());
+      applications.put(appId, masterInfo);
+
+      switch(master.getState()) {
+      case ALLOCATED:
+        break;
+      case ALLOCATING:
+        break;
+      case CLEANUP:
+        break;
+      case EXPIRED_PENDING:
+        break;
+      case COMPLETED:
+        break;
+      case FAILED:
+        break;
+      case LAUNCHED:
+        break;
+      case KILLED:
+        break;
+      case LAUNCHING:
+        break;
+      case PAUSED:
+        break;
+      case PENDING:
+        break;
+      case RUNNING:
+        break;
       }
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java?rev=1098414&r1=1098413&r2=1098414&view=diff
============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManager.java Sun May 1 21:33:23 2011 @@ -27,13 +27,14 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationMaster; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; /** * This interface defines the interface for ApplicationsManager. */ @Private @Evolving -public interface ApplicationsManager { +public interface ApplicationsManager extends Recoverable { ApplicationId getNewApplicationID(); ApplicationMaster getApplicationMaster(ApplicationId applicationId); Application getApplication(ApplicationId applicationID); Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1098414&r1=1098413&r2=1098414&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original) +++ 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Sun May 1 21:33:23 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.SNEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -251,4 +252,10 @@ public class ApplicationsManagerImpl ext
     createApplication(master.getMaster(), master.getUser(), master.getQueue(), master.getName());
   }
+
+
+  @Override
+  public void recover(RMState state) {
+    amTracker.recover(state);
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Sun May 1 21:33:23 2011
@@ -87,18 +87,17 @@ public class 
MemStore implements Store {
     }

     @Override
-    public List getStoredNodeManagers() throws IOException {
+    public List getStoredNodeManagers() {
       return new ArrayList();
     }

     @Override
-    public NodeId getLastLoggedNodeId() throws IOException {
+    public NodeId getLastLoggedNodeId() {
       return nodeId;
     }

     @Override
-    public Map getStoredApplications()
-    throws IOException {
+    public Map getStoredApplications() {
       return new HashMap();
     }
   }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java?rev=1098414&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Recoverable.java Sun May 1 21:33:23 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+
+public interface Recoverable {
+  public void recover(RMState state) throws Exception;
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Sun May 1 21:33:23 2011
@@ -36,9 +36,9 @@ public interface Store extends NodeStore
     public List getContainers();
   }
   public interface RMState {
-    public List getStoredNodeManagers() throws IOException;
-    public Map getStoredApplications() throws IOException;
-    public NodeId getLastLoggedNodeId() throws IOException;
+    public List getStoredNodeManagers() ;
+    public Map getStoredApplications();
+    public NodeId getLastLoggedNodeId();
   }
   public RMState restore() throws IOException;
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Sun May 1 21:33:23 2011
@@ -324,7 +324,7 @@ public class ZKStore implements Store {
     }

     @Override
-    public List getStoredNodeManagers() throws IOException {
+    public List getStoredNodeManagers() {
       return nodeManagers;
     }

@@ -403,8 +403,7 @@ public class ZKStore implements Store {
     }

     @Override
-    public Map getStoredApplications()
-    throws IOException {
+    public Map getStoredApplications() {
       return applications;
     }
   }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java?rev=1098414&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ClusterTracker.java Sun May 1 21:33:23 2011
@@ -0,0 +1,65 @@
+/**
+* Licensed to the Apache Software 
Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+
+
+/**
+ * The read-only interface for cluster resource
+ */
+public interface ClusterTracker extends Recoverable{
+
+  /**
+   * Get all node info
+   * @return a list of node info
+   */
+  List getAllNodeInfo();
+
+  /**
+   * Get cluster metrics from the resource tracker.
+   * @return the cluster metrics for the cluster.
+   */
+  YarnClusterMetrics getClusterMetrics();
+
+  /**
+   * the application that is finished.
+   * @param applicationId the applicaiton that finished
+   * @param nodesToNotify the nodes that need to be notified.
+   */
+  void finishedApplication(ApplicationId applicationId, List nodesToNotify);
+
+  /**
+   * Release a container
+   * @param container the container to be released
+   */
+  boolean releaseContainer(Container container);
+
+  /**
+   * Adding listener to be notified of node updates.
+   * @param listener
+   */
+  public void addListener(ResourceListener listener);
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java Sun May 1 21:33:23 2011
@@ -1,26 +1,33 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;

+import java.util.List;
+
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;

 /**
  * Node managers information on available resources
@@ -78,4 +85,23 @@ public interface NodeInfo {
    * @return the number of containers
    */
   public int getNumContainers();
-  }
\ No newline at end of file
+
+  /**
+   * Inform the node of allocated containers
+   * @param applicationId the application id
+   * @param containers the list of containers
+   */
+  public void allocateContainer(ApplicationId applicationId,
+      List containers);
+
+  /**
+   *
+   * @return
+   */
+  public Application getReservedApplication();
+
+  public void reserveResource(Application application, Priority priority,
+      Resource resource);
+
+  public void unreserveResource(Application application, Priority priority);
+}
\ No newline at end of file

Modified: 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java Sun May 1 21:33:23 2011
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.server.res
  * This should be package private. It does not need to be public.
  *
  */
-class NodeInfoTracker {
+public class NodeInfoTracker {
   private final NodeManager node;
   HeartbeatResponse lastHeartBeatResponse;
   private long lastSeen;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Sun May 1 21:33:23 2011
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;

@@ -41,6 +41,8 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -72,7 +75,7 @@ import org.apache.hadoop.yarn.service.Ab
  *`
  */
 public class 
RMResourceTrackerImpl extends AbstractService implements
-ResourceTracker, ResourceContext {
+ResourceTracker, ClusterTracker {
   private static final Log LOG = LogFactory.getLog(RMResourceTrackerImpl.class);
   /* we dont garbage collect on nodes. A node can come back up again and re register,
    * so no use garbage collecting. Though admin can break the RM by bouncing
@@ -82,9 +85,9 @@ ResourceTracker, ResourceContext {
   private final Map nodeManagers =
     new ConcurrentHashMap();
   private final HeartBeatThread heartbeatThread;
-
+
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
+
   private final TreeSet nmExpiryQueue =
     new TreeSet(
       new Comparator() {
@@ -94,17 +97,17 @@ ResourceTracker, ResourceContext {
           long p1LastSeen = nit1.getNodeLastSeen();
           long p2LastSeen = nit2.getNodeLastSeen();
           if (p1LastSeen < p2LastSeen) {
-            return -1;
+            return -1;
           } else if (p1LastSeen > p2LastSeen) {
-            return 1;
-          } else {
-            return (nit1.getNodeManager().getNodeID().getId() -
-                nit2.getNodeManager().getNodeID().getId());
-          }
+            return 1;
+          } else {
+            return (nit1.getNodeManager().getNodeID().getId() -
+                nit2.getNodeManager().getNodeID().getId());
           }
         }
-      );
-
+      }
+    );
+
   private ResourceListener resourceListener;
   private InetSocketAddress resourceTrackerAddress;
   private Server server;
@@ -113,13 +116,11 @@ ResourceTracker, ResourceContext {
   private static final HeartbeatResponse reboot = recordFactory.newRecordInstance(HeartbeatResponse.class);
   private long nmExpiryInterval;

-  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager,
-      ResourceListener listener) {
+  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager) {
     super(RMResourceTrackerImpl.class.getName());
     reboot.setReboot(true);
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.heartbeatThread = new HeartBeatThread();
-    this.resourceListener = listener;
   }

   @Override
@@ -134,6 +135,11 @@ ResourceTracker, 
ResourceContext {
   }

   @Override
+  public void addListener(ResourceListener listener) {
+    this.resourceListener = listener;
+  }
+
+  @Override
   public void start() {
     // ResourceTrackerServer authenticates NodeManager via Kerberos if
     // security is enabled, so no secretManager.
@@ -159,26 +165,20 @@ ResourceTracker, ResourceContext {
   public static Node resolve(String hostName) {
     return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
   }
-
-  @Override
-  public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
-    String host = request.getHost();
-    int cmPort = request.getContainerManagerPort();
-    String node = host + ":" + cmPort;
-    String httpAddress = host + ":" + request.getHttpPort();
-    Resource capability = request.getResource();
-    NodeId nodeId = getNodeId(node);
+  protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
+      String hostString, String httpAddress, Node node, Resource capability) {
     NodeInfoTracker nTracker = null;
     synchronized(nodeManagers) {
       if (!nodeManagers.containsKey(nodeId)) {
-        /* we do the resolving here, so that scheduler does not have to do it */
+        LOG.info("DEBUG -- Adding " + hostString);
         NodeManager nodeManager =
-          new NodeManagerImpl(nodeId, node.toString(), httpAddress,
-              resolve(node.toString()),
-              capability);
-        // Inform the scheduler
+          new NodeManagerImpl(nodeId, hostString, httpAddress,
+              node,
+              capability);
+        nodes.put(nodeManager.getNodeAddress(), nodeId);
+        /* Inform the listeners */
         resourceListener.addNode(nodeManager);
         HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
         response.setResponseId(0);
@@ -189,21 +189,67 @@ ResourceTracker, ResourceContext {
       nTracker.updateLastSeen(System.currentTimeMillis());
     }
   }
+    return nTracker;
+  }
+
+  @Override
+  public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest
+      request) throws YarnRemoteException {
+    String host = request.getHost();
+    int cmPort = 
request.getContainerManagerPort();
+    String node = host + ":" + cmPort;
+    String httpAddress = host + ":" + request.getHttpPort();
+    Resource capability = request.getResource();
+
+    NodeId nodeId = getNodeId(node);
+    NodeInfoTracker nTracker = getAndAddNodeInfoTracker(
+        nodeId, node.toString(), httpAddress,
+        resolve(node.toString()),
+        capability);
+    // Inform the scheduler
+    addForTracking(nodeId);
     LOG.info("NodeManager from node " + node + "(web-url: " + httpAddress + ") registered with capability: " + capability.getMemory() + ", assigned nodeId " + nodeId.getId());
-    RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
+    RegistrationResponse regResponse = recordFactory.newRecordInstance(
+        RegistrationResponse.class);
     regResponse.setNodeId(nodeId);
     SecretKey secretKey = this.containerTokenSecretManager.createAndGetSecretKey(node);
     regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
-    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
+        RegisterNodeManagerResponse.class);
     response.setRegistrationResponse(regResponse);
     return response;
   }
+  /**
+   * Update the listeners. This method can be inlined but are there for
+   * making testing easier
+   * @param nodeManager the {@link NodeInfo} to update.
+   * @param containers the containers from the status of the node manager.
+   */
+  protected void updateListener(NodeInfo nodeManager, Map>
+    containers) {
+    /* inform any listeners of node heartbeats */
+    resourceListener.nodeUpdate(
+        nodeManager, containers);
+  }
+
+
+  /**
+   * Get a response for the nodemanager heartbeat
+   * @param nodeManager the nodemanager to update
+   * @param containers the containers from the status update.
+   * @return the {@link NodeResponse} for the node manager.
+   */
+  protected NodeResponse NodeResponse(NodeManager nodeManager, Map> containers) {
+    return nodeManager.statusUpdate(containers);
+  }
+
   @Override
   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
     org.apache.hadoop.yarn.server.api.records.NodeStatus remoteNodeStatus = request.getNodeStatus();
@@ -234,12 +280,19 @@ ResourceTracker, ResourceContext {
       nodeHbResponse.setHeartbeatResponse(reboot);
       return nodeHbResponse;
     }
-
+
+    /** TODO This should be 3 step process.
+     * nodemanager.statusupdate
+     * listener.update()
+     * nodemanager.getNodeResponse()
+     * This will allow flexibility in updates/scheduling/premption
+     */
+    NodeResponse nodeResponse = nodeManager.statusUpdate(remoteNodeStatus.getAllContainers());
     /* inform any listeners of node heartbeats */
-    NodeResponse nodeResponse = resourceListener.nodeUpdate(
+    updateListener(
         nodeManager, remoteNodeStatus.getAllContainers());
+
-
     HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
     response.addAllContainersToCleanup(nodeResponse.getContainersToCleanUp());
@@ -249,12 +302,12 @@ ResourceTracker, ResourceContext {
     nTracker.refreshHeartBeatResponse(response);
     nTracker.updateLastSeen(remoteNodeStatus.getLastSeen());
     boolean prevHealthStatus =
-      nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
+      nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
     NodeHealthStatus remoteNodeHealthStatus =
-      remoteNodeStatus.getNodeHealthStatus();
+      remoteNodeStatus.getNodeHealthStatus();
     nTracker.getNodeManager().updateHealthStatus(
         remoteNodeHealthStatus);
-// boolean prevHealthStatus = nodeHbResponse.
+    // boolean prevHealthStatus = nodeHbResponse.
     nodeHbResponse.setHeartbeatResponse(response);
     // Take care of node-health
@@ -263,18 +316,18 @@ ResourceTracker, ResourceContext {
       // Node's health-status changed.
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
         // TODO: Node has become unhealthy, remove?
-// LOG.info("Node " + nodeManager.getNodeID()
-// + " has become unhealthy. Health-check report: "
-// + remoteNodeStatus.nodeHealthStatus.healthReport
-// + "Removing it from the scheduler.");
-// resourceListener.removeNode(nodeManager);
+        // LOG.info("Node " + nodeManager.getNodeID()
+        // + " has become unhealthy. Health-check report: "
+        // + remoteNodeStatus.nodeHealthStatus.healthReport
+        // + "Removing it from the scheduler.");
+        // resourceListener.removeNode(nodeManager);
       } else {
         // TODO: Node has become healthy back again, add?
-// LOG.info("Node " + nodeManager.getNodeID()
-// + " has become healthy back again. Health-check report: "
-// + remoteNodeStatus.nodeHealthStatus.healthReport
-// + " Adding it to the scheduler.");
-// this.resourceListener.addNode(nodeManager);
+        // LOG.info("Node " + nodeManager.getNodeID()
+        // + " has become healthy back again. Health-check report: "
+        // + remoteNodeStatus.nodeHealthStatus.healthReport
+        // + " Adding it to the scheduler.");
+        // this.resourceListener.addNode(nodeManager);
       }
     }
     return nodeHbResponse;
@@ -286,13 +339,12 @@ ResourceTracker, ResourceContext {
     return (ntracker == null ? null: ntracker.getNodeManager());
   }

-  private synchronized NodeId getNodeId(String node) {
+  private NodeId getNodeId(String node) {
     NodeId nodeId;
     nodeId = nodes.get(node);
     if (nodeId == null) {
       nodeId = recordFactory.newRecordInstance(NodeId.class);
       nodeId.setId(nodeCounter.getAndIncrement());
-      nodes.put(node.toString(), nodeId);
     }
     return nodeId;
   }
@@ -328,7 +380,7 @@ ResourceTracker, ResourceContext {
       nmExpiryQueue.add(nodeID);
     }
   }
-
+
   protected void expireNMs(List nodes) {
     for (NodeId id: nodes) {
       synchronized (nodeManagers) {
@@ -357,10 +409,10 @@ ResourceTracker, ResourceContext {
        * its alright. We do not want to hold a hold on nodeManagers while going
        * through the expiry queue.
       */
-
+
       List expired = new ArrayList();
       LOG.info("Starting expiring thread with interval " + nmExpiryInterval);
-
+
       while (!stop) {
         long now = System.currentTimeMillis();
         expired.clear();
@@ -394,4 +446,34 @@ ResourceTracker, ResourceContext {
       }
     }
   }
+
+  @Override
+  public void finishedApplication(ApplicationId applicationId,
+      List nodesToNotify) {
+    for (NodeInfo info: nodesToNotify) {
+      NodeManager node;
+      synchronized(nodeManagers) {
+        node = nodeManagers.get(info.getNodeID()).getNodeManager();
+      }
+      node.finishedApplication(applicationId);
+    }
+  }
+
+  @Override
+  public boolean releaseContainer(Container container) {
+    NodeManager node;
+    synchronized (nodeManagers) {
+      LOG.info("DEBUG -- Container manager address " + container.getContainerManagerAddress());
+      NodeId nodeId = nodes.get(container.getContainerManagerAddress());
+      node = nodeManagers.get(nodeId).getNodeManager();
+    }
+    node.releaseContainer(container);
+    return false;
+  }
+
+  @Override
+  public void recover(RMState state) {
+
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Sun May 1 21:33:23 2011
@@ -78,15 +78,15 @@ public class Application {
   boolean pending = true; // for app metrics
   /* Reserved containers
   */
-  private final Comparator nodeComparator =
-    new Comparator() {
+  private final Comparator nodeComparator =
+    new Comparator() {
     @Override
-    public int compare(NodeManager o1, NodeManager o2) {
+    public int compare(NodeInfo o1, NodeInfo o2) {
       return o1.getNodeID().getId() - o2.getNodeID().getId();
     }
   };
-  final Map> reservedContainers =
-    new HashMap>();
+  final Map> reservedContainers =
+    new HashMap>();

   public Application(ApplicationId applicationId, ApplicationMaster master,
       Queue queue, String user) {
@@ -389,15 +389,15 @@ public class Application {
   }

   public synchronized int getReservedContainers(Priority priority) {
-    Set reservedNodes = this.reservedContainers.get(priority);
+    Set reservedNodes = this.reservedContainers.get(priority);
     return (reservedNodes == null) ? 0 : reservedNodes.size();
   }

-  public synchronized void reserveResource(NodeManager node, Priority priority,
+  public synchronized void reserveResource(NodeInfo node, Priority priority,
       Resource resource) {
-    Set reservedNodes = this.reservedContainers.get(priority);
+    Set reservedNodes = this.reservedContainers.get(priority);
     if (reservedNodes == null) {
-      reservedNodes = new TreeSet(nodeComparator);
+      reservedNodes = new TreeSet(nodeComparator);
       reservedContainers.put(priority, reservedNodes);
     }
     reservedNodes.add(node);
@@ -406,8 +406,8 @@ public class Application {
         " at priority " + priority);
   }

-  public synchronized void unreserveResource(NodeManager node, Priority priority) {
-    Set reservedNodes = reservedContainers.get(priority);
+  public synchronized void unreserveResource(NodeInfo node, Priority priority) {
+    Set reservedNodes = reservedContainers.get(priority);
     reservedNodes.remove(node);
     if (reservedNodes.isEmpty()) {
       this.reservedContainers.remove(priority);
@@ -418,8 +418,8 @@ public class Application {
         " at priority " + priority);
   }

-  public synchronized boolean isReserved(NodeManager node, Priority priority) {
-    Set reservedNodes = reservedContainers.get(priority);
+  public 
synchronized boolean isReserved(NodeInfo node, Priority priority) { + Set reservedNodes = reservedContainers.get(priority); if (reservedNodes != null) { return reservedNodes.contains(node); } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java?rev=1098414&r1=1098413&r2=1098414&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManager.java Sun May 1 21:33:23 2011 @@ -5,8 +5,6 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo; @@ -14,21 +12,11 @@ public interface NodeManager extends Nod public static final String ANY = "*"; - void allocateContainer(ApplicationId applicationId, - List containers); - boolean releaseContainer(Container container); void updateHealthStatus(NodeHealthStatus healthStatus); NodeResponse statusUpdate(Map> containers); - void notifyFinishedApplication(ApplicationId applicationId); - - Application getReservedApplication(); - - void reserveResource(Application application, Priority priority, - Resource resource); - - void 
unreserveResource(Application application, Priority priority); + void finishedApplication(ApplicationId applicationId); } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1098414&r1=1098413&r2=1098414&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java Sun May 1 21:33:23 2011 @@ -309,7 +309,7 @@ public class NodeManagerImpl implements Resources.addTo(usedResource, resource); } - public synchronized void notifyFinishedApplication(ApplicationId applicationId) { + public synchronized void finishedApplication(ApplicationId applicationId) { finishedApplications.add(applicationId); /* make sure to iterate through the list and remove all the containers that * belong to this application. 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceListener.java Sun May 1 21:33:23 2011
@@ -23,10 +23,7 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 /**
@@ -41,7 +38,7 @@ public interface ResourceListener {
    * add a node to the resource listener.
    * @param nodeManager the nodeManager view
    */
-  public void addNode(NodeManager nodeManager);
+  public void addNode(NodeInfo nodeInfo);
   /**
    * A node has been removed from the cluster.
@@ -53,9 +50,7 @@ public interface ResourceListener {
    * A status update from a NodeManager
    * @param nodeInfo NodeManager info
    * @param containers the containers completed/running/failed on this node.
-   * @return response information for the node, which containers to kill and
-   * applications to clean.
    */
-  public NodeResponse nodeUpdate(NodeInfo nodeInfo,
+  public void nodeUpdate(NodeInfo nodeInfo,
       Map> containers);
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Sun May 1 21:33:23 2011
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 /**
@@ -36,7 +38,7 @@ import org.apache.hadoop.yarn.server.sec
 @LimitedPrivate("yarn")
 @Evolving
 public interface ResourceScheduler extends ResourceListener, YarnScheduler,
-  EventHandler> {
+  EventHandler>, Recoverable {
   /**
    * Re-initialize the ResourceScheduler.
    * @param conf configuration
@@ -44,5 +46,5 @@ public interface ResourceScheduler exten
    * @throws IOException
    */
   void reinitialize(Configuration conf,
-      ContainerTokenSecretManager secretManager) throws IOException;
+      ContainerTokenSecretManager secretManager, ClusterTracker clusterTracker) throws IOException;
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1098414&r1=1098413&r2=1098414&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Sun May 1 21:33:23 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -39,6 +38,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@@ -82,20 +83,21 @@ implements ResourceScheduler, CapacitySc
   private final Comparator applicationComparator =
     new Comparator() {
-    @Override
-    public int compare(Application a1, Application a2) {
-      return a1.getApplicationId().getId() - a2.getApplicationId().getId();
-    }
+      @Override
+      public int compare(Application a1, Application a2) {
+        return a1.getApplicationId().getId() - a2.getApplicationId().getId();
+      }
   };
   private CapacitySchedulerConfiguration conf;
   private ContainerTokenSecretManager containerTokenSecretManager;
+  private ClusterTracker clusterTracker;
   private Map queues = new ConcurrentHashMap();
   private Resource minimumAllocation;
-
+
   private Map applications = new TreeMap(
       new org.apache.hadoop.yarn.util.BuilderUtils.ApplicationIdComparator());
@@ -123,13 +125,14 @@ implements ResourceScheduler, CapacitySc
   @Override
   public synchronized void reinitialize(Configuration conf,
-      ContainerTokenSecretManager containerTokenSecretManager)
+      ContainerTokenSecretManager containerTokenSecretManager, ClusterTracker clusterTracker)
   throws IOException {
     if (!initialized) {
       this.conf = new CapacitySchedulerConfiguration(conf);
       this.minimumAllocation = this.conf.getMinimumAllocation();
       this.containerTokenSecretManager = containerTokenSecretManager;
-
+      this.clusterTracker = clusterTracker;
+      if (clusterTracker != null) clusterTracker.addListener(this);
       initializeQueues(this.conf);
       initialized = true;
     } else {
@@ -166,11 +169,11 @@ implements ResourceScheduler, CapacitySc
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
-
+
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
   }
-
+
   /**
    * Ensure all existing queues are present. Queues cannot be deleted
    * @param queues existing queues
@@ -212,11 +215,11 @@ implements ResourceScheduler, CapacitySc
     }
     queues.put(queueName, queue);
-
+
     LOG.info("Initialized queue: " + queue);
     return queue;
   }
-
+
   @Override
   public void addApplication(ApplicationId applicationId,
       ApplicationMaster master, String user, String queueName, Priority priority)
@@ -317,9 +320,9 @@ implements ResourceScheduler, CapacitySc
       boolean includeApplications, boolean includeChildQueues, boolean recursive)
   throws IOException {
     Queue queue = null;
-
+
     synchronized (this) {
-      queue = this.queues.get(queueName);
+      queue = this.queues.get(queueName);
     }
     if (queue == null) {
@@ -351,30 +354,44 @@ implements ResourceScheduler, CapacitySc
     int memory = ask.getCapability().getMemory();
     int minMemory = minimumAllocation.getMemory();
     ask.getCapability().setMemory (
-        minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
+        minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
   }
+  private List getCompletedContainers(Map> allContainers) {
+    if (allContainers == null) {
+      return new ArrayList();
+    }
+    List completedContainers = new ArrayList();
+    // Iterate through the running containers and update their status
+    for (Map.Entry> e :
+      allContainers.entrySet()) {
+      for (Container c: e.getValue()) {
+        if (c.getState() == ContainerState.COMPLETE) {
+          completedContainers.add(c);
+        }
+      }
+    }
+    return completedContainers;
+  }
   @Override
-  public synchronized NodeResponse nodeUpdate(NodeInfo node,
+  public synchronized void nodeUpdate(NodeInfo nm,
       Map> containers ) {
-    LOG.info("nodeUpdate: " + node);
+    LOG.info("nodeUpdate: " + nm);
-    NodeResponse nodeResponse = nodeUpdateInternal(node, containers);
     // Completed containers
-    processCompletedContainers(nodeResponse.getCompletedContainers());
-    NodeManager nm = nodes.get(node.getNodeAddress());
-
+    processCompletedContainers(getCompletedContainers(containers));
+
     // Assign new containers
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
-
+
     Application reservedApplication = nm.getReservedApplication();
     if (reservedApplication != null) {
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " +
-          reservedApplication.getApplicationId() + " on node: " + node);
+          reservedApplication.getApplicationId() + " on node: " + nm);
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
       queue.assignContainers(clusterResource, nm);
     }
@@ -383,12 +400,11 @@ implements ResourceScheduler, CapacitySc
     if (nm.getReservedApplication() == null) {
       root.assignContainers(clusterResource, nm);
     } else {
-      LOG.info("Skipping scheduling since node " + node +
+      LOG.info("Skipping scheduling since node " + nm +
           " is reserved by application " +
           nm.getReservedApplication().getApplicationId());
     }
-    return nodeResponse;
   }
   private synchronized void processCompletedContainers(
@@ -455,11 +471,9 @@ implements ResourceScheduler, CapacitySc
       break;
     }
   }
-
-  private Map nodes = new HashMap();
   private Resource clusterResource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
-
-
+
+
   public synchronized Resource getClusterResource() {
     return clusterResource;
   }
@@ -467,18 +481,10 @@ implements ResourceScheduler, CapacitySc
   @Override
   public synchronized void removeNode(NodeInfo nodeInfo) {
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
-    //TODO inform the applications that the containers are completed/failed
-    nodes.remove(nodeInfo.getNodeAddress());
   }
-
-  public synchronized boolean isTracked(NodeInfo nodeInfo) {
-    NodeManager node = nodes.get(nodeInfo.getNodeAddress());
-    return (node == null? false: true);
-  }
-
+
   @Override
-  public synchronized void addNode(NodeManager nodeManager) {
-    nodes.put(nodeManager.getNodeAddress(), nodeManager);
+  public synchronized void addNode(NodeInfo nodeManager) {
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
@@ -486,30 +492,33 @@ implements ResourceScheduler, CapacitySc
       Container container) {
     // Reap containers
     LOG.info("Application " + applicationId + " released container " + container);
-    NodeManager nodeManager = nodes.get(container.getContainerManagerAddress());
-    return nodeManager.releaseContainer(container);
-  }
-
-  public synchronized NodeResponse nodeUpdateInternal(NodeInfo nodeInfo,
-      Map> containers) {
-    NodeManager node = nodes.get(nodeInfo.getNodeAddress());
-    LOG.debug("nodeUpdate: node=" + nodeInfo.getNodeAddress() +
-        " available=" + nodeInfo.getAvailableResource().getMemory());
-    return node.statusUpdate(containers);
-
+    return clusterTracker.releaseContainer(container);
   }
+
   public synchronized void addAllocatedContainers(NodeInfo nodeInfo,
       ApplicationId applicationId, List containers) {
-    NodeManager node = nodes.get(nodeInfo.getNodeAddress());
-    node.allocateContainer(applicationId, containers);
+    nodeInfo.allocateContainer(applicationId, containers);
   }
   public synchronized void finishedApplication(ApplicationId applicationId,
       List nodesToNotify) {
-    for (NodeInfo node: nodesToNotify) {
-      NodeManager nodeManager = nodes.get(node.getNodeAddress());
-      nodeManager.notifyFinishedApplication(applicationId);
+    clusterTracker.finishedApplication(applicationId, nodesToNotify);
+  }
+
+  @Override
+  public void recover(RMState state) throws Exception {
+    applications.clear();
+    for (Map.Entry entry : state.getStoredApplications().entrySet()) {
+      ApplicationId appId = entry.getKey();
+      ApplicationInfo appInfo = entry.getValue();
+
+      addApplication(appId, appInfo.getApplicationMaster(), appInfo.getApplicationSubmissionContext().getUser(),
+          appInfo.getApplicationSubmissionContext().getQueue(), appInfo.getApplicationSubmissionContext().getPriority());
+      for (Container c: entry.getValue().getContainers()) {
+        Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
+        queue.recoverContainer(clusterResource, applications.get(appId), c);
+      }
     }
   }
 }
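
Note on the @@ -351,30 +354,44 @@ hunk in CapacityScheduler.java: the setMemory expression there appears to round a requested memory size up to the next multiple of the minimum allocation. The standalone sketch below only illustrates that arithmetic. The class name RoundUpSketch and the method roundUp are hypothetical and are not part of this commit; the formula itself is copied from the diff.

```java
public class RoundUpSketch {
  // Hypothetical helper, not part of the commit. Same expression as the diff:
  // minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0))
  static int roundUp(int memory, int minMemory) {
    return minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
  }

  public static void main(String[] args) {
    System.out.println(roundUp(1500, 1024)); // 2048: partial block is rounded up
    System.out.println(roundUp(2048, 1024)); // 2048: exact multiples are unchanged
    System.out.println(roundUp(0, 1024));    // 0: a zero request stays zero
  }
}
```

One consequence worth keeping in mind when reading the hunk: a request of zero stays at zero rather than being raised to the minimum, and the expression assumes minMemory is positive.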